StackStalk

Friday, March 10, 2023

Going Serverless with KNative

March 10, 2023 | Kubernetes

Serverless computing has rapidly gained popularity in recent years as it provides a platform for developers to deploy their applications without worrying about infrastructure management. KNative is an open-source platform that simplifies the deployment and management of serverless workloads on Kubernetes, allowing developers to focus on building and scaling their applications. In this article, we will take a closer look at KNative and its benefits.
  • What is KNative?
  • The Architecture of KNative
  • Install KNative on Docker Desktop
  • KNative serving example in Java with Quarkus
  • Conclusion

What is KNative?

KNative is a set of components that extend Kubernetes to provide a platform for building, deploying, and managing serverless applications. It provides a range of features, such as auto-scaling, event-driven processing, and routing, which simplify the deployment and management of serverless workloads. KNative is built on top of Kubernetes, making it easy to integrate with existing Kubernetes clusters.

Benefits of KNative

  1. Simplified Deployment: KNative simplifies the deployment of serverless applications by abstracting away the underlying infrastructure. Developers can focus on writing code and let KNative handle the deployment and management of their application.
  2. Autoscaling: KNative provides auto-scaling capabilities that scale applications up or down based on demand, including scaling to zero when there is no traffic. This keeps applications responsive under load without paying for idle capacity.
  3. Event-Driven Processing: KNative supports event-driven processing, allowing developers to create functions that respond to events triggered by external systems. This makes it easy to create serverless applications that react to real-time events.
  4. Language-Agnostic: KNative supports a range of programming languages, making it easy for developers to write code in their language of choice.
  5. Open-Source: KNative is an open-source platform, which means that developers can contribute to its development and customize it to meet their specific requirements.

Use Cases of KNative

  1. Serverless Applications: KNative is ideal for building serverless applications that can be scaled up or down based on demand.
  2. Event-Driven Applications: KNative's event-driven processing capabilities make it ideal for building real-time applications that respond to events triggered by external systems.
  3. Microservices: KNative can be used to build and deploy microservices, allowing developers to break down their applications into smaller, more manageable components.

The Architecture of KNative

The architecture of KNative can be divided into three main components: Serving, Eventing, and Build (the standalone Build component has since been superseded by Tekton Pipelines). Each component provides a set of functionalities that simplify the deployment and management of serverless workloads.

Serving

The Serving component of KNative provides a platform for deploying and managing serverless applications. It allows developers to deploy containerized applications and functions, automatically scaling them based on demand. Serving is built on top of Kubernetes and uses a networking layer such as Istio or Kourier for traffic management and security. Serving consists of the following components:
  • Knative Serving: The core component of KNative Serving, responsible for deploying and managing serverless applications.
  • Knative Istio Controller: A component that manages the Istio resources required for routing and traffic management.
  • Activator: A component that activates containers on demand and routes traffic to them.
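
To make this concrete, a serverless workload is declared to Knative Serving as a single "Service" resource. Below is a minimal sketch of such a manifest; the image is the public Knative hello-world sample and is only a placeholder for your own container image.
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: hello
spec:
  template:
    spec:
      containers:
        - image: gcr.io/knative-samples/helloworld-go   # placeholder image, replace with your own
          env:
            - name: TARGET
              value: "World"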

Eventing

The Eventing component of KNative provides a platform for building event-driven applications. It allows developers to create functions that respond to events triggered by external systems, such as message queues or databases. Eventing is built on top of Kubernetes and can use Apache Kafka, among other channel implementations, for event streaming. Eventing consists of the following components:
  • Knative Eventing: The core component of KNative Eventing, responsible for managing event sources and subscriptions.
  • Apache Kafka: A distributed event streaming platform used by KNative Eventing for event processing.
  • Knative Eventing Sources: A set of components that connect to external event sources and generate events.
  • Knative Eventing Channels: A set of components that provide a platform for routing events between event sources and event sinks.
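
As an illustration of how sources and sinks fit together, here is a sketch of a PingSource that emits a small JSON payload on a schedule and delivers it to a Knative Service acting as the event sink; the event-display service name is only a placeholder.
apiVersion: sources.knative.dev/v1
kind: PingSource
metadata:
  name: ping-every-minute
spec:
  schedule: "*/1 * * * *"
  contentType: "application/json"
  data: '{"message": "Hello Eventing"}'
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: event-display   # placeholder sink service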

Build

The Build component of KNative provides a platform for building container images from source code. It allows developers to build and package container images using their preferred build tools and then deploy them to Kubernetes. Build is built on top of Kubernetes and uses Tekton for building container images. Build consists of the following components:
  • Knative Build: The core component of KNative Build, responsible for managing build templates and pipelines.
  • Tekton: A Kubernetes-native framework for building and deploying container images.
  • Kaniko: A tool used by KNative Build for building container images from source code.

Install KNative on Docker Desktop

Installing KNative on Docker Desktop with Kourier is a straightforward process, and this guide walks you through the steps.

Requirements

Before you begin, ensure that you have the following:
  • Docker Desktop installed on your machine
  • Kubernetes enabled in Docker Desktop
  • kubectl command-line tool installed
  • kn command-line tool installed
  • helm package manager installed
Step 1: Install KNative Serving
The first step is to install KNative Serving using the helm package manager. Run the following command to add the KNative Serving chart repository:
helm repo add knative https://knative.dev/helm-charts 
Then, run the following command to install KNative Serving:
kubectl create namespace knative-serving 
helm install knative-serving knative/serving --namespace knative-serving 
Verify that KNative Serving is running by running the following command:
kubectl get pods --namespace knative-serving 
Step 2: Install Kourier
Kourier is a lightweight ingress controller for KNative. It provides a simple way to route traffic to KNative services. Run the following command to add the Kourier chart repository:
helm repo add kourier https://storage.googleapis.com/kourier-release 
Then, run the following command to install Kourier:
kubectl create namespace kourier-system 
helm install kourier kourier/kourier --namespace kourier-system --set service.type=NodePort --set service.nodePorts.http=31080 --set service.nodePorts.https=31443 
Verify that Kourier is running by running the following command:
kubectl get pods --namespace kourier-system 
Step 3: Verify the Installation
To verify that KNative and Kourier are running correctly, create a sample KNative service and expose it using Kourier. Create a sample KNative service by running the following command:
kubectl apply -f https://knative.dev/docs/serving/samples/hello-world/helloworld-go.yaml 
Expose the service using Kourier by running the following command:
kubectl apply -f https://raw.githubusercontent.com/knative/net-kourier/main/config/ingress/contour/01-crds.yaml 
kubectl apply -f https://raw.githubusercontent.com/knative/net-kourier/main/config/ingress/contour/02-default-backend.yaml 
kubectl apply -f https://raw.githubusercontent.com/knative/net-kourier/main/config/ingress/contour/03-kourier.yaml 
kubectl apply -f https://raw.githubusercontent.com/knative/net-kourier/main/config/ingress/contour/04-namespace.yaml 
kubectl apply -f https://raw.githubusercontent.com/knative/net-kourier/main/config/ingress/contour/05-example.yaml 
To access the sample service, get the IP address of the Docker Desktop Kubernetes cluster by running the following command:
kubectl cluster-info | grep 'Kubernetes control plane' | awk '/http/ {print $NF}' | sed 's/.*\/\/\([^:]*\):.*/\1/' 
Then, open a web browser and navigate to the following URL, replacing IP_ADDRESS with the IP address of the Kubernetes cluster:
http://IP_ADDRESS/hello-world 
If everything is set up correctly, you should see a message that says "Hello World!".

KNative serving example in Java with Quarkus

Create a new Quarkus project using the following command:
mvn io.quarkus.platform:quarkus-maven-plugin:2.16.4.Final:create \
    -DprojectGroupId=com.example \
    -DprojectArtifactId=knative-example \
    -DclassName="com.example.MyKnativeService" \
    -Dextensions="resteasy-jsonb,kubernetes,container-image-docker"

This will create a new Quarkus project with the necessary extensions for Knative serving and JSON serialization using RESTEasy and JSON-B.

Next, add the following properties to the application.properties file.

quarkus.container-image.build=true
quarkus.container-image.group=dev.local
quarkus.container-image.push=false
quarkus.container-image.builder=docker
quarkus.knative.image-pull-policy=never
quarkus.kubernetes.deployment-target=knative
quarkus.kubernetes-client.namespace=mynamespace
The project scaffold also generates the Java resource class shown below; customize it as needed.
package com.example;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

@Path("/hello")
public class MyKnativeService {

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    public String hello() {
        return "Hello, Knative";
    }
}

This class defines a simple REST endpoint that returns the message "Hello, Knative".
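
Before deploying, you can sanity-check the endpoint locally with Quarkus dev mode; this is the standard Quarkus workflow and uses the default local port 8080.
./mvnw quarkus:dev
curl http://localhost:8080/hello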

The KNative deployment specification is generated in target/kubernetes/knative.yml. Finally, deploy your Quarkus application to Knative serving using the following command:

./mvnw clean package -Dquarkus.kubernetes.deploy=true 
This will build and package your Quarkus application and deploy it to Knative serving. Check that the service is installed:
kn service list -n mynamespace
NAME              URL                                                   LATEST                  AGE     CONDITIONS   READY   REASON
knative-example   http://knative-example.mynamespace.127.0.0.1.nip.io   knative-example-00001   6m15s   3 OK / 3     True
You can access your Knative service using the URL provided by Knative serving. To access it from the host machine, we need to set up port forwarding to the Kourier gateway.
kubectl port-forward --namespace kourier-system $(kubectl get pod -n kourier-system -l "app=3scale-kourier-gateway" --field-selector=status.phase=Running --output=jsonpath="{.items[0].metadata.name}") 8080:8080
A sample curl command to access the service from the host machine:
curl --location --request GET 'http://localhost:8080/hello' \
--header 'Host: knative-example.mynamespace.127.0.0.1.nip.io'
That's it! You now have a Quarkus application running on Knative serving. You can modify the MyKnativeService class to add additional REST endpoints and functionality as needed. We will explore KNative eventing example in the next article.

Conclusion

KNative is an open-source platform that simplifies the deployment and management of serverless applications on Kubernetes. With its auto-scaling, event-driven processing, and language-agnostic capabilities, KNative has become a popular choice for building and scaling cloud-native applications. Its flexible architecture and open-source nature make it a powerful tool for building custom solutions that meet specific business requirements. If you're looking for a platform to build and deploy serverless applications, KNative is definitely worth exploring.

Sunday, February 5, 2023

Python FastAPI file upload and download

February 05, 2023 | Python

In this article, we will look at an example of how to implement a file upload and download API in a Python FastAPI microservice. The example below provides a simple microservice built with FastAPI that exposes the "/upload" and "/download" API paths to handle files.

To run this example, we need to install these modules.

pip install fastapi
pip install uvicorn
pip install python-multipart
Here uvicorn is an implementation of the ASGI (Asynchronous Server Gateway Interface) specification and provides a standard interface between async-capable Python web servers, frameworks and applications.

The application code is simple. We expose two APIs.

  • In the upload API, we create a writable file with a .zip extension. The uploaded file contents are read in chunks and appended to the file.
  • In the download API, we access the file based on the provided name and return it as a FileResponse object.
from fastapi import FastAPI, Request
from fastapi import File, UploadFile
from fastapi.responses import FileResponse

import uvicorn
import os

description = """
Sample app to demonstrate file upload with FastAPI
"""
app = FastAPI(title="FileUploadApp",
              description=description,
              version="0.1")


@app.get("/download")
async def download(name: str):
    file_path = "/app/filecache/" + name + ".zip"
    if os.path.exists(file_path):
       return FileResponse(path=file_path, filename=file_path, media_type='application/zip')

    return {"message": "File not found"}


@app.post("/upload")
async def upload(name: str, file: UploadFile = File(...)):
    try:
        filename = "/app/filecache/" + name + ".zip"
        with open(filename, 'wb') as f:
            while contents := file.file.read(1024 * 1024):
                f.write(contents)
    except Exception as e:
        print(e)
        return {"message": "Error uploading the file"}
    finally:
        file.file.close()

    return {"message": f"Successfully uploaded {file.filename}"}

if __name__ == '__main__':
    uvicorn.run('app:app',
                host='0.0.0.0',
                port=8091,
                reload=True)
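
Besides the Swagger UI, you can exercise the service with a small client script. The sketch below assumes the requests library is installed (pip install requests), the service is reachable on localhost:8091 as configured above, and a sample.zip file exists in the current directory.

import requests

BASE_URL = "http://localhost:8091"

# Upload a local zip file; "name" is the key the service stores it under
with open("sample.zip", "rb") as f:
    resp = requests.post(f"{BASE_URL}/upload",
                         params={"name": "sample"},
                         files={"file": ("sample.zip", f, "application/zip")})
print(resp.json())

# Download the same file back and save it locally
resp = requests.get(f"{BASE_URL}/download", params={"name": "sample"})
with open("downloaded.zip", "wb") as f:
    f.write(resp.content)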

To run the application as a container we create a simple Dockerfile. Here we install the python modules and bring up the application on port 8091.

FROM python:3.9-slim-buster

WORKDIR /app

COPY requirements.txt /app/requirements.txt
RUN pip3 install -r /app/requirements.txt

COPY app.py /app
RUN mkdir -p /app/resources
RUN mkdir -p /app/filecache

CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8091"]

Now build the docker image using this command.

docker build -t fileupload:1 -f Dockerfile .

We use a simple docker compose file to start the file upload service. Also note that we are mounting a host folder (/tmp/filecache) to the container so that the files are persisted across restarts of the container.

version: '2.1'

services:
  fileupload:
    image: fileupload:1
    ports:
      - "8091:8091"
    volumes:
      - /tmp/filecache:/app/filecache

Now start the service using the docker compose command.

docker-compose -f docker-compose.yml up

The best way to validate the service is to use its Swagger UI at http://localhost:8091/docs/.

Download the full source of this example here.


Friday, September 16, 2022

Accessing the Kubernetes API

September 16, 2022 | Kubernetes

In this article, we will explore the steps required to access the Kubernetes API and overcome common challenges. All operations, communications between components, and external user commands are REST API calls handled by the Kubernetes API server. Consequently, everything in the Kubernetes platform is treated as an API object.

Reference: Kubernetes API



We will use Docker Desktop to demonstrate these steps.

Get Kubernetes API server address for the cluster

The first step is to get the API server endpoint. Use "kubectl config view" and note down the server endpoint for the specific cluster. Here "https://kubernetes.docker.internal:6443" is the endpoint of the Kubernetes API server.
$ kubectl config view
apiVersion: v1
clusters:
- cluster:
    certificate-authority-data: DATA+OMITTED
    server: https://kubernetes.docker.internal:6443
  name: docker-desktop
contexts:
- context:
    cluster: docker-desktop
    user: docker-desktop
  name: docker-desktop
current-context: docker-desktop
kind: Config
preferences: {}
users:
- name: docker-desktop
  user:
    client-certificate-data: REDACTED
    client-key-data: REDACTED

Get the Kubernetes version error

Let us read the Kubernetes version via the API server. Without supplying the cluster's CA certificate, curl fails with an SSL certificate error.
$ curl https://kubernetes.docker.internal:6443/version
curl: (60) SSL certificate problem: unable to get local issuer certificate
More details here: https://curl.se/docs/sslcerts.html

curl failed to verify the legitimacy of the server and therefore could not
establish a secure connection to it. To learn more about this situation and
how to fix it, please visit the web page mentioned above.

Get the Kubernetes version using SSL certificate

To overcome the SSL certificate problem, pass the cluster CA certificate with --cacert. With Docker Desktop, the ca.crt is typically available at ~/Library/Containers/com.docker.docker/pki/ca.crt.
$ curl https://kubernetes.docker.internal:6443/version --cacert ~/Library/Containers/com.docker.docker/pki/ca.crt
{
  "major": "1",
  "minor": "25",
  "gitVersion": "v1.25.0",
  "gitCommit": "a866cbe2e5bbaa01cfd5e969aa3e033f3282a8a2",
  "gitTreeState": "clean",
  "buildDate": "2022-08-23T17:38:15Z",
  "goVersion": "go1.19",
  "compiler": "gc",
  "platform": "linux/arm64"
}

Read the POD list error

Now let us try to read the list of pods in default namespace. We will get an authorization error since we don't have permissions yet.
$ curl https://kubernetes.docker.internal:6443/api/v1/namespaces/default/pods --cacert ~/Library/Containers/com.docker.docker/pki/ca.crt
{
  "kind": "Status",
  "apiVersion": "v1",
  "metadata": {},
  "status": "Failure",
  "message": "pods is forbidden: User \"system:anonymous\" cannot list resource \"pods\" in API group \"\" in the namespace \"default\"",
  "reason": "Forbidden",
  "details": {
    "kind": "pods"
  },
  "code": 403
}

Read the POD list using service account token

To overcome the permissions issue, create a secret to hold a token for the default service account.
$ kubectl apply -f - <<EOF
apiVersion: v1
kind: Secret
metadata:
  name: default-token
  annotations:
    kubernetes.io/service-account.name: default
type: kubernetes.io/service-account-token
EOF
Provide the required RBAC authorization. In this example, we grant the cluster-admin role to the default service account, which is fine for a local demo but far too permissive for production. More details on RBAC here.
$ kubectl apply -f - <<EOF
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: default-rbac
subjects:
  - kind: ServiceAccount
    name: default
    namespace: default
roleRef:
  kind: ClusterRole
  name: cluster-admin
  apiGroup: rbac.authorization.k8s.io
EOF
Now read the default token secret.
$ kubectl describe secret default-token
Name:         default-token
Namespace:    default
Labels:       <none>
Annotations:  kubernetes.io/service-account.name: default
              kubernetes.io/service-account.uid: 8405ff0b-bc0f-425d-8980-7ae289563880

Type:  kubernetes.io/service-account-token

Data
====
ca.crt:     1099 bytes
namespace:  7 bytes
token:      [USE-THIS-TOKEN]
Use the token in the curl command header as the bearer token for authorization.
$ curl https://kubernetes.docker.internal:6443/api/v1/namespaces/default/pods --cacert ~/Library/Containers/com.docker.docker/pki/ca.crt -H "Authorization: Bearer [USE-THIS-TOKEN]"
We are now able to read the POD list without any errors.
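
Rather than copying the token by hand from the describe output, you can extract and decode it directly with kubectl; a small convenience sketch using the same secret and certificate paths as above:
TOKEN=$(kubectl get secret default-token -o jsonpath='{.data.token}' | base64 --decode)
curl https://kubernetes.docker.internal:6443/api/v1/namespaces/default/pods \
  --cacert ~/Library/Containers/com.docker.docker/pki/ca.crt \
  -H "Authorization: Bearer ${TOKEN}"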

Friday, September 2, 2022

Avro Producer and Consumer with Python using Confluent Kafka

September 02, 2022 | Python

In this article, we will look at Avro, a popular data serialization format for streaming data applications, and develop a simple Avro producer and consumer in Python using Confluent Kafka. Also refer to this article for a basic introduction: Getting started with Kafka in Python.

Avro Introduction

For streaming data use cases, Avro is a popular data serialization format. Avro is an open source data serialization system that helps with data exchange between systems, programming languages, and processing frameworks. Avro helps define a binary format for your data, as well as map it to the programming language of choice.

Avro relies on schemas: the schema is present both when data is written and when it is read. This permits each datum to be written with no per-value overhead, making serialization both fast and compact. It also facilitates use with dynamic, scripting languages, since data, together with its schema, is fully self-describing.

Avro has a JSON-like data model, but it can be represented either as JSON or in a compact binary form. It comes with a sophisticated schema description language for describing data.

Let us look at an example of representing the movie data below as an Avro schema.

{
  "imdbID": "tt0061722",
  "Title": "The Graduate",
  "Year": 1967,
  "Director": "Mike Nichols",
  "Actors": [
    "Anne Bancroft",
    "Dustin Hoffman",
    "Katharine Ross",
    "William Daniels"
  ],
  "Language": "English",
  "imdbRating": 8.1
}

Movie Avro Schema

Avro schema definitions are themselves JSON documents. A record schema defines multiple fields, organized in a JSON array. Each field identifies the field's name as well as its type; the type can be something simple, like an integer, or something complex, like another record.

In the movie schema we have defined the MovieDetails record type with associated fields to represent the movie data.

{
    "type": "record",
    "namespace": "com.stackstalk",
    "name": "MovieDetails",
    "fields": [
        {
            "name": "imdbID",
            "type": "string"
        },
        {
            "name": "Title",
            "type": "string"
        },
        {
            "name": "Year",
            "type": "int"
        },
        {
            "name": "Language",
            "type": "string"
        },
        {
            "name": "Director",
            "type": "string"
        },
        {
            "name": "Actors",
            "type": {
              "type": "array",
              "items": {
                 "type": "string"
              }
            }
        },
        {
            "name": "imdbRating",
            "type": "float"
        }
    ]
}

Confluent Platform

Reference: Confluent Platform

Confluent Platform is an enterprise grade distribution of Apache Kafka. It is a full-scale data streaming platform that enables you to easily access, store, and manage data as continuous, real-time streams. Each release of Confluent Platform includes the latest release of Kafka and additional tools and services that make it easier to build and manage an Event Streaming Platform.

For this example, we will run the Confluent services ZooKeeper, broker, Schema Registry and Control Center as Docker containers. Refer to the Confluent docker compose specs.

Confluent Schema Registry enables safe, zero downtime evolution of schemas by centralizing the schema management. It provides a RESTful interface for storing and retrieving Avro, JSON Schema, and Protobuf schemas. Schema Registry tracks all versions of schemas used for every topic in Kafka and only allows evolution of schemas according to user-defined compatibility settings. This gives developers confidence that they can safely modify schemas as necessary without worrying that doing so will break a different service they may not even be aware of. Support for schemas is a foundational component of a data governance solution.
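
Because Schema Registry exposes a RESTful interface, you can inspect registered schemas directly over HTTP. For example, with the registry from this article listening on localhost:8081 and the default subject naming of topic name plus -key/-value, the following requests are a quick sketch of how to list subjects and fetch the latest value schema:
curl http://localhost:8081/subjects
curl http://localhost:8081/subjects/movie-topic-value/versions/latest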

Confluent Control Center is a GUI-based system for managing and monitoring Kafka. It allows you to easily manage Kafka Connect, to create, edit, and manage connections to other systems. It also allows you to monitor data streams from producer to consumer, assuring that every message is delivered, and measuring how long it takes to deliver messages.

Python producer and consumer example using Confluent Kafka

Now let us write a simple producer and consumer to demonstrate the concepts discussed above. It is pretty simple with confluent kafka.

Let us focus on the producer first. This is an example of a synchronous producer. Here we connect to the Kafka server on port 9092 and use the produce call to write a message to a Kafka topic. We load the key and value Avro schema files and associate them when creating the Avro producer. We use the flush call to make the writes synchronous and to ensure the messages are delivered before the producer shuts down.

#!/usr/bin/env python3

import json
from confluent_kafka import Producer, KafkaException
from confluent_kafka.avro import AvroProducer
from confluent_kafka import avro

def acked(err, msg):
    if err is not None:
        print("Failed to deliver message: %s: %s" % msg.value().decode('utf-8'), str(err))
    else:
        print("Message produced: %s" % msg.value().decode('utf-8'))

def load_avro_schema_from_file():
    key_schema = avro.load("avro/movie-topic-key.avsc")
    value_schema = avro.load("avro/movie-topic-value.avsc")

    return key_schema, value_schema

def send_data():
    producer_config = {
        "bootstrap.servers": "localhost:9092",
        "schema.registry.url": "http://localhost:8081"
    }

    key_schema, value_schema = load_avro_schema_from_file()

    #print(key_schema)
    #print(value_schema)

    try:
        producer = AvroProducer(producer_config, default_key_schema=key_schema, default_value_schema=value_schema)

        f1 = open("data/movie-metadata.json", "r")
        key_str = f1.read();
        f1.close()

        f2 = open("data/movie-details.json", "r")
        value_str = f2.read()
        f2.close()

        producer.produce(topic = "movie-topic", key = json.loads(key_str), headers = [("my-header1", "Value1")], value = json.loads(value_str))
        producer.flush()

    except KafkaException as e:
        print('Kafka failure ' + str(e))

def main():
    send_data()

if __name__ == "__main__":
    main()

Now let us write a simple consumer. We specify the Kafka server, schema registry and consumer group in the configuration used to create the Avro consumer. We subscribe to the topic of interest before going into a consume loop. In the consume loop we call the poll() method to retrieve messages from the topic; if there are no messages, poll() returns None.

#!/usr/bin/env python3

import json
from confluent_kafka import Producer, KafkaException
from confluent_kafka.avro import AvroConsumer
from confluent_kafka import avro

def read_data():
    consumer_config = {
        "bootstrap.servers": "localhost:9092",
        "schema.registry.url": "http://localhost:8081",
        "group.id": "my-connsumer1",
        "auto.offset.reset": "earliest"
    }

    #print(key_schema)
    #print(value_schema)

    consumer = AvroConsumer(consumer_config)
    consumer.subscribe(['movie-topic'])

    while True:
      try:
        msg = consumer.poll(1)

        if msg is None:
          continue

        print("Key is :" + json.dumps(msg.key()))
        print("Value is :" + json.dumps(msg.value()))
        print("-------------------------")

      except KafkaException as e:
        print('Kafka failure ' + str(e))

    consumer.close()

def main():
    read_data()

if __name__ == "__main__":
    main()

Run the producer code.

$ python3 producer.py

When you run the producer application, you should see the messages being received by the consumer.

$ python3 consumer.py 
Key is :{"imdbID": "tt0061722", "Title": "The Graduate"}
Value is :{"imdbID": "tt0061722", "Title": "The Graduate", "Year": 1967, "Language": "English", "Director": "Mike Nichols", "Actors": ["Anne Bancroft", "Dustin Hoffman", "Katharine Ross", "William Daniels"], "imdbRating": 8.100000381469727}
-------------------------
Key is :{"imdbID": "tt0061722", "Title": "The Graduate"}
Value is :{"imdbID": "tt0061722", "Title": "The Graduate", "Year": 1967, "Language": "English", "Director": "Mike Nichols", "Actors": ["Anne Bancroft", "Dustin Hoffman", "Katharine Ross", "William Daniels"], "imdbRating": 8.100000381469727}
-------------------------

Open the Confluent Control Center at http://localhost:9021/ to view the schema registered on the movie topic.


Get access to the source code of this example in GitHub.


Monday, March 14, 2022

Monitor Spring Boot App with Micrometer and Prometheus

March 14, 2022 | Java, Spring Boot

Modern distributed applications typically have multiple microservices working together. The ability to monitor and manage aspects like health, metrics and logging is key to running applications in production. In this article, we will explore the concepts around Spring Boot Actuator, Micrometer and Prometheus to monitor Spring Boot applications.

Spring Boot Actuator

Reference: Spring Production-ready Features

Spring Boot includes a number of features to help monitor and manage applications in production via HTTP endpoints or JMX. Auditing, health, and metrics gathering can be automatically applied to applications.

The spring-boot-actuator module provides all of Spring Boot’s production-ready features. The recommended way to enable the features is to add a dependency on the spring-boot-starter-actuator “Starter”.

To add the actuator to a Maven-based project, add the following ‘Starter’ dependency to pom.xml.

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
</dependencies>

Actuator endpoints let you monitor and interact with your application. Spring Boot includes a number of built-in endpoints. For example, the health endpoint provides basic application health information. Most applications choose exposure over HTTP, where the ID of the endpoint and a prefix of /actuator is mapped to a URL. For example, by default, the health endpoint is mapped to /actuator/health.

An endpoint is considered to be available when it is both enabled and exposed. To configure the enablement of an endpoint, use its management.endpoint.<id>.enabled property in application.properties. By default, all endpoints except for shutdown are enabled.

management.endpoint.info.enabled=true

Since Endpoints may contain sensitive information, you should carefully consider when to expose them. For example, to stop exposing all endpoints over HTTP and only expose the health, metrics and loggers endpoints, use the following property in application.properties:

management.endpoints.web.exposure.include=health,metrics,loggers
Let us look at a few examples to understand the Spring Boot Actuator.

Configure log level of Spring Boot app at runtime

To change the log level of a Spring Boot application during run time, we can use the endpoint "/actuator/loggers". This is useful when you want to collect more detailed logs to debug production issues for a specific period of time.

To change the log level of root logger:

curl --location --request POST 'http://localhost:8080/actuator/loggers/ROOT' \
--header 'Content-Type: application/json' \
--data-raw '{
    "configuredLevel": "DEBUG"
}'

To view the list of all loggers and current configuration in use:

curl -X GET http://localhost:8080/actuator/loggers

Query the Spring Boot app metrics

To query the metrics of application use the endpoint "/actuator/metrics". For example, to get the process cpu usage:
curl -X GET http://localhost:8080/actuator/metrics/process.cpu.usage
Returns the process.cpu.usage metric.
{
    "name": "process.cpu.usage",
    "description": "The \"recent cpu usage\" for the Java Virtual Machine process",
    "baseUnit": null,
    "measurements": [
        {
            "statistic": "VALUE",
            "value": 0.0
        }
    ],
    "availableTags": []
}
To see the list of all available metrics, use:
curl -X GET http://localhost:8080/actuator/metrics

Query the Spring Boot app health information

To query the health status of application use the endpoint "/actuator/health".
curl -X GET http://localhost:8080/actuator/health
Returns the status of the application.
{"status":"UP"}

Introduction to Micrometer

Reference: Micrometer

Micrometer is a metrics instrumentation library powering the delivery of application metrics from Spring Boot applications. Micrometer provides a simple facade over the instrumentation clients for the most popular monitoring systems, allowing you to instrument your JVM-based application code without vendor lock-in.

As an instrumentation facade, Micrometer allows you to instrument your code with dimensional metrics with a vendor-neutral interface and decide on the monitoring system as a last step. Instrumenting your core library code with Micrometer allows the libraries to be included in applications that ship metrics to different backends.

Micrometer contains built-in support for AppOptics, Azure Monitor, Netflix Atlas, CloudWatch, Datadog, Dynatrace, Elastic, Ganglia, Graphite, Humio, Influx/Telegraf, JMX, KairosDB, New Relic, Prometheus, SignalFx, Google Stackdriver, StatsD, and Wavefront.

Micrometer Concepts

Let us review key concepts around Micrometer.
  • Meter Meter is the interface for collecting a set of measurements (which we individually call metrics) about the application
  • MeterRegistry Meters in Micrometer are created from and held in a MeterRegistry. Each supported monitoring system has an implementation of MeterRegistry.
  • SimpleMeterRegistry Micrometer includes a SimpleMeterRegistry that holds the latest value of each meter in memory and does not export the data anywhere. A SimpleMeterRegistry is autowired in a Spring Boot application.
    MeterRegistry registry = new SimpleMeterRegistry();
    registry.counter("books");
    
  • CompositeMeterRegistry Micrometer provides a CompositeMeterRegistry to which you can add multiple registries, letting you publish metrics to more than one monitoring system simultaneously.
    CompositeMeterRegistry composite = new CompositeMeterRegistry();
    SimpleMeterRegistry simple = new SimpleMeterRegistry();
    composite.add(simple); 
    
  • Meter primitives Micrometer supports a set of Meter primitives, including Timer, Counter, Gauge, DistributionSummary, LongTaskTimer, FunctionCounter, FunctionTimer, and TimeGauge. Different meter types result in a different number of time series metrics.
  • Tags A meter is uniquely identified by its name and dimensions/ tags. Dimensions/ Tags let a particular named metric be sliced to drill down and reason about the data. In the example below, if we select database.calls, we can see the total number of calls to all databases. Then we can group by or select by db to drill down further or perform comparative analysis on the contribution of calls to each database.
    registry.counter("database.calls", "db", "users")
    registry.counter("http.requests", "uri", "/api/users")
    
  • @Timed annotation The micrometer-core module contains a @Timed annotation that frameworks can use to add timing support to either specific types of methods such as those serving web request endpoints or, more generally, to all methods. In the example below, we have added @Timed annotation to the API method which exposes timing metrics on the endpoint.
    @PostMapping("/movies")
    @Timed("movies.api")
    public String orderMovie() {
       return bookService.orderMovie();
    }
    
  • Supported Metrics and Meters Spring Boot provides automatic meter registration for a wide variety of technologies. In most situations, the defaults provide sensible metrics that can be published to any of the supported monitoring systems. Few examples:
    • JVM Metrics Auto-configuration enables JVM Metrics by using core Micrometer classes. JVM metrics are published under the jvm. meter name. E.g. jvm.threads.live, jvm.memory.max
    • System metrics Auto-configuration enables system metrics by using core Micrometer classes. System metrics are published under the system., process., and disk. meter names. E.g system.cpu.usage
    • Application Startup Metrics Auto-configuration exposes application startup time metrics. E.g. application.started.time, application.ready.time
    Refer to this link for all supported metrics.

Introduction to Prometheus

Reference: Prometheus
Let us review key concepts around Prometheus.
  • Prometheus is an open-source systems monitoring and alerting toolkit.
  • Prometheus collects and stores its metrics as time series data, i.e. metrics information is stored with the timestamp at which it was recorded, alongside optional key-value pairs called labels.
  • In Prometheus, time series collection happens via a pull model over HTTP.
  • Supports PromQL, a flexible query language
  • Prometheus has an alertmanager to handle alerts
  • Prometheus scrapes metrics from instrumented jobs, either directly or via an intermediary push gateway for short-lived jobs. It stores all scraped samples locally and runs rules over this data to either aggregate and record new time series from existing data or generate alerts. Grafana or other API consumers can be used to visualize the collected data.

Monitor Spring Boot App with Micrometer and Prometheus

Let us work on an example to make the concepts clear. In this example, we will do the following:
  • Implement a Spring Boot app and instrument micrometer and prometheus registry
  • Publish custom metrics from Spring Boot app to count orders placed
  • Use the @Timed annotation to find the API timing metrics
  • Use docker compose to run the demo app and prometheus as containers
  • Query the metrics from Prometheus and validate

Implement the Spring Boot app

Start with a Spring Boot starter application and include the actuator (spring-boot-starter-actuator) and micrometer prometheus registry (micrometer-registry-prometheus) dependencies.
<dependencies>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-web</artifactId>
	</dependency>	
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-actuator</artifactId>
	</dependency>
	<dependency>
		<groupId>io.micrometer</groupId>
		<artifactId>micrometer-registry-prometheus</artifactId>
	</dependency>    
</dependencies>
Update the application.properties file to expose the endpoints. Please note that all of these endpoints are enabled by default.
management.endpoints.web.exposure.include=health,metrics,prometheus,loggers
Next we will define the application and REST controller. We expose two APIs, to order books and movies. Please note the usage of the @Timed annotation, which exposes timing metrics for the APIs.
package com.stackstalk;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

import io.micrometer.core.annotation.Timed;

@SpringBootApplication
@RestController
@Timed
public class DemoMonitoringAppApplication {

	@Autowired
	ItemService itemService;
	
	public static void main(String[] args) {
		SpringApplication.run(DemoMonitoringAppApplication.class, args);
	}

	@PostMapping("/books")
	@Timed("books.api")
	public String orderBook() {
		return itemService.orderBook();
	}
	
	@PostMapping("/movies")
	@Timed("movies.api")
	public String orderMovie() {
		return itemService.orderMovie();
	}
}
By default, Spring Boot applications are autowired with a SimpleMeterRegistry. In this example, we define a configuration to create a CompositeMeterRegistry.
package com.stackstalk;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;

@Configuration
public class DemoMonitorAppConfig {

	@Bean
	public MeterRegistry getMeterRegistry() {
		CompositeMeterRegistry meterRegistry = new CompositeMeterRegistry();
		return meterRegistry;
	}
}
Finally, in the component class, we create two counter metrics to track the number of orders being placed.
package com.stackstalk;

import org.springframework.stereotype.Component;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;

@Component
public class ItemService {

	private static int bookOrderId = 0;
	private static int movieOrderId = 0;
	private Counter bookCounter = null;
	private Counter movieCounter = null;
	
	public ItemService(CompositeMeterRegistry meterRegistry) {		
		bookCounter = meterRegistry.counter("order.books");
		movieCounter = meterRegistry.counter("order.movies");
	}
	
	public String orderBook() {		
		bookOrderId += 1;
		bookCounter.increment();
		return new String("Ordered Book with id = " + bookOrderId);
	}
	
	public String orderMovie() {		
		movieOrderId += 1;
		movieCounter.increment();
		return new String("Ordered Movie with id = " + movieOrderId);
	}
}
Get the full source code in GitHub.

Create Docker container for the Spring Boot app

Create a Docker image for the demo application using a simple Dockerfile that packages and runs the application JAR.
FROM openjdk:17-alpine 
ARG JAR_FILE=target/*.jar
COPY ${JAR_FILE} app.jar
ENTRYPOINT ["java","-jar","/app.jar"]
To create the Docker image use:
docker build -t demoapp:1 -f Dockerfile .

Run the demoapp and Prometheus as Docker containers

We will run demoapp and Prometheus as Docker containers. Here is a sample prometheus.yml file to scrape the demoapp metrics:
global:
  scrape_interval:     15s

scrape_configs:
  - job_name: "demoapp_metrics"
    metrics_path: "/actuator/prometheus"
    static_configs:
      - targets: ["demoapp:8080"]
Create a docker-compose.yml and include the demoapp and prometheus as services. We also mount the prometheus.yml to the prometheus container.
services:
  demoapp:
    image: demoapp:1
    ports:
      - "8080:8080"
  prometheus:
    image: prom/prometheus
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    ports:
      - "9090:9090"
Start the services using the docker compose command:
docker-compose up

Query the metrics from Prometheus

Now let us look at the application metrics we have introduced, as exposed on the application's Prometheus scrape endpoint. We see the counters for books and movies. Since we added the @Timed annotation, we also see timing metrics for the APIs.
curl -X GET http://localhost:8080/actuator/prometheus
# HELP order_books_total  
# TYPE order_books_total counter
order_books_total 1.0
# HELP books_api_seconds_max  
# TYPE books_api_seconds_max gauge
books_api_seconds_max{exception="None",method="POST",outcome="SUCCESS",status="200",uri="/books",} 0.0
# HELP books_api_seconds  
# TYPE books_api_seconds summary
books_api_seconds_count{exception="None",method="POST",outcome="SUCCESS",status="200",uri="/books",} 1.0
books_api_seconds_sum{exception="None",method="POST",outcome="SUCCESS",status="200",uri="/books",} 0.05435327

# HELP order_movies_total  
# TYPE order_movies_total counter
order_movies_total 1.0
# HELP movies_api_seconds  
# TYPE movies_api_seconds summary
movies_api_seconds_count{exception="None",method="POST",outcome="SUCCESS",status="200",uri="/movies",} 1.0
movies_api_seconds_sum{exception="None",method="POST",outcome="SUCCESS",status="200",uri="/movies",} 0.023019645
# HELP movies_api_seconds_max  
# TYPE movies_api_seconds_max gauge
movies_api_seconds_max{exception="None",method="POST",outcome="SUCCESS",status="200",uri="/movies",} 0.023019645
Another approach is to use the Prometheus dashboard at "http://localhost:9090/" to query and visualize the metrics, for example with the queries shown below.
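
In the Prometheus expression browser you could chart the order rate and the average /books latency with PromQL queries like the following, using the metric names from the scrape output above:
rate(order_books_total[5m])
rate(books_api_seconds_sum[5m]) / rate(books_api_seconds_count[5m])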

Conclusion

The ability to monitor and manage aspects like health, metrics and logging is key to running applications in production. As discussed, leveraging Spring Boot Actuator, Micrometer and Prometheus is a simple approach to monitoring Spring Boot applications. Micrometer provides a simple facade over the instrumentation clients for the most popular monitoring systems, allowing you to instrument your JVM-based application code without vendor lock-in. Refer to more Spring Boot articles in this collection.

Wednesday, March 2, 2022

Introduction to Kafka Streams in Java

March 02, 2022 | Java

Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in Kafka clusters. In this article, we will explore the concepts, architecture and examples on Kafka Streams in Java.

Kafka Streams

  • Kafka Streams is the easiest way to write real time applications and microservices.
  • Kafka Streams is a client library and makes it easy to do data processing and transformations within Kafka.
  • Kafka Streams allows you to write standard Java and Scala applications without the need to create separate processing clusters.
  • The Kafka Streams API is a library (JAR) that becomes part of your application. There is no special code to deploy, and it does not run on the Kafka brokers.
  • Kafka Streams is deployment agnostic and can be deployed on containers, VMs, Cloud, On-Prem etc.
  • Kafka Streams API supports per record (no batching) stream processing with millisecond latency.
  • Kafka Streams is fully aligned with Kafka security mechanisms.

Kafka Streams Terminology

Key terminology around Kafka Streams.
  • Stream Stream is an unbounded, continuous realtime flow of records
  • Records Records are key value pairs
  • Topology Topology defines the stream processing computational logic of an application. It defines how input data is transformed to output data.
  • Stream Processor Stream processor is a node in the processor topology that transforms incoming streams, record by record. It may create an output stream.
  • Source Processor Source processor is a special type of stream processor that does not have any upstream processors. It takes data directly from one or more Kafka topics and forwards it to downstream processors. It doesn't transform data.
  • Sink Processor Sink processor is a special type of stream processor and doesn't have any children and sends the data directly to Kafka topics

Kafka Streams Example - Game Scoring App

In this example, we will use Kafka Streams to aggregate, in real time, the gaming scores pushed to a Kafka topic. We have an input topic where the raw scores are published; the streams app processes them and produces aggregated scores on an output topic.

Kafka Streams Setup

Start the Zookeeper.
bin/zookeeper-server-start.sh config/zookeeper.properties
Start the Kafka server.
bin/kafka-server-start.sh config/server.properties
Create the input and output Kafka topics.
bin/kafka-topics.sh --create --replication-factor 1 --partitions 1 --topic scores-input --bootstrap-server localhost:9092
bin/kafka-topics.sh --create --replication-factor 1 --partitions 1 --topic scores-output --bootstrap-server localhost:9092
Validate the Kafka topics being created.
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
scores-input
scores-output
Start a Kafka console consumer to see the results.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092     
                              --topic scores-output     
                              --from-beginning     
                              --formatter kafka.tools.DefaultMessageFormatter     
                              --property print.key=true     
                              --property print.value=true     
                              --skip-message-on-error     
                              --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer     
                              --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
Start a Kafka console producer to input the scores. We are using ":" as the separator between key and value.
bin/kafka-console-producer.sh --broker-list localhost:9092 
                              --topic scores-input 
                              --property "parse.key=true" 
                              --property "key.separator=:"

Game Scoring App

This is the high-level flow of the application code.
  • Create a properties object to hold the configuration of Kafka Streams instance
  • Create a KStream to read the input topic, "scores-input" which will receive the input records
  • Do an aggregation on the KStream to get KTable. Here we group by key and add the new value. Aggregate the values of records in this stream by the grouped key.
  • Materialize the stream to an output topic
  • Create and start the Kafka Streams instance
package com.stackstalk.kafkastreams;

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

public class GameScoringApp {

	public static void main(String[] args) {
		
		// Configuration for a Kafka Streams instance
		Properties config = new Properties();
		config.put(StreamsConfig.APPLICATION_ID_CONFIG, "game-scoring");
		config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
		config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
		config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        
		// Create a KStream to read  the specific topic
		StreamsBuilder builder = new StreamsBuilder();
		KStream<String, String> scoresInput = builder.stream("scores-input");
		
		// Do an aggregation on the KStream to get KTable
		// Here we group by key and add the new value
		// Aggregate the values of records in this stream by the grouped key. 
		// Records with null key or value are ignored. 
		// The result is written into a local KeyValueStore 
		KTable<String, Long> scoresOutput = scoresInput
				.groupByKey()
				.aggregate(() -> 0L, (key, value, total) -> total + Long.parseLong(value),
				Materialized.with(Serdes.String(), Serdes.Long()));
		
		// Materialize the stream to a output topic
		scoresOutput.toStream()
		.peek((key, value) -> System.out.println("Key:Value = " + key + ":" + value))
		.to("scores-output");
		
		// Print the topology
		Topology topology = builder.build();
		System.out.println(topology.describe());
		
		// Create the Kafka Streams instance
		KafkaStreams kafkaStreams = new KafkaStreams(topology, config);
		
		// Clean local state prior to starting the processing topology
		// In production we need to cleanup only on certain conditions
		kafkaStreams.cleanUp();
		
		// Start the KafkaStreams instance
		kafkaStreams.start();

		// Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
		Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
	}
}

Kafka Streams Topology output

This is the topology output from this example. Observe the source, stream and sink processors.
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [scores-input-1])
      --> KSTREAM-AGGREGATE-0000000002
    Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])
      --> KTABLE-TOSTREAM-0000000003
      <-- KSTREAM-SOURCE-0000000000
    Processor: KTABLE-TOSTREAM-0000000003 (stores: [])
      --> KSTREAM-PEEK-0000000004
      <-- KSTREAM-AGGREGATE-0000000002
    Processor: KSTREAM-PEEK-0000000004 (stores: [])
      --> KSTREAM-SINK-0000000005
      <-- KTABLE-TOSTREAM-0000000003
    Sink: KSTREAM-SINK-0000000005 (topic: scores-output)
      <-- KSTREAM-PEEK-0000000004

Kafka Console Producer input

In the Kafka Console producer send the records (scores) as key:value (name:score) pairs.
>john:5
>tony:3
>john:2
>ram:4
>tony:1

Kafka Console Consumer output

In the Kafka console consumer output, observe the aggregation in action. Every time a new record is pushed, the scores are aggregated by key.
john	5
tony	3
john	7
ram	4
tony	4
Get the full source code of the app in GitHub.

Exactly once Semantics

Kafka Streams implements exactly-once semantics. Exactly once is the guarantee that data processing on each message will happen exactly once, and that writing the results back to Kafka will also happen exactly once. The producers are idempotent: if the same message is sent multiple times, Kafka will ensure only one copy of the message is kept. If a message needs to be written to multiple topics as part of a transaction, Kafka will ensure that either all messages are written or none of them are.

The exactly-once guarantees provided by Kafka’s Streams API are the strongest guarantees offered by any stream processing system so far. It offers end-to-end exactly-once guarantees for a stream processing application that extends from the data read from Kafka, any state materialized to Kafka by the Streams app, to the final output written back to Kafka.

Enabling exactly-once in Kafka Streams is just an additional property in the streams configuration, which provides the exactly-once processing guarantee.
// Configuration for a Kafka Streams instance
Properties config = new Properties();
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

KStreams, KTables and GlobalKTable

The computational logic of a Kafka Streams application is defined as a processor topology, which is a graph of stream processors (nodes) and streams (edges). We can define the processor topology with the Kafka Streams APIs:

Kafka Streams DSL A high-level API that provides the most common data transformation operations such as map, filter, join, and aggregations out of the box. The DSL is the recommended starting point for developers new to Kafka Streams, and should cover many use cases and stream processing needs.

Processor API A low-level API that lets you add and connect processors as well as interact directly with state stores. The Processor API provides you with even more flexibility than the DSL but at the expense of requiring more manual work on the side of the application developer (e.g., more lines of code).

Kafka Streams DSL supports built-in abstractions for streams and tables in the form of KStream, KTable, and GlobalKTable.

KStream is an abstraction of a record stream. Using the table analogy, data records in a record stream are always interpreted as an “INSERT”. This is similar to a log, infinite, unbounded data streams.

KTable is an abstraction of a changelog stream, where each data record represents an update. The value in a data record is interpreted as an “UPDATE” of the last value for the same record key. Using the table analogy, a data record in a changelog stream is interpreted as an UPSERT aka INSERT/UPDATE because any existing row with the same key is overwritten. Also, null values are interpreted in a special way: a record with a null value represents a “DELETE” or tombstone for the record’s key.

Like a KTable, a GlobalKTable is an abstraction of a changelog stream, where each data record represents an update. The main difference between KTable and GlobalKTable can be seen by assuming a topic with 5 partitions (a minimal sketch of declaring both follows the list below).
  • If you read the input topic into a KTable, then the “local” KTable instance of each application instance will be populated with data from only 1 partition of the topic’s 5 partitions.
  • If you read the input topic into a GlobalKTable, then the local GlobalKTable instance of each application instance will be populated with data from all partitions of the topic.
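
A minimal sketch of declaring both from the same builder; the topic name is a placeholder:
StreamsBuilder builder = new StreamsBuilder();

// Each application instance is populated only with the partitions assigned to it
KTable<String, String> userTable = builder.table("users-topic");

// Each application instance is populated with data from all partitions of the topic
GlobalKTable<String, String> userGlobalTable = builder.globalTable("users-topic");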

Stream Operations

We will review some of the stream operations with examples.

Map

Takes one record and produces one record, applicable only for KStreams. Affects both keys and values. In this example, we convert both key and value to lower case.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> scoresInput = builder.stream("scores-input");
KStream<String, String> mappedInput = scoresInput.map((key, value) -> KeyValue.pair(key.toLowerCase(), value.toLowerCase())); 	
mappedInput.to("scores-output", Produced.with(Serdes.String(), Serdes.String()));

MapValues

Takes one record and produces one record, applicable for KStreams and KTables. It affects only the values. In this example, we scale the value by a weight of 100.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> scoresInput = builder.stream("scores-input");
KStream<String, Integer> mappedInput = scoresInput.mapValues(value -> Integer.parseInt(value) * 100); 				
mappedInput.to("scores-output", Produced.with(Serdes.String(), Serdes.Integer()));

Filter/ FilterNot

Takes one record and produces zero or one record. FilterNot is the inverse of Filter. In this example, we keep only the records whose key matches a regular expression.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> scoresInput = builder.stream("scores-input");			
KStream<String, String> mappedInput = scoresInput.filter((key, value) -> key.matches("^[a-zA-Z].*")); 				
mappedInput.to("scores-output", Produced.with(Serdes.String(), Serdes.String()));

FlatMap

Takes one record and produces zero, one or more records; applicable to KStream. In this example, we split the key on a separator and create multiple KeyValue pairs.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> scoresInput = builder.stream("scores-input");
KStream<String, String> mappedInput = scoresInput.flatMap((key, value) -> {
	List<KeyValue<String, String>> pairs = new ArrayList<>();
	for (String keyPart : key.split(",")) {
		pairs.add(KeyValue.pair(keyPart, value));
	}
	return pairs;
});

FlatMapValues

Takes one record and produces zero, one or more records, applicable for KStreams. Does not change the keys.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> scoresInput = builder.stream("scores-input");
KStream<String, String> mappedInput = scoresInput.flatMapValues(value -> Arrays.asList(value.split(",")));

Branch

Splits a KStream into multiple KStreams based on one or more predicates. In this example, we split the stream into three streams based on the value of the score.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> scoresInput = builder.stream("scores-input");

BranchedKStream<String, String> branches = scoresInput.split(Named.as("split-"))
	.branch((key, value) -> Integer.parseInt(value) > 100, Branched.as("hundred-plus"))
	.branch((key, value) -> Integer.parseInt(value) > 50, Branched.as("fifty-plus"))
	.branch((key, value) -> Integer.parseInt(value) > 10, Branched.as("ten-plus"));
		
branches.defaultBranch().get("split-hundred-plus").to("scores-output", Produced.with(Serdes.String(), Serdes.String()));

SelectKey

Assigns a new key to the record. In this example, we prepend a tag to the key.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> scoresInput = builder.stream("scores-input-1");
KStream<String, String> mappedInput = scoresInput.selectKey((key, value) -> "MYTAG_" + key);
		
mappedInput.to("scores-output", Produced.with(Serdes.String(), Serdes.String()));
Refer to the Game Scoring App example earlier in this article for examples of stateful operations like GroupBy and Aggregate; a minimal stateful sketch also follows below.
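For reference, a minimal sketch of a stateful count, with illustrative store and topic names, looks like this:
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> scoresInput = builder.stream("scores-input");

// Group records by key and count occurrences per key into a KTable backed by a state store
KTable<String, Long> countsPerKey = scoresInput
	.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
	.count(Materialized.as("scores-count-store"));

// Emit the changelog of the count table to an output topic
countsPerKey.toStream()
	.to("scores-count-output", Produced.with(Serdes.String(), Serdes.Long()));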

Conclusion

Kafka Streams is the easiest way to write real-time applications and microservices in Java. The Kafka Streams DSL high-level API provides the most common data transformation operations, such as map, filter, join, and aggregations, out of the box.

Friday, February 4, 2022

Getting started in GraphQL with Spring Boot

 February 04, 2022     Java, Microservices, Spring Boot     No comments   

In this article we will explore basic concepts on GraphQL and look at how to develop a microservice in Spring Boot with GraphQL support.

GraphQL Introduction

Reference: https://graphql.org/

GraphQL is a query language for the APIs, and a server-side runtime for executing queries using a type system defined for the data. GraphQL isn't tied to any specific database or storage engine and there are multiple libraries to help implement GraphQL in different languages.

In GraphQL, schema is used to describe the shape of available data. Schema defines a hierarchy of types with fields. The schema also specifies exactly which queries and mutations are available for clients to execute. We will look into details on schema in the next section.

Typically, when using a REST API, we always get back a complete dataset; this is referred to as over-fetching, because we cannot limit the fields the REST API returns. In GraphQL, the query language lets the request be tailored to exactly what is needed, down to specific fields within each entity. Because only the required data is returned, the amount of processing needed is reduced, and the savings add up.

Some best practice recommendations for GraphQL services:
  • GraphQL is typically served over HTTP via a single endpoint which expresses the full set of capabilities of the service. This is in contrast to REST APIs which expose a suite of URLs each of which expose a single resource.
  • GraphQL services typically respond using JSON; however, the GraphQL spec does not require it.
  • While there's nothing that prevents a GraphQL service from being versioned just like any other REST API, GraphQL takes a strong opinion on avoiding versioning by providing the tools for the continuous evolution of a GraphQL schema.
  • The GraphQL type system allows for some fields to return lists of values, but leaves the pagination of longer lists of values up to the API designer.

GraphQL Schema Introduction

The GraphQL specification defines a human-readable schema definition language (or SDL) that can be used to define schema and store it as a string.

Let us start with an example and understand the concepts.
# Movie type has fields name and director
type Movie {
  name: String!
  director: Director
}

# Director type has fields name and a list of movies
type Director {
  name: String!
  movies: [Movie] # List of movies
}

type Query {
  movies: [Movie]
  directors: [Director]
}

type Mutation {
  addMovie(id: ID, name: String, director: String): Movie
}
In this example, we have two object types, Movie and Director. A Movie has an associated Director, and a Director has a list of Movies. Relationships are defined in a unified schema, which allows client developers to see what data is available and then request a specific subset of that data with a single optimized query.
  • Most of the types defined in a GraphQL schema are object types. An object type contains a collection of fields, each of which has its own type.
  • GraphQL's default scalar types are Int, String, Float, Boolean, ID. ID is typically a key for a cache
  • A field can return a list containing items of a particular type. This is indicated with [] square brackets
  • Non-nullability of a field can be specified with an exclamation mark !
  • The Query type is a special object type that defines all of the top-level entry points for queries that clients execute against the server
  • The Mutation type is similar to the Query type and defines entry points for write operations.
  • Input types are special object types that allow you to pass structured data as arguments to fields, instead of flat scalar arguments
With this introduction let us implement an example.

MovieService example

In this example, we will implement a simple movie service to leverage GraphQL with Spring Boot. We will implement Query and Mutation support of GraphQL to list all movies, find a specific movie and to add a new movie. This example also integrates H2 database to store the movies.

GraphQL Schema

Let us now define the GraphQL schema which describes what data can be queried. Create a file "schema.graphqls" in the resources folder of the app and include the below schema.

In this schema we have "Movie" as an object type. We define an input type "MovieInput", which helps in the mutation case to pass the object as a whole. We also have "Query" and "Mutation" types to get movie(s) and add a movie.
type Movie {
   id: ID
   name: String
   director: String
}

input MovieInput {
   id: ID!
   name: String!
   director: String!
}

type Query {
   movies: [Movie]
   movieById(id: ID): Movie
}

type Mutation {
   addMovie(movieInput: MovieInput): Movie
}

A few additional points to note in the schema definition:
  • In the "movies" query we have not included any parentheses since there are no arguments
  • The "!" on fields in the MovieInput type indicates these are non-nullable and input is expected

Spring Boot dependencies

Create a Spring starter project and include Spring Web, the GraphQL Spring Boot starter, GraphQL Java Tools, Spring Data JPA, the H2 database engine, Project Lombok and the GraphiQL IDE for testing as dependencies in pom.xml. For more details on Project Lombok refer to this article "Introduction to Project Lombok".
<dependencies>
	<dependency>
		<groupId>org.springframework.boot</groupId>			
		<artifactId>spring-boot-starter-web</artifactId>
	</dependency>
	<dependency>
		<groupId>org.projectlombok</groupId>
		<artifactId>lombok</artifactId>
		<version>1.18.22</version>
		<scope>provided</scope>
	</dependency>        
	<dependency>
		<groupId>com.graphql-java</groupId>
		<artifactId>graphql-spring-boot-starter</artifactId>
		<version>5.0.2</version>
	</dependency>		
	<dependency>
		<groupId>com.graphql-java</groupId>
		<artifactId>graphql-java-tools</artifactId>
		<version>5.2.4</version>
	</dependency>		
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-data-jpa</artifactId>
	</dependency>
	<dependency>
		<groupId>com.h2database</groupId>
		<artifactId>h2</artifactId>
	</dependency>		
	<dependency>
		<groupId>com.graphql-java</groupId>
		<artifactId>graphiql-spring-boot-starter</artifactId>
		<version>3.0.3</version>
	</dependency>	
</dependencies>

Configure application.properties

Update the application.properties to specify the H2 database.
spring.datasource.url=jdbc:h2:mem:moviesdb
spring.datasource.driverClassName=org.h2.Driver
spring.datasource.username=sa
spring.datasource.password=
spring.jpa.database-platform=org.hibernate.dialect.H2Dialect

Define the data object

We start with the Movie POJO. We use the Lombok @Data annotation, which generates the getters and setters needed.
package com.stackstalk;

import javax.persistence.Entity;
import javax.persistence.Id;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Entity
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Movie {

	@Id
	private Integer id;
	private String name;
	private String director;
}


Define the repository

Now let us define the repository.
package com.stackstalk;

import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface MovieRepository extends JpaRepository<Movie, Integer> {

}

Define the GraphQL query resolver

Now let us implement the GraphQL query resolver, which serves queries from the client. GraphQLQueryResolver is a marker interface, and the framework looks for method signatures matching the GraphQL schema. It is also possible to write multiple query resolvers to logically group queries. In this example, we implement two query methods matching the schema: "get a movie by id" and "get all movies".
package com.stackstalk;

import java.util.List;
import java.util.Optional;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.coxautodev.graphql.tools.GraphQLQueryResolver;

@Component
public class MovieQueryResolver implements GraphQLQueryResolver {
	
	@Autowired
	private MovieRepository movieRepository;
	
	public Optional<Movie> movieById(Integer id) {
		return movieRepository.findById(id);
	}
	
	public List<Movie> movies() {
		return movieRepository.findAll();
	}
}

Define the GraphQL mutation resolver

Now let us implement the GraphQL mutation resolver, which serves writes from the client. In this example, we implement the add-movie method, which matches the Mutation type defined in the schema.
package com.stackstalk;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.coxautodev.graphql.tools.GraphQLMutationResolver;

@Component
public class MovieMutationResolver implements GraphQLMutationResolver {
	
	@Autowired
	private MovieRepository movieRepository;
	
	public Movie addMovie(Movie input) {
		Movie movie = new Movie(input.getId(), input.getName(), input.getDirector());
		return movieRepository.save(movie);
	}
}


Define the application

Finally let us define the application.
package com.stackstalk;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class MovieServiceApplication {

	public static void main(String[] args) {
		SpringApplication.run(MovieServiceApplication.class, args);
	}
}

Putting it all together and testing

Now, let us test the application.

Using the GraphQL IDE

In the project dependencies we have already included GraphiQL. This is an IDE that can be leveraged to perform GraphQL queries and mutations. To launch the IDE, use the URL http://localhost:8080/ in the browser.
If testing with Postman, use the endpoint "http://localhost:8080/graphql" to run the GraphQL queries.

Add a Movie

This mutation is used to add a new movie.

GraphQL query
mutation {
  addMovie(movieInput: {
    id: 1
    name: "Scream"
    director: "Matt Bettinelli-Olpin, Tyler Gillett"
  }) {
    id
    name
    director
  }
}
Output
{
  "data": {
    "addMovie": {
      "id": "1",
      "name": "Scream",
      "director": "Matt Bettinelli-Olpin, Tyler Gillett"
    }
  }
}

Query a movie by identifier

This query is used to get a specific movie by identifier.

GraphQL query
query {
  movieById(id: 1) {
    id
    name
    director
  }
}
Output
{
  "data": {
    "movieById": {
      "id": "1",
      "name": "Scream",
      "director": "Matt Bettinelli-Olpin, Tyler Gillett"
    }
  }
}

Query all movies

This query is used to get a list of all movies.

GraphQL query
query {
  movies {
    id
    name
    director
  }
}
Output
{
  "data": {
    "movies": [
      {
        "id": "1",
        "name": "Scream",
        "director": "Matt Bettinelli-Olpin, Tyler Gillett"
      },
      {
        "id": "2",
        "name": "Spider-Man: No Way Home",
        "director": "Jon Watts "
      }
    ]
  }
}
Get access to the full source code of this example in GitHub.

In conclusion, when using a REST API we typically get back a complete dataset; this over-fetching means we cannot limit the fields the REST API returns. In GraphQL, the query language lets the request be tailored to exactly what is needed, down to specific fields within each entity. With Spring Boot it is quite easy to implement GraphQL with the available library support.

Refer to this link for more Java Tutorials.

Monday, January 24, 2022

Server-Sent Events with Spring WebFlux

 January 24, 2022     Java, Microservices     No comments   

In this article we will review the concepts of server-sent events and work on an example using WebFlux. Before getting into this article it would be important to understand basic concepts of reactive programming from the following links.
  • Reactive Programming with Spring WebFlux
  • Spring WebFlux Example using Annotation based programming
  • Spring WebFlux Example using Functional Programming

Server-Sent Events Introduction

Server-Sent Events (SSE) enables a client to receive automatic updates from a server over an HTTP connection without having to poll. SSE is commonly used to send message updates or continuous data streams. In SSE, events flow from server to client over a one-way communication channel. Examples include publishing notifications to a browser client and pushing updates such as stock prices or game scores to all subscribed clients.

LogHandler example

In this example, we will implement a simple log handler service using Spring WebFlux. We expose a "/stream" endpoint and demonstrate the working of Server-Sent Events (SSE) using Spring WebFlux.

Spring Boot dependencies

Create a Spring starter project and include Spring Reactive Web (WebFlux), Spring Data MongoDB Reactive, Project Lombok and the test modules as dependencies in pom.xml. For more details on Project Lombok refer to this article "Introduction to Project Lombok".
<dependencies>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
	</dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-webflux</artifactId>
	</dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-test</artifactId>
		<scope>test</scope>
	</dependency>
	<dependency>
		<groupId>io.projectreactor</groupId>
		<artifactId>reactor-test</artifactId>
		<scope>test</scope>
	</dependency>
	<dependency>
		<groupId>org.projectlombok</groupId>
		<artifactId>lombok</artifactId>
		<version>1.18.22</version>
		<scope>provided</scope>
	</dependency>		
</dependencies>

Configure application.properties

Update the application.properties to specify the Mongo database. We are using a local Mongo database on port 27017 and the database would be "logs".
spring.data.mongodb.host=localhost
spring.data.mongodb.port=27017
spring.data.mongodb.database=logs

Define the data object

We start with the LogInfo POJO. We use the Lombok @Data annotation, which generates the getters and setters needed.
package com.stackstalk;

import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
@Document
public class LogInfo {
	@Id 
	private String logId;
	private Long logTimestamp;
	private String logMessage;
}

Define the repository

Now let us define the repository. Note that here we use the Mongo-specific repository with reactive support.
package com.stackstalk;

import org.springframework.data.mongodb.repository.ReactiveMongoRepository;

public interface LogInfoRepository extends ReactiveMongoRepository<LogInfo, String> {

}

Define the Handler Function

In WebFlux.fn, an HTTP request is handled with a HandlerFunction: a function that takes ServerRequest and returns a delayed ServerResponse (i.e. Mono<ServerResponse>).

Sinks are constructs through which Reactive Streams signals can be programmatically pushed, with Flux or Mono semantics. These standalone sinks expose tryEmit methods that return an EmitResult enum, allowing the emission to fail atomically if the attempted signal is inconsistent with the spec and/or the state of the sink.

There are multiple Sinks categories:
  • many().multicast(): a sink that will transmit only newly pushed data to its subscribers, honoring their backpressure
  • many().unicast(): same as above, with the twist that data pushed before the first subscriber registers is buffered.
  • many().replay(): a sink that will replay a specified history size of pushed data to new subscribers then continue pushing new data live.
  • one(): a sink that will play a single element to its subscribers
  • empty(): a sink that will play a terminal signal only to its subscribers (error or complete)

In this example, we use a replay sink created with Sinks.many().replay().all(), which replays the full history of pushed data to new subscribers and then continues pushing new data live.
package com.stackstalk;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;

import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

@Component
public class LogInfoHandler {

	@Autowired
	private LogInfoRepository logRepository;
	
	Sinks.Many<LogInfo> logInfoSink = Sinks.many().replay().all();
	
	public Mono<ServerResponse> addLogInfo(ServerRequest serverRequest) {
		return serverRequest.bodyToMono(LogInfo.class)
				.doOnNext(logInfo -> {
					logInfoSink.tryEmitNext(logInfo);
				})
				.flatMap(l -> {
					return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(logRepository.save(l), LogInfo.class);
				}).log();
	}
	
	public Mono<ServerResponse> getLogStream(ServerRequest serverRequest) {
		return ServerResponse.ok().contentType(MediaType.APPLICATION_NDJSON)
				.body(logInfoSink.asFlux(), LogInfo.class).log();
	}
}

Define the Router Function

Incoming requests are routed to a handler function with a RouterFunction: a function that takes ServerRequest and returns a delayed HandlerFunction (i.e. Mono<HandlerFunction>). When the router function matches, a handler function is returned; otherwise an empty Mono. RouterFunction is the equivalent of a @RequestMapping annotation, but with the major difference that router functions provide not just data, but also behavior.
package com.stackstalk;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;

@Configuration
public class LogInfoRouter {
	
	@Bean
	public RouterFunction<ServerResponse> routes(LogInfoHandler handler) {
		return RouterFunctions.route(RequestPredicates.POST("/logs"), handler::addLogInfo)
				.andRoute(RequestPredicates.GET("/stream"), handler::getLogStream);
	}
}

Define the application

Finally let us define the application.
package com.stackstalk;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class LogHandlerApplication {

	public static void main(String[] args) {
		SpringApplication.run(LogHandlerApplication.class, args);
	}

}

Putting it all together and testing

Now, let us test the application. On starting the application, observe that the Netty server starts on port 8080. Also note the usage of the reactive MongoDB repository.
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.6.2)

2022-01-24 15:37:19.428  INFO 30335 --- [           main] com.stackstalk.LogHandlerApplication     : Starting LogHandlerApplication using Java 16.0.2 on FAKEUSER-M-F1VD with PID 30335 (/Users/fakeuser/Documents/workspace-spring-tool-suite-4-4.12.0.RELEASE/LogHandler/target/classes started by fakeuser in /Users/fakeuser/Documents/workspace-spring-tool-suite-4-4.12.0.RELEASE/LogHandler)
2022-01-24 15:37:19.430  INFO 30335 --- [           main] com.stackstalk.LogHandlerApplication     : No active profile set, falling back to default profiles: default
2022-01-24 15:37:19.821  INFO 30335 --- [           main] .s.d.r.c.RepositoryConfigurationDelegate : Bootstrapping Spring Data Reactive MongoDB repositories in DEFAULT mode.
2022-01-24 15:37:19.949  INFO 30335 --- [           main] .s.d.r.c.RepositoryConfigurationDelegate : Finished Spring Data repository scanning in 124 ms. Found 1 Reactive MongoDB repository interfaces.
2022-01-24 15:37:20.372  INFO 30335 --- [           main] org.mongodb.driver.cluster               : Cluster created with settings {hosts=[localhost:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms'}
2022-01-24 15:37:20.611  INFO 30335 --- [localhost:27017] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:1, serverValue:2}] to localhost:27017
2022-01-24 15:37:20.611  INFO 30335 --- [localhost:27017] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:2, serverValue:1}] to localhost:27017
2022-01-24 15:37:20.612  INFO 30335 --- [localhost:27017] org.mongodb.driver.cluster               : Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, minWireVersion=0, maxWireVersion=7, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=120133302}
2022-01-24 15:37:20.961  INFO 30335 --- [           main] o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port 8080
2022-01-24 15:37:20.968  INFO 30335 --- [           main] com.stackstalk.LogHandlerApplication     : Started LogHandlerApplication in 1.871 seconds (JVM running for 2.632)

Open a HTTP connection on the /stream endpoint

This API is used to listen for Server-Sent Events (SSE).
curl --location --request GET 'http://localhost:8080/stream' 
The HTTP connection stays open, and log messages are returned as they are added; whenever a new message is added, the event is pushed to the client. Open multiple connections and observe the replay behavior when a new subscription is made.
{"logId":"1","logTimestamp":1642649553432,"logMessage":"Hello. This is my first message"}

Add a log message with a POST on /logs

This API is used to add a new log message.
curl --location --request POST 'http://localhost:8080/logs' \
> --header 'Content-Type: application/json' \
> --data-raw '{
>     "logId": "1",
>     "logTimestamp": 1642649553432,
>     "logMessage": "Hello. This is my first message"
> }'
Returns the log message added.
{"logId":"1","logTimestamp":1642649553432,"logMessage":"Hello. This is my first message"}
In the logs observe usage of reactive signals with onSubscribe, request, onNext (once) and onComplete.

Conclusion

Implementing Server-Sent Events (SSE) on the server side with reactive stream signals using Spring WebFlux is straightforward. Sinks provide the flexibility and abstraction to handle multiple requirements with ease. Refer to this link for more Java Tutorials. Get access to the full source code of this example in GitHub.