Friday, June 18, 2021

Getting started with Kafka in Python


This article will provide an overview of Kafka and how to get started with Kafka in Python with a simple example.

What is Kafka?


Apache Kafka is an open-source distributed event streaming platform capable of handling trillions of events a day. Initially conceived as a messaging queue, Kafka is based on the abstraction of a distributed commit log and has since evolved from a messaging queue into a full-fledged event streaming platform.




Kafka Concepts


This section highlights key foundational concepts around Kafka.

Topic

  • In Kafka, messages are organised into topics
  • A topic is the name used to identify a stream of data
  • To send a message, it is written to a topic; messages are read from the topic
  • There is no limit on the number of topics in Kafka, and the number can typically grow large

Partition

  • In Kafka, a topic can be divided into a number of partitions to parallelise it
  • A partition is an ordered sequence of records, and new messages are appended to the end of the partition
  • Each record in a partition is assigned a sequential id called the offset

Brokers

  • Kafka typically runs as a cluster of one or more servers, each of which is referred to as a broker
  • Kafka broker, Kafka node and Kafka server typically mean the same thing
  • Kafka brokers form a cluster by sharing information among themselves

Replication

  • In Kafka, replication refers to the process of keeping multiple copies of data for availability
  • Replication happens at the partition level
  • The replication factor of a topic defines the number of copies to be maintained; it is chosen when the topic is created, as in the sketch below
  • Kafka assigns one replica as the leader; all other replicas are followers (in-sync replicas) of the leader
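
To make partitions and replication factor concrete, here is a minimal sketch using the AdminClient from the confluent-kafka Python client that is introduced later in this article. The broker address, topic name, partition count and replication factor are illustrative assumptions, not values prescribed by Kafka.
from confluent_kafka.admin import AdminClient, NewTopic

# Illustrative sketch: create "topic_1" with 3 partitions and a replication
# factor of 1 (sufficient for the single-broker development setup used below)
admin = AdminClient({'bootstrap.servers': "localhost:9092"})

futures = admin.create_topics([NewTopic("topic_1",
                                        num_partitions=3,
                                        replication_factor=1)])

# create_topics() is asynchronous and returns one future per topic
for topic, future in futures.items():
    try:
        future.result()  # raises an exception if creation failed
        print("Created topic %s" % topic)
    except Exception as e:
        print("Failed to create topic %s: %s" % (topic, e))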

Producer

  • A producer creates and publishes messages to one or more topics
  • A producer can optionally select the partition that stores the data

Consumer

  • A consumer reads and processes messages from one or more topics to perform the intended business logic
  • In large-scale systems there can be a gap between the current message being processed by a consumer and the newest message arriving on a partition; this gap is referred to as offset lag (a short sketch of measuring it follows this list)
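
As a rough illustration of offset lag, the following sketch compares a partition's high watermark with the group's last committed offset using the confluent-kafka client. The topic name, partition number and group id are assumptions that match the examples later in this article.
from confluent_kafka import Consumer, TopicPartition

# Illustrative sketch: measure offset lag for partition 0 of "topic_1"
consumer = Consumer({'bootstrap.servers': "localhost:9092",
                     'group.id': "mygroup"})

tp = TopicPartition("topic_1", 0)

# Low and high watermarks; the high watermark is the offset of the
# next message to be written to the partition
low, high = consumer.get_watermark_offsets(tp, timeout=5)

# Last committed offset for this consumer group on the partition
# (negative if the group has not committed anything yet)
committed = consumer.committed([tp], timeout=5)[0]

lag = high - committed.offset if committed.offset >= 0 else high - low
print("Offset lag for %s [%d]: %d" % (tp.topic, tp.partition, lag))

consumer.close()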

Consumer Groups

  • A consumer group is one or more consumers working together to process messages
  • Consumers join a group by specifying the same group identifier
  • Kafka guarantees that messages from a single partition are processed by only one consumer in the group
  • The parallelism for processing messages on a topic is defined by the number of consumers in the group
  • If a topic has more partitions than there are consumers in the group, some consumers are assigned more than one partition
  • If a topic has fewer partitions than there are consumers in the group, some consumers get no data

Kafka Use Cases

A few use cases to understand how Kafka is used in the real world.


Messaging

Kafka can be used as a replacement for more traditional message broker systems like RabbitMQ. Messaging is typically used to decouple data producers from data-processing consumers. Kafka makes a good solution for large-scale message processing applications.


Real time data analytics

Real-time data analytics use cases, for example live traffic data, inventory management for a better consumer experience, medical diagnostics data and compute performance metrics, require high throughput, low latency and performance guarantees to deliver a real-time experience. Kafka stream processing plays a key role here.


Log Aggregation

Log aggregation typically collects log files from servers and microservices and puts them in a central place for processing. An example use case of log aggregation is analysing call flows in a distributed system. Kafka abstracts away the details of files and provides a stream of messages, which allows for low-latency processing.

Setting up Kafka

Follow these instructions to get started with Kafka.
  1. Download the latest version of Kafka
     wget https://downloads.apache.org/kafka/2.8.0/kafka_2.13-2.8.0.tgz
  2. Extract the downloaded file
     tar -xzf kafka_2.13-2.8.0.tgz
     cd kafka_2.13-2.8.0
  3. Start the ZooKeeper service
     bin/zookeeper-server-start.sh config/zookeeper.properties
  4. Start the Kafka broker service
     bin/kafka-server-start.sh config/server.properties
We now have a working environment for Kafka.


Confluent Kafka for Python

Confluent's Kafka Python client (confluent-kafka) is a reliable, performant and feature-rich client for Apache Kafka v0.8 and above. Follow these instructions to get started with confluent-kafka.
  1. Create and activate a Python virtual environment
     python3 -m venv kafka-demo
     cd kafka-demo
     source bin/activate
  2. Install the confluent-kafka module
     pip install confluent-kafka

We now have a working environment for Kafka with Python.
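
As an optional sanity check, and assuming the broker started earlier is still running on localhost:9092, the snippet below requests cluster metadata and prints the brokers and topics the client can see.
from confluent_kafka.admin import AdminClient

# Request cluster metadata from the broker and list what the client can see
admin = AdminClient({'bootstrap.servers': "localhost:9092"})
metadata = admin.list_topics(timeout=10)

print("Brokers:", [str(broker) for broker in metadata.brokers.values()])
print("Topics:", list(metadata.topics.keys()))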


Python producer and consumer example using Confluent Kafka

Now let us write a simple producer and consumer to demonstrate the concepts discussed above. It is pretty simple with confluent-kafka and takes only a few lines of code.

Let us focus on the producer first. This is an example of a synchronous producer. Here we connect to the Kafka broker on port 9092 and use the produce call to write a message to a Kafka topic. We use the flush call to make the write synchronous and to ensure the messages are delivered before the producer is shut down.
from confluent_kafka import Producer, KafkaException


try:
    # Connect to the Kafka broker running on localhost:9092
    producer = Producer({'bootstrap.servers': "localhost:9092"})

    # Write the message to the topic
    producer.produce("topic_1", "This is my test message")

    # flush() blocks until all outstanding messages are delivered,
    # making this write effectively synchronous
    producer.flush()

except KafkaException as e:
    print("Kafka failure: " + str(e))
Synchronous writes limit throughput, so the recommendation is to use asynchronous writes. Let us look at an example. Here we connect to the Kafka broker on port 9092 and use the produce call to write a message to a Kafka topic. If there is a need to be notified of delivery success or failure, a callback can be passed to the produce call. No delivery notification events are propagated until the poll() method is invoked.
from confluent_kafka import Producer, KafkaException


def acked(err, msg):
    # Delivery report callback, invoked from poll() or flush()
    if err is not None:
        print("Failed to deliver message: %s: %s" % (msg.value().decode('utf-8'), str(err)))
    else:
        print("Message produced: %s" % msg.value().decode('utf-8'))


try:
    producer = Producer({'bootstrap.servers': "localhost:9092"})

    # Asynchronous write; the acked callback reports delivery success or failure
    producer.produce("topic_1", "This is my async test message", callback=acked)

    # Serve delivery report callbacks; none are propagated until poll() is called
    producer.poll(1)

except KafkaException as e:
    print("Kafka failure: " + str(e))
Now let us write a simple consumer. We specify the Kafka broker and the consumer group as the configuration to create the consumer. We subscribe to the topic of interest before going into the consume loop. In the consume loop we call the poll() method to retrieve messages from the topic. If there are no messages, the poll() method returns None.
from confluent_kafka import Consumer

# Create the consumer with the broker address and the consumer group id
consumer = Consumer({'bootstrap.servers': "localhost:9092",
                     'group.id': "mygroup"})

try:
    # Subscribe to the topic of interest
    consumer.subscribe(["topic_1"])

    while True:
        # Wait up to 1 second for a new message
        msg = consumer.poll(timeout=1.0)

        if msg is None:
            continue

        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue

        msg_val = msg.value().decode('utf-8')
        print(msg_val)
finally:
    # Close the consumer to commit final offsets and leave the group cleanly
    consumer.close()
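
One behaviour to be aware of: with the default configuration a new consumer group starts reading from the latest offset, so messages produced before the group has joined and committed may not be delivered to it. If the consumer should instead start from the beginning of the partition in that case, one option is the auto.offset.reset setting, sketched below.
from confluent_kafka import Consumer

# Optional variation: start from the earliest available offset when the
# group has no committed offset yet
consumer = Consumer({'bootstrap.servers': "localhost:9092",
                     'group.id': "mygroup",
                     'auto.offset.reset': 'earliest'})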
Start the consumer application in the virtual environment created earlier.
(kafka-demo) localhost> python consumer.py 
This is my async test message
This is my async test message
When we run the producer application, we should see the messages being received by the consumer.
localhost >python producer.py 
Message produced: This is my async test message
Get access to the source code of this example on GitHub.