StackStalk

Wednesday, March 2, 2022

Introduction to Kafka Streams in Java

Kafka Streams is a client library for building applications and microservices where the input and output data are stored in Kafka clusters. In this article, we will explore the concepts and architecture of Kafka Streams and work through examples in Java.

Kafka Streams

  • Kafka Streams is the easiest way to write real time applications and microservices.
  • Kafka Streams is a client library and makes it easy to do data processing and transformations within Kafka.
  • Kafka Streams lets you write standard Java and Scala applications without creating a separate processing cluster.
  • The Kafka Streams API is a library (JAR) that becomes part of the application. There is nothing special to deploy, and it does not run on the Kafka brokers.
  • Kafka Streams is deployment agnostic and can be deployed on containers, VMs, Cloud, On-Prem etc.
  • Kafka Streams API supports per record (no batching) stream processing with millisecond latency.
  • Kafka Streams integrates fully with Kafka's security mechanisms.

Kafka Streams Terminology

Key terminology around Kafka Streams:
  • Stream: A stream is an unbounded, continuous, real-time flow of records.
  • Records: Records are key-value pairs.
  • Topology: A topology defines the stream processing computational logic of an application. It defines how input data is transformed to output data.
  • Stream Processor: A stream processor is a node in the processor topology that transforms incoming streams, record by record. It may create an output stream.
  • Source Processor: A source processor is a special type of stream processor that does not have any upstream processors. It takes data directly from one or more Kafka topics and forwards it to downstream processors. It doesn't transform data.
  • Sink Processor: A sink processor is a special type of stream processor that doesn't have any children and sends the data directly to Kafka topics.
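
To make the three processor roles concrete, here is a minimal, Kafka-free sketch in plain Java. It is only a mental model: the class `MiniTopology`, the in-memory lists standing in for topics, and the lower-casing processor are all hypothetical and are not part of the Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Kafka-free model of a processor topology; all names here are hypothetical.
final class MiniTopology {

    // Pushes every record of the input "topic" through source -> processor -> sink.
    static List<Map.Entry<String, String>> run(
            List<Map.Entry<String, String>> inputTopic,
            UnaryOperator<Map.Entry<String, String>> streamProcessor) {
        List<Map.Entry<String, String>> outputTopic = new ArrayList<>();
        for (Map.Entry<String, String> record : inputTopic) {
            // Source processor: takes the record from the topic and forwards it unchanged
            Map.Entry<String, String> forwarded = record;
            // Stream processor: transforms the record, one record at a time
            Map.Entry<String, String> transformed = streamProcessor.apply(forwarded);
            // Sink processor: has no children, writes the record to the output topic
            outputTopic.add(transformed);
        }
        return outputTopic;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> out = run(
                List.of(Map.entry("JOHN", "5"), Map.entry("Tony", "3")),
                r -> Map.entry(r.getKey().toLowerCase(), r.getValue()));
        System.out.println(out); // prints [john=5, tony=3]
    }
}
```

In a real application the topology is built with the Kafka Streams APIs and the topics live on the Kafka cluster; the sketch only mirrors the order in which a record passes through the nodes.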

Kafka Streams Example - Game Scoring App

In this example, we will use Kafka Streams to aggregate, in real time, the gaming scores published to a Kafka topic. We have an input topic where the raw scores are published; the streams app processes them and writes the aggregate scores to an output topic.

Kafka Streams Setup

Start ZooKeeper.
bin/zookeeper-server-start.sh config/zookeeper.properties
Start the Kafka server.
bin/kafka-server-start.sh config/server.properties
Create the input and output Kafka topics.
bin/kafka-topics.sh --create --replication-factor 1 --partitions 1 --topic scores-input --bootstrap-server localhost:9092
bin/kafka-topics.sh --create --replication-factor 1 --partitions 1 --topic scores-output --bootstrap-server localhost:9092
Verify that the Kafka topics were created.
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
scores-input
scores-output
Start a Kafka console consumer to see the results.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
                              --topic scores-output \
                              --from-beginning \
                              --formatter kafka.tools.DefaultMessageFormatter \
                              --property print.key=true \
                              --property print.value=true \
                              --skip-message-on-error \
                              --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
                              --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
Start a Kafka console producer to input the scores. We are using ":" as the separator between key and value.
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 \
                              --topic scores-input \
                              --property "parse.key=true" \
                              --property "key.separator=:"

Game Scoring App

This is the high-level flow of the application code.
  • Create a Properties object to hold the configuration of the Kafka Streams instance
  • Create a KStream to read the input topic, "scores-input", which will receive the input records
  • Do an aggregation on the KStream to get a KTable. Here we group by key and add each new value to the running total for that key.
  • Write the resulting stream to an output topic
  • Create and start the Kafka Streams instance
package com.stackstalk.kafkastreams;

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

public class GameScoringApp {

	public static void main(String[] args) {
		
		// Configuration for a Kafka Streams instance
		Properties config = new Properties();
		config.put(StreamsConfig.APPLICATION_ID_CONFIG, "game-scoring");
		config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
		config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
		config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        
		// Create a KStream to read  the specific topic
		StreamsBuilder builder = new StreamsBuilder();
		KStream<String, String> scoresInput = builder.stream("scores-input");
		
		// Do an aggregation on the KStream to get KTable
		// Here we group by key and add the new value
		// Aggregate the values of records in this stream by the grouped key. 
		// Records with null key or value are ignored. 
		// The result is written into a local KeyValueStore 
		KTable<String, Long> scoresOutput = scoresInput
				.groupByKey()
				.aggregate(() -> 0L, (key, value, total) -> total + Long.parseLong(value),
				Materialized.with(Serdes.String(), Serdes.Long()));
		
		// Write the aggregated stream to an output topic
		scoresOutput.toStream()
				.peek((key, value) -> System.out.println("Key:Value = " + key + ":" + value))
				.to("scores-output");
		
		// Print the topology
		Topology topology = builder.build();
		System.out.println(topology.describe());
		
		// Create the Kafka Streams instance
		KafkaStreams kafkaStreams = new KafkaStreams(topology, config);
		
		// Clean local state prior to starting the processing topology
		// In production we need to cleanup only on certain conditions
		kafkaStreams.cleanUp();
		
		// Start the KafkaStreams instance
		kafkaStreams.start();

		// Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
		Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
	}
}

Kafka Streams Topology output

This is the topology output from this example. Observe the source, stream and sink processors.
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [scores-input])
      --> KSTREAM-AGGREGATE-0000000002
    Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])
      --> KTABLE-TOSTREAM-0000000003
      <-- KSTREAM-SOURCE-0000000000
    Processor: KTABLE-TOSTREAM-0000000003 (stores: [])
      --> KSTREAM-PEEK-0000000004
      <-- KSTREAM-AGGREGATE-0000000002
    Processor: KSTREAM-PEEK-0000000004 (stores: [])
      --> KSTREAM-SINK-0000000005
      <-- KTABLE-TOSTREAM-0000000003
    Sink: KSTREAM-SINK-0000000005 (topic: scores-output)
      <-- KSTREAM-PEEK-0000000004

Kafka Console Producer input

In the Kafka Console producer send the records (scores) as key:value (name:score) pairs.
>john:5
>tony:3
>john:2
>ram:4
>tony:1

Kafka Console Consumer output

In the Kafka console consumer output, observe the aggregation in action. Every time a new record is pushed, the scores are aggregated by key.
john	5
tony	3
john	7
ram	4
tony	4
Get the full source code of the app in GitHub.
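
To see why the consumer prints these totals, the same group-by-key and running-sum logic can be replayed in plain Java without a broker. This is only a sketch of the aggregation semantics: the class `ScoreReplay` and its `HashMap` (standing in for the local state store) are hypothetical, not code from the app.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Kafka-free replay of the scoring aggregation; names are hypothetical.
final class ScoreReplay {

    // Feeds "key:value" lines through a per-key running sum and returns
    // one "key<TAB>total" line per input record, in arrival order.
    static List<String> replay(List<String> inputLines) {
        Map<String, Long> totals = new HashMap<>(); // stands in for the state store
        List<String> emitted = new ArrayList<>();
        for (String line : inputLines) {
            String[] parts = line.split(":", 2);
            String key = parts[0];
            long score = Long.parseLong(parts[1]);
            // Initial value 0 plus the new score, like the initializer and aggregator in the app
            long total = totals.merge(key, score, Long::sum);
            emitted.add(key + "\t" + total);
        }
        return emitted;
    }

    public static void main(String[] args) {
        replay(List.of("john:5", "tony:3", "john:2", "ram:4", "tony:1"))
                .forEach(System.out::println);
    }
}
```

Running it prints the same five lines as the console consumer above. In a real Kafka Streams application, record caching may coalesce several quick updates to the same key into a single output record, so the number of emitted updates can be smaller than the number of inputs.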

Exactly once Semantics

Kafka Streams supports exactly-once semantics. Exactly once is the guarantee that the processing of each message happens exactly once, and that the results are written back to Kafka exactly once. Producers are idempotent: if the same message is sent multiple times (for example on a retry), Kafka ensures that only one copy of the message is kept. If messages need to be written to multiple topics as part of a transaction, Kafka ensures that either all of them are written or none of them are.

The exactly-once guarantee provided by the Kafka Streams API is end-to-end within Kafka: it extends from the data read from Kafka, through any state materialized to Kafka by the Streams app, to the final output written back to Kafka. Side effects in external systems are not covered by this guarantee.

Enabling exactly once in Kafka Streams requires one additional property in the streams configuration; the default guarantee is at-least-once. The EXACTLY_ONCE_V2 setting is available from Kafka Streams 3.0 and requires brokers on version 2.5 or newer.
// Configuration for a Kafka Streams instance
Properties config = new Properties();
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
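
The idempotent-producer behaviour described in this section can be pictured with a small plain-Java model. It is a deliberately simplified sketch, not Kafka's implementation: the class `IdempotentLog` is hypothetical, and real brokers track sequence numbers per producer and partition and also handle producer epochs and sequence gaps.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of duplicate suppression on an append-only log.
final class IdempotentLog {
    private final List<String> log = new ArrayList<>();
    // Last accepted sequence number per producer id
    private final Map<Long, Integer> lastSequence = new HashMap<>();

    // Returns true if the message was appended, false if it was a duplicate retry.
    boolean append(long producerId, int sequence, String message) {
        int last = lastSequence.getOrDefault(producerId, -1);
        if (sequence <= last) {
            return false; // already written once, so the retry is ignored
        }
        log.add(message);
        lastSequence.put(producerId, sequence);
        return true;
    }

    List<String> contents() {
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        IdempotentLog topic = new IdempotentLog();
        topic.append(1L, 0, "john:5");
        topic.append(1L, 0, "john:5"); // retry of the same send
        topic.append(1L, 1, "tony:3");
        System.out.println(topic.contents()); // prints [john:5, tony:3]
    }
}
```

The point of the sketch is only that a retried send carries the same sequence number as the original, which lets the log keep a single copy.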

KStreams, KTables and GlobalKTable

The computational logic of a Kafka Streams application is defined as a processor topology, which is a graph of stream processors (nodes) and streams (edges). We can define the processor topology with the Kafka Streams APIs:

Kafka Streams DSL: A high-level API that provides the most common data transformation operations such as map, filter, join, and aggregations out of the box. The DSL is the recommended starting point for developers new to Kafka Streams, and should cover many use cases and stream processing needs.

Processor API: A low-level API that lets you add and connect processors as well as interact directly with state stores. The Processor API provides you with even more flexibility than the DSL but at the expense of requiring more manual work on the side of the application developer (e.g., more lines of code).

Kafka Streams DSL supports built-in abstractions for streams and tables in the form of KStream, KTable, and GlobalKTable.

KStream is an abstraction of a record stream. Using the table analogy, data records in a record stream are always interpreted as an “INSERT”. This is similar to a log: an infinite, unbounded stream of data.

KTable is an abstraction of a changelog stream, where each data record represents an update. The value in a data record is interpreted as an “UPDATE” of the last value for the same record key. Using the table analogy, a data record in a changelog stream is interpreted as an UPSERT aka INSERT/UPDATE because any existing row with the same key is overwritten. Also, null values are interpreted in a special way: a record with a null value represents a “DELETE” or tombstone for the record’s key.
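
The difference between the two interpretations can be sketched in plain Java by reading the same sequence of records once as a record stream and once as a changelog. This is only an illustration of the semantics described above; the class `StreamVsTable` and its methods are hypothetical and do not use the Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Kafka-free illustration of INSERT vs UPSERT semantics; names are hypothetical.
final class StreamVsTable {

    // Record-stream view (KStream-like): every record is kept as its own row (INSERT).
    static List<String> asStream(List<String[]> records) {
        List<String> rows = new ArrayList<>();
        for (String[] record : records) {
            rows.add(record[0] + "=" + record[1]);
        }
        return rows;
    }

    // Changelog view (KTable-like): the latest value per key wins (UPSERT),
    // and a null value removes the key (DELETE / tombstone).
    static Map<String, String> asTable(List<String[]> records) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String[] record : records) {
            if (record[1] == null) {
                table.remove(record[0]);
            } else {
                table.put(record[0], record[1]);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<String[]> records = List.of(
                new String[] {"john", "5"},
                new String[] {"tony", "3"},
                new String[] {"john", "7"},
                new String[] {"tony", null});
        System.out.println(asStream(records)); // prints [john=5, tony=3, john=7, tony=null]
        System.out.println(asTable(records));  // prints {john=7}
    }
}
```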

Like a KTable, a GlobalKTable is an abstraction of a changelog stream, where each data record represents an update. The main difference between a KTable and a GlobalKTable is how much of the topic each application instance sees. Let us assume a topic with 5 partitions and an application running as 5 instances.
  • If you read the input topic into a KTable, then the “local” KTable instance of each application instance will be populated with data from only 1 partition of the topic’s 5 partitions (in general, only from the partitions assigned to that instance).
  • If you read the input topic into a GlobalKTable, then the local GlobalKTable instance of each application instance will be populated with data from all partitions of the topic.
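
The two cases can be sketched as a small calculation of which partitions one application instance would hold locally. The round-robin split used here is an assumption made purely for illustration (the real assignment is decided by Kafka Streams), and the class `TableVisibility` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of partition visibility; not the real assignment logic.
final class TableVisibility {

    // KTable-like: an instance only holds data from the partitions assigned to it.
    // Round-robin assignment is assumed here for simplicity.
    static List<Integer> kTablePartitions(int instance, int instanceCount, int partitionCount) {
        List<Integer> owned = new ArrayList<>();
        for (int partition = 0; partition < partitionCount; partition++) {
            if (partition % instanceCount == instance) {
                owned.add(partition);
            }
        }
        return owned;
    }

    // GlobalKTable-like: every instance holds data from all partitions.
    static List<Integer> globalKTablePartitions(int partitionCount) {
        List<Integer> all = new ArrayList<>();
        for (int partition = 0; partition < partitionCount; partition++) {
            all.add(partition);
        }
        return all;
    }

    public static void main(String[] args) {
        System.out.println(kTablePartitions(2, 5, 5));  // prints [2]
        System.out.println(globalKTablePartitions(5));  // prints [0, 1, 2, 3, 4]
    }
}
```

With fewer instances than partitions, each KTable instance would hold several partitions, while the GlobalKTable result stays the same.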

Stream Operations

We will review some of the stream operations with examples.

Map

Takes one record and produces one record, applicable only for KStreams. Affects both keys and values. In this example, we convert both key and value to lower case.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> scoresInput = builder.stream("scores-input");
KStream<String, String> mappedInput = scoresInput.map((key, value) -> KeyValue.pair(key.toLowerCase(), value.toLowerCase())); 	
mappedInput.to("scores-output", Produced.with(Serdes.String(), Serdes.String()));

MapValues

Takes one record and produces one record, applicable for KStreams and KTables. It affects only the values. In this example, we multiply the value by a weight of 100.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> scoresInput = builder.stream("scores-input");
KStream<String, Integer> mappedInput = scoresInput.mapValues(value -> Integer.parseInt(value) * 100); 				
mappedInput.to("scores-output", Produced.with(Serdes.String(), Serdes.Integer()));

Filter / FilterNot

Takes one record and produces zero or one record. FilterNot is the inverse of Filter. In this example, we filter the keys based on a regular expression.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> scoresInput = builder.stream("scores-input");			
KStream<String, String> mappedInput = scoresInput.filter((key, value) -> key.matches("^[a-zA-Z].*")); 				
mappedInput.to("scores-output", Produced.with(Serdes.String(), Serdes.String()));

FlatMap

Takes one record and produces zero, one or more records, applicable for KStreams. In this example, we split the key based on a separator and create multiple KeyValue pairs.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> scoresInput = builder.stream("scores-input");
KStream<String, String> mappedInput = scoresInput.flatMap((key, value) -> {
	// Emit one record per comma-separated key, each carrying the original value
	List<KeyValue<String, String>> result = new ArrayList<>();
	for (String splitKey : key.split(",")) {
		result.add(KeyValue.pair(splitKey, value));
	}
	return result;
});

FlatMapValues

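Takes one record and produces zero, one or more records, applicable for KStreams. Does not change the keys. In this example, we split a comma-separated value into multiple records that share the original key.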
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> scoresInput = builder.stream("scores-input");
KStream<String, String> mappedInput = scoresInput.flatMapValues(value -> Arrays.asList(value.split(",")));

Branch

Split a KStream based on one or more predicates. The result is multiple KStreams. Predicates are evaluated in order, and each record goes to the first branch whose predicate matches; records that match none go to the default branch. In this example, we split the stream into three named streams (plus a default branch) based on the value of the score and write the "hundred-plus" branch to the output topic.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> scoresInput = builder.stream("scores-input");

// defaultBranch() completes the split and returns all branches keyed by name
// (needs java.util.Map)
Map<String, KStream<String, String>> branches = scoresInput.split(Named.as("split-"))
	.branch((key, value) -> Integer.parseInt(value) > 100, Branched.as("hundred-plus"))
	.branch((key, value) -> Integer.parseInt(value) > 50, Branched.as("fifty-plus"))
	.branch((key, value) -> Integer.parseInt(value) > 10, Branched.as("ten-plus"))
	.defaultBranch();

branches.get("split-hundred-plus").to("scores-output", Produced.with(Serdes.String(), Serdes.String()));
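
How a score is routed by the three predicates above can be sketched in plain Java, without the Kafka Streams API. The class `BranchRouting` is hypothetical, and the sketch assumes first-match routing over the predicates in declaration order, with an unnamed default branch keyed as the prefix followed by "0".

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.IntPredicate;

// Kafka-free sketch of first-match branch routing; names are hypothetical.
final class BranchRouting {

    // Returns the name of the branch a score would be routed to.
    static String route(int score) {
        // LinkedHashMap keeps the predicates in declaration order
        Map<String, IntPredicate> branches = new LinkedHashMap<>();
        branches.put("split-hundred-plus", s -> s > 100);
        branches.put("split-fifty-plus", s -> s > 50);
        branches.put("split-ten-plus", s -> s > 10);
        for (Map.Entry<String, IntPredicate> branch : branches.entrySet()) {
            if (branch.getValue().test(score)) {
                return branch.getKey(); // first matching predicate wins
            }
        }
        return "split-0"; // assumed name of the unnamed default branch
    }

    public static void main(String[] args) {
        System.out.println(route(120)); // prints split-hundred-plus
        System.out.println(route(75));  // prints split-fifty-plus
        System.out.println(route(10));  // prints split-0
    }
}
```

Note that a score of 120 satisfies all three predicates but lands only in the first branch, which is why the order of the predicates matters.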

SelectKey

Assigns a new key to the record. In this example, we prepend a tag to the key.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> scoresInput = builder.stream("scores-input");
KStream<String, String> mappedInput = scoresInput.selectKey((key, value) -> "MYTAG_" + key);
		
mappedInput.to("scores-output", Produced.with(Serdes.String(), Serdes.String()));
Refer to the Game Scoring App example above for examples of stateful operations like GroupBy and Aggregate.

Conclusion

Kafka Streams makes it easy to write real-time applications and microservices in Java. The Kafka Streams DSL, its high-level API, provides the most common data transformation operations such as map, filter, join, and aggregations out of the box.