Kafka Streams
- Kafka Streams is the easiest way to write real-time applications and microservices.
- Kafka Streams is a client library that makes it easy to perform data processing and transformations within Kafka.
- Kafka Streams allows you to write standard Java and Scala applications without the need to create a separate processing cluster.
- The Kafka Streams API is a library (JAR) that becomes part of your application. There is no special code to deploy, and it does not run on the Kafka brokers.
- Kafka Streams is deployment agnostic and can be deployed on containers, VMs, cloud, on-premises etc.
- The Kafka Streams API supports per-record (no batching) stream processing with millisecond latency.
- Kafka Streams is fully aligned with Kafka security mechanisms.
Kafka Streams Terminology
- Stream: A stream is an unbounded, continuous, real-time flow of records.
- Record: Records are key-value pairs.
- Topology: A topology defines the stream processing computational logic of an application. It defines how input data is transformed into output data.
- Stream Processor: A stream processor is a node in the processor topology that transforms incoming streams, record by record. It may create an output stream.
- Source Processor: A source processor is a special type of stream processor that does not have any upstream processors. It takes data directly from one or more Kafka topics and forwards it to downstream processors. It does not transform data.
- Sink Processor: A sink processor is a special type of stream processor that does not have any downstream processors and sends the data it receives directly to a Kafka topic.
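To make these terms concrete, here is a minimal sketch (not part of the original example) that builds a topology with one source processor, one stream processor and one sink processor. The topics "words-input" and "words-output" are hypothetical names used only for this illustration, and default String serdes are assumed.

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

public class TerminologyExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Source processor: reads records from the "words-input" topic (hypothetical topic name)
        KStream<String, String> input = builder.stream("words-input");

        // Stream processor: transforms the stream record by record
        KStream<String, String> upperCased = input.mapValues(value -> value.toUpperCase());

        // Sink processor: writes the transformed records to the "words-output" topic
        upperCased.to("words-output");

        // The topology describes how these processors are connected
        Topology topology = builder.build();
        System.out.println(topology.describe());
    }
}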
Kafka Streams Example - Game Scoring App
Kafka Streams Setup
Start the ZooKeeper server.
bin/zookeeper-server-start.sh config/zookeeper.properties

Start the Kafka server.
bin/kafka-server-start.sh config/server.properties

Create the input and output Kafka topics.
bin/kafka-topics.sh --create --replication-factor 1 --partitions 1 --topic scores-input --bootstrap-server localhost:9092
bin/kafka-topics.sh --create --replication-factor 1 --partitions 1 --topic scores-output --bootstrap-server localhost:9092

Validate that the Kafka topics have been created.
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
scores-input
scores-output

Start a Kafka console consumer to see the results.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic scores-output --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --skip-message-on-error --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

Start a Kafka console producer to input the scores. We are using ":" as the separator between key and value.
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic scores-input --property "parse.key=true" --property "key.separator=:"
Game Scoring App
- Create a Properties object to hold the configuration of the Kafka Streams instance
- Create a KStream to read the input topic "scores-input", which will receive the input records
- Do an aggregation on the KStream to get a KTable. Here we group by key and add the new value, aggregating the values of records in the stream by the grouped key.
- Materialize the stream to an output topic
- Create and start the Kafka Streams instance
package com.stackstalk.kafkastreams;

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

public class GameScoringApp {
    public static void main(String[] args) {
        // Configuration for a Kafka Streams instance
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "game-scoring");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

        // Create a KStream to read the specific topic
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> scoresInput = builder.stream("scores-input");

        // Do an aggregation on the KStream to get a KTable
        // Here we group by key and add the new value
        // Aggregate the values of records in this stream by the grouped key.
        // Records with null key or value are ignored.
        // The result is written into a local KeyValueStore
        KTable<String, Long> scoresOutput = scoresInput
                .groupByKey()
                .aggregate(() -> 0L,
                        (key, value, total) -> total + Long.parseLong(value),
                        Materialized.with(Serdes.String(), Serdes.Long()));

        // Materialize the stream to an output topic
        scoresOutput.toStream()
                .peek((key, value) -> System.out.println("Key:Value = " + key + ":" + value))
                .to("scores-output");

        // Print the topology
        Topology topology = builder.build();
        System.out.println(topology.describe());

        // Create the Kafka Streams instance
        KafkaStreams kafkaStreams = new KafkaStreams(topology, config);

        // Clean local state prior to starting the processing topology
        // In production we need to clean up only on certain conditions
        kafkaStreams.cleanUp();

        // Start the KafkaStreams instance
        kafkaStreams.start();

        // Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
        Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
    }
}
Kafka Streams Topology output
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [scores-input])
      --> KSTREAM-AGGREGATE-0000000002
    Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])
      --> KTABLE-TOSTREAM-0000000003
      <-- KSTREAM-SOURCE-0000000000
    Processor: KTABLE-TOSTREAM-0000000003 (stores: [])
      --> KSTREAM-PEEK-0000000004
      <-- KSTREAM-AGGREGATE-0000000002
    Processor: KSTREAM-PEEK-0000000004 (stores: [])
      --> KSTREAM-SINK-0000000005
      <-- KTABLE-TOSTREAM-0000000003
    Sink: KSTREAM-SINK-0000000005 (topic: scores-output)
      <-- KSTREAM-PEEK-0000000004
Kafka Console Producer input
>john:5
>tony:3
>john:2
>ram:4
>tony:1
Kafka Console Consumer output
john    5
tony    3
john    7
ram     4
tony    4

Get the full source code of the app on GitHub.
Exactly-once Semantics
The exactly-once guarantees provided by Kafka’s Streams API are the strongest guarantees offered by any stream processing system so far. It offers end-to-end exactly-once guarantees for a stream processing application that extends from the data read from Kafka, any state materialized to Kafka by the Streams app, to the final output written back to Kafka.
Enabling exactly-once processing in Kafka Streams requires only one additional property in the streams configuration; setting it provides the exactly-once processing guarantee.
// Configuration for a Kafka Streams instance
Properties config = new Properties();
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
KStreams, KTables and GlobalKTable
Kafka Streams DSL: A high-level API that provides the most common data transformation operations such as map, filter, join, and aggregations out of the box. The DSL is the recommended starting point for developers new to Kafka Streams, and should cover many use cases and stream processing needs.
Processor API: A low-level API that lets you add and connect processors as well as interact directly with state stores. The Processor API provides you with even more flexibility than the DSL but at the expense of requiring more manual work on the side of the application developer (e.g., more lines of code).
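The article itself does not include a Processor API example, so here is a minimal sketch (an illustration, not code from the article) that wires a source node, a custom processor node and a sink node directly on a Topology. It reuses the "scores-input" and "scores-output" topics and assumes the default String serdes configured earlier.

import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class ProcessorApiExample {
    public static Topology buildTopology() {
        Topology topology = new Topology();

        // Source node reading from the "scores-input" topic (uses the default serdes)
        topology.addSource("scores-source", "scores-input");

        // Custom processor node that upper-cases the value of every record
        topology.addProcessor("uppercase-processor",
                () -> new Processor<String, String, String, String>() {
                    private ProcessorContext<String, String> context;

                    @Override
                    public void init(ProcessorContext<String, String> context) {
                        this.context = context;
                    }

                    @Override
                    public void process(Record<String, String> record) {
                        // Forward the record downstream with an upper-cased value
                        context.forward(record.withValue(record.value().toUpperCase()));
                    }
                },
                "scores-source");

        // Sink node writing the transformed records to "scores-output"
        topology.addSink("scores-sink", "scores-output", "uppercase-processor");
        return topology;
    }
}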
Kafka Streams DSL supports built-in abstractions for streams and tables in the form of KStream, KTable, and GlobalKTable.
KStream is an abstraction of a record stream. Using the table analogy, data records in a record stream are always interpreted as an "INSERT". This is similar to a log: an infinite, unbounded data stream.
KTable is an abstraction of a changelog stream, where each data record represents an update. The value in a data record is interpreted as an “UPDATE” of the last value for the same record key. Using the table analogy, a data record in a changelog stream is interpreted as an UPSERT aka INSERT/UPDATE because any existing row with the same key is overwritten. Also, null values are interpreted in a special way: a record with a null value represents a “DELETE” or tombstone for the record’s key.
Like a KTable, a GlobalKTable is an abstraction of a changelog stream, where each data record represents an update. The main difference between KTable and GlobalKTable is how the data is distributed across application instances. Let us assume a topic with 5 partitions (a sketch follows the list below):
- If you read the input topic into a KTable, then the “local” KTable instance of each application instance will be populated with data from only 1 partition of the topic’s 5 partitions.
- If you read the input topic into a GlobalKTable, then the local GlobalKTable instance of each application instance will be populated with data from all partitions of the topic.
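As an illustration (not code from the article), here is a minimal sketch of reading topics into a KTable and a GlobalKTable. The topic "player-profiles" is a hypothetical name used only for this example; the serdes are explicit to keep the snippet self-contained.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KTable;

public class TableExamples {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // KTable: each application instance is populated only with the partitions assigned to it
        KTable<String, String> scoresTable =
                builder.table("scores-input", Consumed.with(Serdes.String(), Serdes.String()));

        // GlobalKTable: each application instance is populated with all partitions of the topic
        // ("player-profiles" is a hypothetical topic used only for this illustration)
        GlobalKTable<String, String> profilesGlobalTable =
                builder.globalTable("player-profiles", Consumed.with(Serdes.String(), Serdes.String()));

        // Print the resulting topology
        System.out.println(builder.build().describe());
    }
}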
Stream Operations
Map
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> scoresInput = builder.stream("scores-input");
KStream<String, String> mappedInput = scoresInput.map((key, value) -> KeyValue.pair(key.toLowerCase(), value.toLowerCase()));
mappedInput.to("scores-output", Produced.with(Serdes.String(), Serdes.String()));
MapValues
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> scoresInput = builder.stream("scores-input");
KStream<String, Integer> mappedInput = scoresInput.mapValues(value -> Integer.parseInt(value) * 100);
mappedInput.to("scores-output", Produced.with(Serdes.String(), Serdes.Integer()));
Filter / FilterNot
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> scoresInput = builder.stream("scores-input");
KStream<String, String> filteredInput = scoresInput.filter((key, value) -> key.matches("^[a-zA-Z].*"));
filteredInput.to("scores-output", Produced.with(Serdes.String(), Serdes.String()));
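The heading also mentions filterNot, which is the inverse of filter and keeps only the records that do not match the predicate. A minimal sketch mirroring the filter example above (assumes the same imports and topics):

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> scoresInput = builder.stream("scores-input");
// Keep only the records whose key does NOT start with a letter
KStream<String, String> nonLetterKeys = scoresInput.filterNot((key, value) -> key.matches("^[a-zA-Z].*"));
nonLetterKeys.to("scores-output", Produced.with(Serdes.String(), Serdes.String()));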
FlatMap
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> scoresInput = builder.stream("scores-input");
KStream<String, String> mappedInput = scoresInput.flatMap((key, value) -> {
    List<KeyValue<String, String>> pairs = new ArrayList<>();
    String[] keys = key.split(",");
    for (String k : keys)
        pairs.add(KeyValue.pair(k, value));
    return pairs;
});
FlatMapValues
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> scoresInput = builder.stream("scores-input");
KStream<String, String> mappedInput = scoresInput.flatMapValues(value -> Arrays.asList(value.split(",")));
Branch
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> scoresInput = builder.stream("scores-input");
BranchedKStream<String, String> branches = scoresInput.split(Named.as("split-"))
        .branch((key, value) -> Integer.parseInt(value) > 100, Branched.as("hundred-plus"))
        .branch((key, value) -> Integer.parseInt(value) > 50, Branched.as("fifty-plus"))
        .branch((key, value) -> Integer.parseInt(value) > 10, Branched.as("ten-plus"));
branches.defaultBranch().get("split-hundred-plus").to("scores-output", Produced.with(Serdes.String(), Serdes.String()));
SelectKey
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> scoresInput = builder.stream("scores-input");
KStream<String, String> mappedInput = scoresInput.selectKey((key, value) -> "MYTAG_" + key);
mappedInput.to("scores-output", Produced.with(Serdes.String(), Serdes.String()));

Refer to the Game Scoring App example above for examples of stateful operations like groupBy and aggregate.