Friday, September 2, 2022

Avro Producer and Consumer with Python using Confluent Kafka

In this article, we will understand Avro, a popular data serialization format for streaming data applications, and develop a simple Avro producer and consumer in Python using Confluent Kafka. Also refer to the article Getting started with Kafka in Python for a basic introduction to Kafka.

Avro Introduction

For streaming data use cases, Avro is a popular data serialization format. Avro is an open source data serialization system that helps with data exchange between systems, programming languages, and processing frameworks. Avro helps define a binary format for your data, as well as map it to the programming language of choice.

Avro relies on schemas. When Avro data is read, the schema used when writing it is always present. This permits each datum to be written with no per-value overhead, making serialization both fast and compact. It also facilitates use with dynamic, scripting languages, since the data, together with its schema, is fully self-describing.

Avro has a JSON-like data model and can be represented either as JSON or in a compact binary form. It comes with a sophisticated schema description language for describing data.
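
To make the compact binary form concrete, here is a minimal sketch that encodes and decodes a record using the fastavro package (an assumption; fastavro is not used elsewhere in this article, and the small schema below is purely illustrative):

import io
import json
from fastavro import parse_schema, schemaless_writer, schemaless_reader

# A small illustrative schema (hypothetical, for demonstration only)
schema = parse_schema({
    "type": "record",
    "name": "MovieRating",
    "fields": [
        {"name": "imdbID", "type": "string"},
        {"name": "imdbRating", "type": "float"}
    ]
})

record = {"imdbID": "tt0061722", "imdbRating": 8.1}

# Encode the record into Avro's compact binary form (only values are stored, not field names)
buf = io.BytesIO()
schemaless_writer(buf, schema, record)
avro_bytes = buf.getvalue()

# Decode it back; the writer schema must be available to the reader
buf.seek(0)
decoded = schemaless_reader(buf, schema)

print(len(avro_bytes), "bytes as Avro binary vs", len(json.dumps(record)), "bytes as JSON")
print(decoded)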

Let us look at an example of representing the movie data below as an Avro schema.

{
  "imdbID": "tt0061722",
  "Title": "The Graduate",
  "Year": 1967,
  "Director": "Mike Nichols",
  "Actors": [
    "Anne Bancroft",
    "Dustin Hoffman",
    "Katharine Ross",
    "William Daniels"
  ],
  "Language": "English",
  "imdbRating": 8.1
}

Movie Avro Schema

Avro schema definitions are JSON records. Because a schema is itself a record, it can define multiple fields, which are organized in a JSON array. Each field identifies the field's name as well as its type. The type can be something simple, like an integer, or something complex, like another record.

In the movie schema, we have defined the MovieDetails record type with the fields needed to represent the movie data.

{
    "type": "record",
    "namespace": "com.stackstalk",
    "name": "MovieDetails",
    "fields": [
        {
            "name": "imdbID",
            "type": "string"
        },
        {
            "name": "Title",
            "type": "string"
        },
        {
            "name": "Year",
            "type": "int"
        },
        {
            "name": "Language",
            "type": "string"
        },
        {
            "name": "Director",
            "type": "string"
        },
        {
            "name": "Actors",
            "type": {
              "type": "array",
              "items": {
                 "type": "string"
              }
            }
        },
        {
            "name": "imdbRating",
            "type": "float"
        }
    ]
}
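
The producer code later in this article also loads a key schema from avro/movie-topic-key.avsc. That file is not listed here, but based on the key printed by the consumer (imdbID and Title), it would look something like the sketch below (the record name MovieMetadata is an assumption):

{
    "type": "record",
    "namespace": "com.stackstalk",
    "name": "MovieMetadata",
    "fields": [
        {
            "name": "imdbID",
            "type": "string"
        },
        {
            "name": "Title",
            "type": "string"
        }
    ]
}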

Confluent Platform

Reference: Confluent Platform

Confluent Platform is an enterprise-grade distribution of Apache Kafka. It is a full-scale data streaming platform that enables you to easily access, store, and manage data as continuous, real-time streams. Each release of Confluent Platform includes the latest release of Kafka and additional tools and services that make it easier to build and manage an Event Streaming Platform.

For this example, we will run the Confluent services ZooKeeper, Kafka broker, Schema Registry and Control Center as Docker containers. Refer to the Confluent Docker Compose specs.
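
One convenient way to bring these services up is Confluent's cp-all-in-one Docker Compose file. The commands below are a sketch; the repository layout and image versions may differ depending on the Confluent Platform release you use:

$ git clone https://github.com/confluentinc/cp-all-in-one.git
$ cd cp-all-in-one/cp-all-in-one
$ docker-compose up -d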

Confluent Schema Registry enables safe, zero downtime evolution of schemas by centralizing the schema management. It provides a RESTful interface for storing and retrieving Avro, JSON Schema, and Protobuf schemas. Schema Registry tracks all versions of schemas used for every topic in Kafka and only allows evolution of schemas according to user-defined compatibility settings. This gives developers confidence that they can safely modify schemas as necessary without worrying that doing so will break a different service they may not even be aware of. Support for schemas is a foundational component of a data governance solution.
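
Once the producer below has registered its schemas, you can inspect them through the Schema Registry REST interface. Here is a minimal sketch using the requests package, assuming the registry is reachable on localhost:8081 as configured later in this article:

import requests

SCHEMA_REGISTRY_URL = "http://localhost:8081"

# List all registered subjects; with the default naming strategy a topic gets
# <topic>-key and <topic>-value subjects
subjects = requests.get(SCHEMA_REGISTRY_URL + "/subjects").json()
print(subjects)

# Fetch the latest registered version of the value schema for the movie topic
latest = requests.get(SCHEMA_REGISTRY_URL + "/subjects/movie-topic-value/versions/latest").json()
print(latest["version"])
print(latest["schema"])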

Confluent Control Center is a GUI-based system for managing and monitoring Kafka. It allows you to easily manage Kafka Connect, to create, edit, and manage connections to other systems. It also allows you to monitor data streams from producer to consumer, assuring that every message is delivered, and measuring how long it takes to deliver messages.

Python producer and consumer example using Confluent Kafka

Now let us write a simple producer and consumer to demonstrate the concepts discussed above. This is straightforward with the confluent-kafka Python package.

Let us focus on the producer first. This is an example of a synchronous producer. Here we connect to the Kafka server on port 9092 and use the produce call to write a message to a Kafka topic. We load the key and value Avro schema files and associate them when creating the AvroProducer. We use the flush call to make the writes synchronous and to ensure the messages are delivered before the producer is shut down.

#!/usr/bin/env python3

import json
from confluent_kafka import KafkaException
from confluent_kafka.avro import AvroProducer
from confluent_kafka import avro

# Delivery report callback, invoked for each message once the broker acknowledges it
def acked(err, msg):
    if err is not None:
        print("Failed to deliver message: %s: %s" % (msg.value().decode('utf-8'), str(err)))
    else:
        print("Message produced: %s" % msg.value().decode('utf-8'))

def load_avro_schema_from_file():
    key_schema = avro.load("avro/movie-topic-key.avsc")
    value_schema = avro.load("avro/movie-topic-value.avsc")

    return key_schema, value_schema

def send_data():
    producer_config = {
        "bootstrap.servers": "localhost:9092",
        "schema.registry.url": "http://localhost:8081"
    }

    key_schema, value_schema = load_avro_schema_from_file()

    #print(key_schema)
    #print(value_schema)

    try:
        producer = AvroProducer(producer_config, default_key_schema=key_schema, default_value_schema=value_schema)

        f1 = open("data/movie-metadata.json", "r")
        key_str = f1.read()
        f1.close()

        f2 = open("data/movie-details.json", "r")
        value_str = f2.read()
        f2.close()

        # Produce a single message; on_delivery wires in the acked() delivery callback defined above
        producer.produce(topic = "movie-topic", key = json.loads(key_str), headers = [("my-header1", "Value1")], value = json.loads(value_str), on_delivery = acked)
        producer.flush()

    except KafkaException as e:
        print('Kafka failure ' + str(e))

def main():
    send_data()

if __name__ == "__main__":
    main()
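
The producer reads its key and value from two JSON files under data/. Those files are not listed in the article; based on the schemas above and the consumer output below, their contents would look something like this:

data/movie-metadata.json:

{
  "imdbID": "tt0061722",
  "Title": "The Graduate"
}

data/movie-details.json:

{
  "imdbID": "tt0061722",
  "Title": "The Graduate",
  "Year": 1967,
  "Language": "English",
  "Director": "Mike Nichols",
  "Actors": ["Anne Bancroft", "Dustin Hoffman", "Katharine Ross", "William Daniels"],
  "imdbRating": 8.1
}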

Now let us write a simple consumer. We specify the Kafka server, the schema registry and the consumer group in the configuration used to create the AvroConsumer. We subscribe to the topic of interest before going into a consume loop. In the consume loop we call the poll() method to retrieve messages from the topic. If there are no messages, the poll() method returns None.

#!/usr/bin/env python3

import json
from confluent_kafka import KafkaException
from confluent_kafka.avro import AvroConsumer

def read_data():
    consumer_config = {
        "bootstrap.servers": "localhost:9092",
        "schema.registry.url": "http://localhost:8081",
        "group.id": "my-connsumer1",
        "auto.offset.reset": "earliest"
    }

    consumer = AvroConsumer(consumer_config)
    consumer.subscribe(['movie-topic'])

    try:
        while True:
            try:
                # poll() returns None if no message arrives within the timeout (1 second here)
                msg = consumer.poll(1)

                if msg is None:
                    continue

                print("Key is :" + json.dumps(msg.key()))
                print("Value is :" + json.dumps(msg.value()))
                print("-------------------------")

            except KafkaException as e:
                print('Kafka failure ' + str(e))

    except KeyboardInterrupt:
        pass
    finally:
        # Leave the consumer group cleanly and commit final offsets
        consumer.close()

def main():
    read_data()

if __name__ == "__main__":
    main()

Run the producer code.

$ python3 producer.py

When you run the producer application, you should see the messages being received by the consumer.

$ python3 consumer.py 
Key is :{"imdbID": "tt0061722", "Title": "The Graduate"}
Value is :{"imdbID": "tt0061722", "Title": "The Graduate", "Year": 1967, "Language": "English", "Director": "Mike Nichols", "Actors": ["Anne Bancroft", "Dustin Hoffman", "Katharine Ross", "William Daniels"], "imdbRating": 8.100000381469727}
-------------------------
Key is :{"imdbID": "tt0061722", "Title": "The Graduate"}
Value is :{"imdbID": "tt0061722", "Title": "The Graduate", "Year": 1967, "Language": "English", "Director": "Mike Nichols", "Actors": ["Anne Bancroft", "Dustin Hoffman", "Katharine Ross", "William Daniels"], "imdbRating": 8.100000381469727}
-------------------------

Open the Confluent Control Center at http://localhost:9021/ to view the schema registered for the movie topic.


Get access to the source code of this example on GitHub.
