StackStalk

Friday, September 16, 2022

Accessing the Kubernetes API


In this article, we will explore the steps required to access the Kubernetes API and overcome common challenges. All operations and communications between components, as well as external user commands, are REST API calls handled by the Kubernetes API server. Consequently, everything in the Kubernetes platform is treated as an API object. 

Reference: Kubernetes API



We will use Docker Desktop to demonstrate these steps.

Get Kubernetes API server address for the cluster

The first step is to get the API server endpoint. Use "kubectl config view" and note down the server endpoint for the specific cluster. Here, "https://kubernetes.docker.internal:6443" is the endpoint of the Kubernetes API server.
$ kubectl config view
apiVersion: v1
clusters:
- cluster:
    certificate-authority-data: DATA+OMITTED
    server: https://kubernetes.docker.internal:6443
  name: docker-desktop
contexts:
- context:
    cluster: docker-desktop
    user: docker-desktop
  name: docker-desktop
current-context: docker-desktop
kind: Config
preferences: {}
users:
- name: docker-desktop
  user:
    client-certificate-data: REDACTED
    client-key-data: REDACTED

Get the Kubernetes version error

Let us read the Kubernetes version via the API server. We will hit an SSL certificate issue because curl does not yet trust the cluster's certificate authority.
$ curl https://kubernetes.docker.internal:6443/version
curl: (60) SSL certificate problem: unable to get local issuer certificate
More details here: https://curl.se/docs/sslcerts.html

curl failed to verify the legitimacy of the server and therefore could not
establish a secure connection to it. To learn more about this situation and
how to fix it, please visit the web page mentioned above.

Get the Kubernetes version using SSL certificate

To overcome the SSL certificate problem, pass the cluster CA certificate using the --cacert option. With Docker Desktop, the ca.crt is typically available at ~/Library/Containers/com.docker.docker/pki/ca.crt.
$ curl https://kubernetes.docker.internal:6443/version --cacert ~/Library/Containers/com.docker.docker/pki/ca.crt
{
  "major": "1",
  "minor": "25",
  "gitVersion": "v1.25.0",
  "gitCommit": "a866cbe2e5bbaa01cfd5e969aa3e033f3282a8a2",
  "gitTreeState": "clean",
  "buildDate": "2022-08-23T17:38:15Z",
  "goVersion": "go1.19",
  "compiler": "gc",
  "platform": "linux/arm64"
}

Read the POD list error

Now let us try to read the list of pods in the default namespace. We will get an authorization error since we don't have the required permissions yet.
$ curl https://kubernetes.docker.internal:6443/api/v1/namespaces/default/pods --cacert ~/Library/Containers/com.docker.docker/pki/ca.crt
{
  "kind": "Status",
  "apiVersion": "v1",
  "metadata": {},
  "status": "Failure",
  "message": "pods is forbidden: User \"system:anonymous\" cannot list resource \"pods\" in API group \"\" in the namespace \"default\"",
  "reason": "Forbidden",
  "details": {
    "kind": "pods"
  },
  "code": 403
}

Read the POD list using service account token

To overcome the permissions issue, create a secret to hold a token for the default service account.
$ kubectl apply -f - <<EOF
apiVersion: v1
kind: Secret
metadata:
  name: default-token
  annotations:
    kubernetes.io/service-account.name: default
type: kubernetes.io/service-account-token
EOF
Provide the required RBAC authorization. In this example, we grant the cluster-admin role to the default service account. More details on RBAC are available here.
$ kubectl apply -f - <<EOF
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: default-rbac
subjects:
  - kind: ServiceAccount
    name: default
    namespace: default
roleRef:
  kind: ClusterRole
  name: cluster-admin
  apiGroup: rbac.authorization.k8s.io
EOF
Now read the default token secret.
$ kubectl describe secret default-token
Name:         default-token
Namespace:    default
Labels:       <none>
Annotations:  kubernetes.io/service-account.name: default
              kubernetes.io/service-account.uid: 8405ff0b-bc0f-425d-8980-7ae289563880

Type:  kubernetes.io/service-account-token

Data
====
ca.crt:     1099 bytes
namespace:  7 bytes
token:      [USE-THIS-TOKEN]
Use the token in the curl command header as the bearer token for authorization.
$ curl https://kubernetes.docker.internal:6443/api/v1/namespaces/default/pods --cacert ~/Library/Containers/com.docker.docker/pki/ca.crt -H "Authorization: Bearer [USE-THIS-TOKEN]"
We are now able to read the POD list without any errors.
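As a convenience, the token can also be extracted with kubectl into a shell variable and reused in subsequent API calls. Below is a minimal sketch assuming the default-token secret created above:
TOKEN=$(kubectl get secret default-token -o jsonpath='{.data.token}' | base64 --decode)
curl https://kubernetes.docker.internal:6443/api/v1/namespaces/default/pods \
  --cacert ~/Library/Containers/com.docker.docker/pki/ca.crt \
  -H "Authorization: Bearer ${TOKEN}"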

Friday, September 2, 2022

Avro Producer and Consumer with Python using Confluent Kafka


In this article, we will understand Avro, a popular data serialization format in streaming data applications, and develop a simple Avro producer and consumer in Python using Confluent Kafka. Also refer to this article for a basic introduction: Getting started with Kafka in Python

Avro Introduction

For streaming data use cases, Avro is a popular data serialization format. Avro is an open source data serialization system that helps with data exchange between systems, programming languages, and processing frameworks. Avro helps define a binary format for your data, as well as map it to the programming language of choice.

Avro relies on schemas. When Avro data is read, the schema used when writing it is always present. This permits each datum to be written with no per-value overheads, making serialization both fast and small. This also facilitates use with dynamic, scripting languages, since data, together with its schema, is fully self-describing.

Avro has a JSON-like data model, but can be represented as either JSON or in a compact binary form. It comes with a very sophisticated schema description language that describes data.

Let us look at an example of representing the movie data below as an Avro schema.

{
  "imdbID": "tt0061722",
  "Title": "The Graduate",
  "Year": 1967,
  "Director": "Mike Nichols",
  "Actors": [
    "Anne Bancroft",
    "Dustin Hoffman",
    "Katharine Ross",
    "William Daniels"
  ],
  "Language": "English",
  "imdbRating": 8.1
}

Movie Avro Schema

Avro schema definitions are JSON records. Because it is a record, it can define multiple fields which are organized in a JSON array. Each such field identifies the field's name as well as its type. The type can be something simple, like an integer, or something complex, like another record.

In the movie schema we have defined the MovieDetails record type with associated fields to represent the movie data.

{
    "type": "record",
    "namespace": "com.stackstalk",
    "name": "MovieDetails",
    "fields": [
        {
            "name": "imdbID",
            "type": "string"
        },
        {
            "name": "Title",
            "type": "string"
        },
        {
            "name": "Year",
            "type": "int"
        },
        {
            "name": "Language",
            "type": "string"
        },
        {
            "name": "Director",
            "type": "string"
        },
        {
            "name": "Actors",
            "type": {
              "type": "array",
              "items": {
                 "type": "string"
              }
            }
        },
        {
            "name": "imdbRating",
            "type": "float"
        }
    ]
}

Confluent Platform

Reference: Confluent Platform

Confluent Platform is an enterprise grade distribution of Apache Kafka. It is a full-scale data streaming platform that enables you to easily access, store, and manage data as continuous, real-time streams. Each release of Confluent Platform includes the latest release of Kafka and additional tools and services that make it easier to build and manage an Event Streaming Platform.

For this example, we will run the Confluent services ZooKeeper, broker, Schema Registry and Control Center as Docker containers. Refer to the Confluent docker-compose specs.

Confluent Schema Registry enables safe, zero downtime evolution of schemas by centralizing the schema management. It provides a RESTful interface for storing and retrieving Avro, JSON Schema, and Protobuf schemas. Schema Registry tracks all versions of schemas used for every topic in Kafka and only allows evolution of schemas according to user-defined compatibility settings. This gives developers confidence that they can safely modify schemas as necessary without worrying that doing so will break a different service they may not even be aware of. Support for schemas is a foundational component of a data governance solution.
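Once the containers are up, the Schema Registry REST interface can be queried directly to inspect registered subjects and schema versions (assuming it is exposed on localhost:8081 as in the docker-compose setup):
# List all subjects registered in the Schema Registry
curl http://localhost:8081/subjects

# Fetch the latest schema version registered for the movie topic value
curl http://localhost:8081/subjects/movie-topic-value/versions/latest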

Confluent Control Center is a GUI-based system for managing and monitoring Kafka. It allows you to easily manage Kafka Connect, to create, edit, and manage connections to other systems. It also allows you to monitor data streams from producer to consumer, assuring that every message is delivered, and measuring how long it takes to deliver messages.

Python producer and consumer example using Confluent Kafka

Now let us write a simple producer and consumer to demonstrate the concepts discussed above. It is pretty simple with confluent kafka.

Let us focus on the producer first. This is an example of a synchronous producer. Here we connect to the Kafka server on port 9092 and use the produce call to write a message to a Kafka topic. We load the defined key and value Avro schema files and associate them when creating the Avro producer. We use the flush call to make the writes synchronous and to ensure the messages are delivered before the producer is shut down.

#!/usr/bin/env python3

import json
from confluent_kafka import KafkaException
from confluent_kafka.avro import AvroProducer
from confluent_kafka import avro

def acked(err, msg):
    if err is not None:
        print("Failed to deliver message: %s: %s" % msg.value().decode('utf-8'), str(err))
    else:
        print("Message produced: %s" % msg.value().decode('utf-8'))

def load_avro_schema_from_file():
    key_schema = avro.load("avro/movie-topic-key.avsc")
    value_schema = avro.load("avro/movie-topic-value.avsc")

    return key_schema, value_schema

def send_data():
    producer_config = {
        "bootstrap.servers": "localhost:9092",
        "schema.registry.url": "http://localhost:8081"
    }

    key_schema, value_schema = load_avro_schema_from_file()

    #print(key_schema)
    #print(value_schema)

    try:
        producer = AvroProducer(producer_config, default_key_schema=key_schema, default_value_schema=value_schema)

        f1 = open("data/movie-metadata.json", "r")
        key_str = f1.read()
        f1.close()

        f2 = open("data/movie-details.json", "r")
        value_str = f2.read()
        f2.close()

        producer.produce(topic = "movie-topic", key = json.loads(key_str), headers = [("my-header1", "Value1")], value = json.loads(value_str))
        producer.flush()

    except KafkaException as e:
        print('Kafka failure ' + str(e))

def main():
    send_data()

if __name__ == "__main__":
    main()

Now let us write a simple consumer. We specify the Kafka server, the schema registry and the consumer group in the configuration used to create the Avro consumer. We subscribe to the topic of interest before going into a consume loop. In the consume loop, we call the poll() method to retrieve messages from the topic. If there are no messages, the poll() method returns None.

#!/usr/bin/env python3

import json
from confluent_kafka import KafkaException
from confluent_kafka.avro import AvroConsumer

def read_data():
    consumer_config = {
        "bootstrap.servers": "localhost:9092",
        "schema.registry.url": "http://localhost:8081",
        "group.id": "my-connsumer1",
        "auto.offset.reset": "earliest"
    }

    consumer = AvroConsumer(consumer_config)
    consumer.subscribe(['movie-topic'])

    try:
        while True:
            try:
                msg = consumer.poll(1)

                if msg is None:
                    continue

                print("Key is :" + json.dumps(msg.key()))
                print("Value is :" + json.dumps(msg.value()))
                print("-------------------------")

            except KafkaException as e:
                print('Kafka failure ' + str(e))
    finally:
        # Close the consumer to commit final offsets and leave the group cleanly
        consumer.close()

def main():
    read_data()

if __name__ == "__main__":
    main()

Run the producer code.

$ python3 producer.py

When you run the producer application, you should see the messages being received by the consumer.

$ python3 consumer.py 
Key is :{"imdbID": "tt0061722", "Title": "The Graduate"}
Value is :{"imdbID": "tt0061722", "Title": "The Graduate", "Year": 1967, "Language": "English", "Director": "Mike Nichols", "Actors": ["Anne Bancroft", "Dustin Hoffman", "Katharine Ross", "William Daniels"], "imdbRating": 8.100000381469727}
-------------------------
Key is :{"imdbID": "tt0061722", "Title": "The Graduate"}
Value is :{"imdbID": "tt0061722", "Title": "The Graduate", "Year": 1967, "Language": "English", "Director": "Mike Nichols", "Actors": ["Anne Bancroft", "Dustin Hoffman", "Katharine Ross", "William Daniels"], "imdbRating": 8.100000381469727}
-------------------------

Open the Confluent Control Center at http://localhost:9021/ to view the schema registered on the movie topic.


Get access to the source code of this example in GitHub.


Monday, March 14, 2022

Monitor Spring Boot App with Micrometer and Prometheus


Modern distributed applications typically have multiple microservices working together. The ability to monitor and manage aspects like health, metrics and logging is key to running applications in production. In this article, we will explore the concepts around Spring Actuator, Micrometer and Prometheus to monitor Spring Boot applications.

Spring Boot Actuator

Reference: Spring Production-ready Features

Spring Boot includes a number of features to help monitor and manage applications in production via HTTP endpoints or JMX. Auditing, health, and metrics gathering can be automatically applied to applications.

The spring-boot-actuator module provides all of Spring Boot’s production-ready features. The recommended way to enable the features is to add a dependency on the spring-boot-starter-actuator “Starter”.

To add the actuator to a Maven-based project, add the following ‘Starter’ dependency to pom.xml.

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
</dependencies>

Actuator endpoints let you monitor and interact with your application. Spring Boot includes a number of built-in endpoints. For example, the health endpoint provides basic application health information. Most applications choose exposure over HTTP, where the ID of the endpoint and a prefix of /actuator is mapped to a URL. For example, by default, the health endpoint is mapped to /actuator/health.
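Once the actuator starter is on the classpath, the exposed endpoints can be discovered from the root actuator URL (assuming the application runs on localhost:8080); the response is a JSON document of links to the individual endpoints:
curl -X GET http://localhost:8080/actuator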

An endpoint is considered to be available when it is both enabled and exposed. To configure the enablement of an endpoint, use its management.endpoint.<id>.enabled property in application.properties. By default, all endpoints except for shutdown are enabled.

management.endpoint.info.enabled=true

Since Endpoints may contain sensitive information, you should carefully consider when to expose them. For example, to stop exposing all endpoints over HTTP and only expose the health, metrics and loggers endpoints, use the following property in application.properties:

management.endpoints.web.exposure.include=health,metrics,loggers
Let us look at a few examples to understand the Spring Boot actuator.

Configure log level of Spring Boot app at runtime

To change the log level of a Spring Boot application during run time, we can use the endpoint "/actuator/loggers". This is useful when you want to collect more detailed logs to debug production issues for a specific period of time.

To change the log level of root logger:

curl --location --request POST 'http://localhost:8080/actuator/loggers/ROOT' \
--header 'Content-Type: application/json' \
--data-raw '{
    "configuredLevel": "DEBUG"
}'

To view the list of all loggers and current configuration in use:

curl -X GET http://localhost:8080/actuator/loggers

Query the Spring Boot app metrics

To query the metrics of the application, use the endpoint "/actuator/metrics". For example, to get the process CPU usage:
curl -X GET http://localhost:8080/actuator/metrics/process.cpu.usage
Returns the process.cpu.usage metric.
{
    "name": "process.cpu.usage",
    "description": "The \"recent cpu usage\" for the Java Virtual Machine process",
    "baseUnit": null,
    "measurements": [
        {
            "statistic": "VALUE",
            "value": 0.0
        }
    ],
    "availableTags": []
}
To see the list of all available metrics, use:
curl -X GET http://localhost:8080/actuator/metrics

Query the Spring Boot app health information

To query the health status of the application, use the endpoint "/actuator/health".
curl -X GET http://localhost:8080/actuator/health
Returns the status of the application.
{"status":"UP"}

Introduction to Micrometer

Reference: Micrometer

Micrometer is a metrics instrumentation library powering the delivery of application metrics from Spring Boot applications. Micrometer provides a simple facade over the instrumentation clients for the most popular monitoring systems, allowing you to instrument your JVM-based application code without vendor lock-in.

As an instrumentation facade, Micrometer allows you to instrument your code with dimensional metrics with a vendor-neutral interface and decide on the monitoring system as a last step. Instrumenting your core library code with Micrometer allows the libraries to be included in applications that ship metrics to different backends.

Micrometer contains built-in support for AppOptics, Azure Monitor, Netflix Atlas, CloudWatch, Datadog, Dynatrace, Elastic, Ganglia, Graphite, Humio, Influx/Telegraf, JMX, KairosDB, New Relic, Prometheus, SignalFx, Google Stackdriver, StatsD, and Wavefront.

Micrometer Concepts

Let us review key concepts around Micrometer.
  • Meter Meter is the interface for collecting a set of measurements (which we individually call metrics) about the application
  • MeterRegistry Meters in Micrometer are created from and held in a MeterRegistry. Each supported monitoring system has an implementation of MeterRegistry.
  • SimpleMeterRegistry Micrometer includes a SimpleMeterRegistry that holds the latest value of each meter in memory and does not export the data anywhere. A SimpleMeterRegistry is autowired in a Spring Boot application.
    MeterRegistry registry = new SimpleMeterRegistry();
    registry.counter("books");
    
  • CompositeMeterRegistry Micrometer provides a CompositeMeterRegistry to which you can add multiple registries, letting you publish metrics to more than one monitoring system simultaneously.
    CompositeMeterRegistry composite = new CompositeMeterRegistry();
    SimpleMeterRegistry simple = new SimpleMeterRegistry();
    composite.add(simple); 
    
  • Meter primitives Micrometer supports a set of Meter primitives, including Timer, Counter, Gauge, DistributionSummary, LongTaskTimer, FunctionCounter, FunctionTimer, and TimeGauge. Different meter types result in a different number of time series metrics (see the sketch after this list).
  • Tags A meter is uniquely identified by its name and dimensions/tags. Dimensions/tags let a particular named metric be sliced to drill down and reason about the data. In the example below, if we select database.calls, we can see the total number of calls to all databases. Then we can group by or select by db to drill down further or perform comparative analysis on the contribution of calls to each database.
    registry.counter("database.calls", "db", "users")
    registry.counter("http.requests", "uri", "/api/users")
    
  • @Timed annotation The micrometer-core module contains a @Timed annotation that frameworks can use to add timing support to either specific types of methods such as those serving web request endpoints or, more generally, to all methods. In the example below, we have added @Timed annotation to the API method which exposes timing metrics on the endpoint.
    @PostMapping("/movies")
    @Timed("movies.api")
    public String orderMovie() {
       return bookService.orderMovie();
    }
    
  • Supported Metrics and Meters Spring Boot provides automatic meter registration for a wide variety of technologies. In most situations, the defaults provide sensible metrics that can be published to any of the supported monitoring systems. Few examples:
    • JVM Metrics Auto-configuration enables JVM Metrics by using core Micrometer classes. JVM metrics are published under the jvm. meter name. E.g. jvm.threads.live, jvm.memory.max
    • System metrics Auto-configuration enables system metrics by using core Micrometer classes. System metrics are published under the system., process., and disk. meter names. E.g system.cpu.usage
    • Application Startup Metrics Auto-configuration exposes application startup time metrics. E.g. application.started.time, application.ready.time
    Refer to this link for all supported metrics.
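To make these meter primitives concrete, here is a minimal, hypothetical sketch (metric names like "orders.processing.time" are illustrative and not from the original article) that records a timing sample with a Timer and registers a Gauge against a SimpleMeterRegistry:
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

public class MeterPrimitivesDemo {

	public static void main(String[] args) {
		MeterRegistry registry = new SimpleMeterRegistry();

		// Timer: record how long a piece of work takes
		Timer timer = registry.timer("orders.processing.time");
		timer.record(MeterPrimitivesDemo::doWork);
		System.out.println("Max time (ms): " + timer.max(TimeUnit.MILLISECONDS));

		// Gauge: track a value that can rise and fall, e.g. a queue size
		AtomicInteger queueSize = new AtomicInteger(0);
		Gauge.builder("orders.queue.size", queueSize, AtomicInteger::get).register(registry);
		queueSize.set(5);
	}

	private static void doWork() {
		try {
			Thread.sleep(100);
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		}
	}
}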

Introduction to Prometheus

Reference: Prometheus
Let us review key concepts around Prometheus.
  • Prometheus is an open-source systems monitoring and alerting toolkit.
  • Prometheus collects and stores its metrics as time series data, i.e. metrics information is stored with the timestamp at which it was recorded, alongside optional key-value pairs called labels.
  • In Prometheus, time series collection happens via a pull model over HTTP.
  • Supports PromQL, a flexible query language
  • Prometheus has an alertmanager to handle alerts
  • Prometheus scrapes metrics from instrumented jobs, either directly or via an intermediary push gateway for short-lived jobs. It stores all scraped samples locally and runs rules over this data to either aggregate and record new time series from existing data or generate alerts. Grafana or other API consumers can be used to visualize the collected data.

Monitor Spring Boot App with Micrometer and Prometheus

Let us work on an example to make the concepts clear. In this example, we will do the following:
  • Implement a Spring Boot app and instrument micrometer and prometheus registry
  • Publish custom metrics from Spring Boot app to count orders placed
  • Use the @Timed annotation to find the API timing metrics
  • Use docker compose to run the demo app and prometheus as containers
  • Query the metrics from Prometheus and validate

Implement the Spring Boot app

Start with a Spring Boot starter application and include the actuator (spring-boot-starter-actuator) and micrometer prometheus registry (micrometer-registry-prometheus) dependencies.
<dependencies>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-web</artifactId>
	</dependency>	
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-actuator</artifactId>
	</dependency>
	<dependency>
		<groupId>io.micrometer</groupId>
		<artifactId>micrometer-registry-prometheus</artifactId>
	</dependency>    
</dependencies>
Update the application.properties file to expose the endpoints. Please note that all of these endpoints are enabled by default.
management.endpoints.web.exposure.include=health,metrics,prometheus,loggers
Next, we will define the application and REST controller. We expose two APIs to order books and movies. Note the usage of the @Timed annotation, which enables the API timing metrics.
package com.stackstalk;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

import io.micrometer.core.annotation.Timed;

@SpringBootApplication
@RestController
@Timed
public class DemoMonitoringAppApplication {

	@Autowired
	ItemService itemService;
	
	public static void main(String[] args) {
		SpringApplication.run(DemoMonitoringAppApplication.class, args);
	}

	@PostMapping("/books")
	@Timed("books.api")
	public String orderBook() {
		return itemService.orderBook();
	}
	
	@PostMapping("/movies")
	@Timed("movies.api")
	public String orderMovie() {
		return itemService.orderMovie();
	}
}
By default, Spring Boot applications are autowired with a SimpleMeterRegistry. In this example, we define a configuration to create a CompositeMeterRegistry.
package com.stackstalk;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;

@Configuration
public class DemoMonitorAppConfig {

	@Bean
	public MeterRegistry getMeterRegistry() {
		CompositeMeterRegistry meterRegistry = new CompositeMeterRegistry();
		return meterRegistry;
	}
}
Finally in the component class we create 2 counter metrics to track the number of orders being placed.
package com.stackstalk;

import org.springframework.stereotype.Component;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;

@Component
public class ItemService {

	private static int bookOrderId = 0;
	private static int movieOrderId = 0;
	private Counter bookCounter = null;
	private Counter movieCounter = null;
	
	public ItemService(CompositeMeterRegistry meterRegistry) {		
		bookCounter = meterRegistry.counter("order.books");
		movieCounter = meterRegistry.counter("order.movies");
	}
	
	public String orderBook() {		
		bookOrderId += 1;
		bookCounter.increment();
		return new String("Ordered Book with id = " + bookOrderId);
	}
	
	public String orderMovie() {		
		movieOrderId += 1;
		movieCounter.increment();
		return new String("Ordered Movie with id = " + movieOrderId);
	}
}
Get the full source code in GitHub.

Create Docker container for the Spring Boot app

Create a Docker image for the demo application. Use a simple Dockerfile that copies and runs the application JAR built by Maven.
FROM openjdk:17-alpine 
ARG JAR_FILE=target/*.jar
COPY ${JAR_FILE} app.jar
ENTRYPOINT ["java","-jar","/app.jar"]
To create the Docker image use:
docker build -t demoapp:1 -f Dockerfile .

Run the demoapp and Prometheus as Docker containers

We will run the demoapp and Prometheus as Docker containers. Below is a sample prometheus.yml file to scrape the demoapp metrics.
global:
  scrape_interval:     15s

scrape_configs:
  - job_name: "demoapp_metrics"
    metrics_path: "/actuator/prometheus"
    static_configs:
      - targets: ["demoapp:8080"]
Create a docker-compose.yml and include the demoapp and prometheus as services. We also mount the prometheus.yml to the prometheus container.
services:
  demoapp:
    image: demoapp:1
    ports:
      - "8080:8080"
  prometheus:
    image: prom/prometheus
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    ports:
      - "9090:9090"
Start the services using the docker compose command:
docker-compose up
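Before querying the metrics, generate some traffic by placing a couple of orders against the demo app (assuming it is reachable on localhost:8080):
curl -X POST http://localhost:8080/books
curl -X POST http://localhost:8080/movies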

Query the metrics from Prometheus

Now let us query the Prometheus endpoint exposed by the application to see the metrics we have introduced. We see the counters for books and movies. Since we added the @Timed annotation, we also see the timing metrics for the APIs.
curl -X GET http://localhost:8080/actuator/prometheus
# HELP order_books_total  
# TYPE order_books_total counter
order_books_total 1.0
# HELP books_api_seconds_max  
# TYPE books_api_seconds_max gauge
books_api_seconds_max{exception="None",method="POST",outcome="SUCCESS",status="200",uri="/books",} 0.0
# HELP books_api_seconds  
# TYPE books_api_seconds summary
books_api_seconds_count{exception="None",method="POST",outcome="SUCCESS",status="200",uri="/books",} 1.0
books_api_seconds_sum{exception="None",method="POST",outcome="SUCCESS",status="200",uri="/books",} 0.05435327

# HELP order_movies_total  
# TYPE order_movies_total counter
order_movies_total 1.0
# HELP movies_api_seconds  
# TYPE movies_api_seconds summary
movies_api_seconds_count{exception="None",method="POST",outcome="SUCCESS",status="200",uri="/movies",} 1.0
movies_api_seconds_sum{exception="None",method="POST",outcome="SUCCESS",status="200",uri="/movies",} 0.023019645
# HELP movies_api_seconds_max  
# TYPE movies_api_seconds_max gauge
movies_api_seconds_max{exception="None",method="POST",outcome="SUCCESS",status="200",uri="/movies",} 0.023019645
Another approach is to use the Prometheus dashboard at "http://localhost:9090/" to query and visualize the metrics.

Conclusion

The ability to monitor and manage aspects like health, metrics and logging is key to running applications in production. As discussed, leveraging Spring Actuator, Micrometer and Prometheus is a simple approach to monitor Spring Boot applications. Micrometer provides a simple facade over the instrumentation clients for the most popular monitoring systems, allowing you to instrument your JVM-based application code without vendor lock-in. Refer to more Spring Boot articles in this collection.

Wednesday, March 2, 2022

Introduction to Kafka Streams in Java


Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in Kafka clusters. In this article, we will explore the concepts, architecture and examples on Kafka Streams in Java.

Kafka Streams

  • Kafka Streams is the easiest way to write real time applications and microservices.
  • Kafka Streams is a client library and makes it easy to do data processing and transformations within Kafka.
  • Kafka Streams allows you to write standard Java and Scala applications without the need to create separate processing clusters.
  • The Kafka Streams API is a library (JAR) that becomes part of the application. There is no special code to deploy, and it doesn't run on the Kafka brokers.
  • Kafka Streams is deployment agnostic and can be deployed on containers, VMs, Cloud, On-Prem etc.
  • Kafka Streams API supports per record (no batching) stream processing with millisecond latency.
  • Kafka Streams is fully aligned with Kafka security mechanism.

Kafka Streams Terminology

Key terminology around Kafka Streams.
  • Stream Stream is an unbounded, continuous realtime flow of records
  • Records Records are key value pairs
  • Topology Topology defines the stream processing computational logic of an application. It defines how input data is transformed to output data.
  • Stream Processor Stream processor is a node in the processor topology and transforms incoming streams, record by record. It may create an output stream.
  • Source Processor Source processor is a special type of stream processor that does not have any upstream processors. It takes data directly from one or more Kafka topics and forwards it to downstream processors. It doesn't transform data.
  • Sink Processor Sink processor is a special type of stream processor that doesn't have any children and sends the data directly to Kafka topics.

Kafka Streams Example - Game Scoring App

In this example, we will use Kafka Streams to aggregate gaming scores pushed to a Kafka topic in real time. We have an input topic where the raw scores are published, which gets processed by the streams app to generate aggregate scores on an output topic.

Kafka Streams Setup

Start the Zookeeper.
bin/zookeeper-server-start.sh config/zookeeper.properties
Start the Kafka server.
bin/kafka-server-start.sh config/server.properties
Create the input and output Kafka topics.
bin/kafka-topics.sh --create --replication-factor 1 --partitions 1 --topic scores-input --bootstrap-server localhost:9092
bin/kafka-topics.sh --create --replication-factor 1 --partitions 1 --topic scores-output --bootstrap-server localhost:9092
Validate the Kafka topics being created.
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
scores-input
scores-output
Start a Kafka console consumer to see the results.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092     
                              --topic scores-output     
                              --from-beginning     
                              --formatter kafka.tools.DefaultMessageFormatter     
                              --property print.key=true     
                              --property print.value=true     
                              --skip-message-on-error     
                              --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer     
                              --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
Start a Kafka console producer to input the scores. We are using ":" as the separator between key and value.
bin/kafka-console-producer.sh --broker-list localhost:9092 
                              --topic scores-input 
                              --property "parse.key=true" 
                              --property "key.separator=:"

Game Scoring App

This is the high-level flow of the application code.
  • Create a properties object to hold the configuration of Kafka Streams instance
  • Create a KStream to read the input topic "scores-input", which will receive the input records
  • Do an aggregation on the KStream to get a KTable. Here we group by key and add the new value, aggregating the values of records in this stream by the grouped key.
  • Materialize the stream to an output topic
  • Create and start the Kafka Streams instance
package com.stackstalk.kafkastreams;

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

public class GameScoringApp {

	public static void main(String[] args) {
		
		// Configuration for a Kafka Streams instance
		Properties config = new Properties();
		config.put(StreamsConfig.APPLICATION_ID_CONFIG, "game-scoring");
		config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
		config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
		config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        
		// Create a KStream to read  the specific topic
		StreamsBuilder builder = new StreamsBuilder();
		KStream<String, String> scoresInput = builder.stream("scores-input");
		
		// Do an aggregation on the KStream to get KTable
		// Here we group by key and add the new value
		// Aggregate the values of records in this stream by the grouped key. 
		// Records with null key or value are ignored. 
		// The result is written into a local KeyValueStore 
		KTable<String, Long> scoresOutput = scoresInput
				.groupByKey()
				.aggregate(() -> 0L, (key, value, total) -> total + Long.parseLong(value),
				Materialized.with(Serdes.String(), Serdes.Long()));
		
		// Materialize the stream to an output topic
		scoresOutput.toStream()
		.peek((key, value) -> System.out.println("Key:Value = " + key + ":" + value))
		.to("scores-output");
		
		// Print the topology
		Topology topology = builder.build();
		System.out.println(topology.describe());
		
		// Create the Kafka Streams instance
		KafkaStreams kafkaStreams = new KafkaStreams(topology, config);
		
		// Clean local state prior to starting the processing topology
		// In production we need to cleanup only on certain conditions
		kafkaStreams.cleanUp();
		
		// Start the KafkaStreams instance
		kafkaStreams.start();

		// Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
		Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
	}
}

Kafka Streams Topology output

This is the topology output from this example. Observe the source, stream and sink processors.
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [scores-input])
      --> KSTREAM-AGGREGATE-0000000002
    Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])
      --> KTABLE-TOSTREAM-0000000003
      <-- KSTREAM-SOURCE-0000000000
    Processor: KTABLE-TOSTREAM-0000000003 (stores: [])
      --> KSTREAM-PEEK-0000000004
      <-- KSTREAM-AGGREGATE-0000000002
    Processor: KSTREAM-PEEK-0000000004 (stores: [])
      --> KSTREAM-SINK-0000000005
      <-- KTABLE-TOSTREAM-0000000003
    Sink: KSTREAM-SINK-0000000005 (topic: scores-output)
      <-- KSTREAM-PEEK-0000000004

Kafka Console Producer input

In the Kafka Console producer send the records (scores) as key:value (name:score) pairs.
>john:5
>tony:3
>john:2
>ram:4
>tony:1

Kafka Console Consumer output

In the Kafka console consumer output, observe the aggregation in action. Every time a new record is pushed, the scores get aggregated based on the key.
john	5
tony	3
john	7
ram	4
tony	4
Get the full source code of the app in GitHub.

Exactly once Semantics

Kafka Streams implements exactly-once semantics. Exactly once is the guarantee that data processing on each message will happen exactly once. Pushing messages back to Kafka will also happen exactly once. The producers are idempotent: if the same message is sent multiple times, Kafka will ensure only one copy of the message is kept. If a message needs to be written to multiple topics as part of a transaction, Kafka will ensure either all messages are written or none of them are written.

The exactly-once guarantees provided by Kafka’s Streams API are the strongest guarantees offered by any stream processing system so far. It offers end-to-end exactly-once guarantees for a stream processing application that extends from the data read from Kafka, any state materialized to Kafka by the Streams app, to the final output written back to Kafka.

Exactly-once processing is enabled in Kafka Streams by specifying an additional property in the streams configuration. This provides the exactly-once processing guarantee.
// Configuration for a Kafka Streams instance
Properties config = new Properties();
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

KStreams, KTables and GlobalKTable

The computational logic of a Kafka Streams application is defined as a processor topology, which is a graph of stream processors (nodes) and streams (edges). We can define the processor topology with the Kafka Streams APIs:

Kafka Streams DSL A high-level API that provides the most common data transformation operations such as map, filter, join, and aggregations out of the box. The DSL is the recommended starting point for developers new to Kafka Streams, and should cover many use cases and stream processing needs.

Processor API A low-level API that lets you add and connect processors as well as interact directly with state stores. The Processor API provides you with even more flexibility than the DSL but at the expense of requiring more manual work on the side of the application developer (e.g., more lines of code).

Kafka Streams DSL supports built-in abstractions for streams and tables in the form of KStream, KTable, and GlobalKTable.

KStream is an abstraction of a record stream. Using the table analogy, data records in a record stream are always interpreted as an “INSERT”. This is similar to a log, infinite, unbounded data streams.

KTable is an abstraction of a changelog stream, where each data record represents an update. The value in a data record is interpreted as an “UPDATE” of the last value for the same record key. Using the table analogy, a data record in a changelog stream is interpreted as an UPSERT aka INSERT/UPDATE because any existing row with the same key is overwritten. Also, null values are interpreted in a special way: a record with a null value represents a “DELETE” or tombstone for the record’s key.

Like a KTable, a GlobalKTable is an abstraction of a changelog stream, where each data record represents an update. To see the main difference between a KTable and a GlobalKTable, let us assume a topic with 5 partitions (a short sketch of how each abstraction is created follows the list below).
  • If you read the input topic into a KTable, then the “local” KTable instance of each application instance will be populated with data from only 1 partition of the topic’s 5 partitions.
  • If you read the input topic into a GlobalKTable, then the local GlobalKTable instance of each application instance will be populated with data from all partitions of the topic.
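As a minimal illustration (the topic names here are hypothetical), the Kafka Streams DSL exposes a dedicated builder method for each abstraction:
StreamsBuilder builder = new StreamsBuilder();

// KStream: a record stream, every record is an "INSERT"
KStream<String, String> stream = builder.stream("events-topic");

// KTable: a changelog stream, each application instance holds only the data of its partitions
KTable<String, String> table = builder.table("state-topic");

// GlobalKTable: a changelog stream fully replicated to every application instance
GlobalKTable<String, String> globalTable = builder.globalTable("reference-topic");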

Stream Operations

We will review some of the stream operations with examples.

Map

Takes one record and produces one record, applicable only for KStreams. Affects both keys and values. In this example, we convert both key and value to lower case.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> scoresInput = builder.stream("scores-input");
KStream<String, String> mappedInput = scoresInput.map((key, value) -> KeyValue.pair(key.toLowerCase(), value.toLowerCase())); 	
mappedInput.to("scores-output", Produced.with(Serdes.String(), Serdes.String()));

MapValues

Takes one record and produces one record, applicable to KStreams and KTables. It affects only the values. In this example, we scale the value by a weight factor.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> scoresInput = builder.stream("scores-input");
KStream<String, Integer> mappedInput = scoresInput.mapValues(value -> Integer.parseInt(value) * 100); 				
mappedInput.to("scores-output", Produced.with(Serdes.String(), Serdes.Integer()));

Filter/ FilterNot

Takes one record and produces zero or one record. FilterNot is the inverse of Filter. In this example, we filter the keys based on a regular expression.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> scoresInput = builder.stream("scores-input");			
KStream<String, String> mappedInput = scoresInput.filter((key, value) -> key.matches("^[a-zA-Z].*")); 				
mappedInput.to("scores-output", Produced.with(Serdes.String(), Serdes.String()));

FlatMap

Takes one record and produces zero, one or more records, applicable to KStreams. In this example, we split the key based on a separator and create multiple KeyValue pairs.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> scoresInput = builder.stream("scores-input");
KStream<String, String> mappedInput = scoresInput.flatMap((key, value) -> {
	List<KeyValue<String, String>> e = new ArrayList<>();
	String[] keyList = key.split(",");
	for (String e1 : keyList) {
		e.add(KeyValue.pair(e1, value));
	}
	return e;
});

FlatMapValues

Takes one record and produces zero, one or more records, applicable for KStreams. Does not change the keys.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> scoresInput = builder.stream("scores-input");
KStream<String, String> mappedInput = scoresInput.flatMapValues(value -> Arrays.asList(value.split(",")));

Branch

Splits a KStream based on one or more predicates; the result is multiple KStreams. In this example, we split the stream into three streams based on the value of the score.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> scoresInput = builder.stream("scores-input");

BranchedKStream<String, String> branches = scoresInput.split(Named.as("split-"))
	.branch((key, value) -> Integer.parseInt(value) > 100, Branched.as("hundred-plus"))
	.branch((key, value) -> Integer.parseInt(value) > 50, Branched.as("fifty-plus"))
	.branch((key, value) -> Integer.parseInt(value) > 10, Branched.as("ten-plus"));
		
branches.defaultBranch().get("split-hundred-plus").to("scores-output", Produced.with(Serdes.String(), Serdes.String()));

SelectKey

Assigns a new key to the record. In this example, we prepend a tag to the key.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> scoresInput = builder.stream("scores-input-1");
KStream<String, String> mappedInput = scoresInput.selectKey((key, value) -> "MYTAG_" + key);
		
mappedInput.to("scores-output", Produced.with(Serdes.String(), Serdes.String()));
Refer to the Game Scoring App example above for examples of stateful operations like GroupBy and Aggregate.

Conclusion

Kafka Streams is the easiest way to write real time applications and microservices in Java. With the Kafka Streams DSL high-level API, the most common data transformation operations such as map, filter, join, and aggregations are available out of the box.

Friday, February 4, 2022

Getting started in GraphQL with Spring Boot


In this article we will explore basic concepts on GraphQL and look at how to develop a microservice in Spring Boot with GraphQL support.

GraphQL Introduction

Reference: https://graphql.org/

GraphQL is a query language for the APIs, and a server-side runtime for executing queries using a type system defined for the data. GraphQL isn't tied to any specific database or storage engine and there are multiple libraries to help implement GraphQL in different languages.

In GraphQL, a schema is used to describe the shape of available data. The schema defines a hierarchy of types with fields. The schema also specifies exactly which queries and mutations are available for clients to execute. We will look into the details of schemas in the next section.

Typically, when using a REST API, we always get back a complete dataset; this is referred to as over-fetching, since we cannot limit the fields the REST API returns. In GraphQL, using the query language, the request can be customized to exactly what is needed, down to specific fields within each entity. Once we limit the data, the amount of processing needed is reduced, and the savings start to add up.

Some best practice recommendations for GraphQL services.
  • GraphQL is typically served over HTTP via a single endpoint which expresses the full set of capabilities of the service. This is in contrast to REST APIs which expose a suite of URLs each of which expose a single resource.
  • GraphQL services typically respond using JSON, however the GraphQL spec does not require it
  • While there's nothing that prevents a GraphQL service from being versioned just like any other REST API, GraphQL takes a strong opinion on avoiding versioning by providing the tools for the continuous evolution of a GraphQL schema.
  • The GraphQL type system allows for some fields to return lists of values, but leaves the pagination of longer lists of values up to the API designer.

GraphQL Schema Introduction

The GraphQL specification defines a human-readable schema definition language (or SDL) that can be used to define schema and store it as a string.

Let us start with an example and understand the concepts.
# Movie type has fields name and director
type Movie {
  name: String!
  director: Director
}

# Director type has fields name and a list of movies
type Director {
  name: String!
  movies: [Movie] # List of movies
}

type Query {
  movies: [Movie]
  directors: [Director]
}

type Mutation {
  addMovie(id: ID, name: String, director: String): Movie
}
In this example, we have two object types, Movie and Director. A Movie has an associated Director, and a Director has a list of movies. Relationships are defined in a unified schema, which allows client developers to see what data is available and then request a specific subset of that data with a single optimized query.
  • Most of the types defined in a GraphQL schema are object types. An object type contains a collection of fields, each of which has its own type.
  • GraphQL's default scalar types are Int, String, Float, Boolean, ID. ID is typically a key for a cache
  • A field can return a list containing items of a particular type. This is indicated with [] square brackets
  • Non-nullability of a field can be specified with an exclamation mark !
  • The Query type is a special object type that defines all of the top-level entry points for queries that clients execute against the server
  • The Mutation type is similar to the Query type and defines entry points for write operations.
  • Input types are special object types that allow you to provide data as arguments to fields, as opposed to flat scalar arguments
With this introduction let us implement an example.

MovieService example

In this example, we will implement a simple movie service to leverage GraphQL with Spring Boot. We will implement GraphQL Query and Mutation support to list all movies, find a specific movie and add a new movie. This example also integrates the H2 database to store the movies.

GraphQL Schema

Let us now define the GraphQL schema which describes what data can be queried. Create a file "schema.graphqls" in the resources folder of the app and include the below schema.

In this schema we have "Movie" as object type. We define an input type "MovieInput" which helps in the mutation case to pass the object as whole. We also have "Query" and "Mutation" types to get movie(s) and add a movie.
type Movie {
   id: ID
   name: String
   director: String
}

input MovieInput {
   id: ID!
   name: String!
   director: String!
}

type Query {
   movies: [Movie]
   movieById(id: ID): Movie
}

type Mutation {
   addMovie(movieInput: MovieInput): Movie
}

A few additional points to note in the schema definition:
  • In the "movies" query we have not included any parenthesis since there are no arguments
  • The "!" for fields in MovieInput type indicates these are non-nullable and input is expected

Spring Boot dependencies

Create a Spring starter project and include the Spring Starter Web support, GraphQL Spring Boot starter, GraphQL Java Tools, the Spring Data JPA starter, H2 Database Engine, Project Lombok and the GraphQL IDE for testing as dependencies in the pom.xml. For more details on Project Lombok refer to this article "Introduction to Project Lombok".
<dependencies>
	<dependency>
		<groupId>org.springframework.boot</groupId>			
		<artifactId>spring-boot-starter-web</artifactId>
	</dependency>
	<dependency>
		<groupId>org.projectlombok</groupId>
		<artifactId>lombok</artifactId>
		<version>1.18.22</version>
		<scope>provided</scope>
	</dependency>        
	<dependency>
		<groupId>com.graphql-java</groupId>
		<artifactId>graphql-spring-boot-starter</artifactId>
		<version>5.0.2</version>
	</dependency>		
	<dependency>
		<groupId>com.graphql-java</groupId>
		<artifactId>graphql-java-tools</artifactId>
		<version>5.2.4</version>
	</dependency>		
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-data-jpa</artifactId>
	</dependency>
	<dependency>
		<groupId>com.h2database</groupId>
		<artifactId>h2</artifactId>
	</dependency>		
	<dependency>
		<groupId>com.graphql-java</groupId>
		<artifactId>graphiql-spring-boot-starter</artifactId>
		<version>3.0.3</version>
	</dependency>	
</dependencies>

Configure application.properties

Update the application.properties to specify the H2 database.
spring.datasource.url=jdbc:h2:mem:moviesdb
spring.datasource.driverClassName=org.h2.Driver
spring.datasource.username=sa
spring.datasource.password=
spring.jpa.database-platform=org.hibernate.dialect.H2Dialect

Define the data object

We start with the Movie POJO. We use the Lombok @Data annotation, which generates the getters and setters needed.
package com.stackstalk;

import javax.persistence.Entity;

import javax.persistence.Id;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Entity
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Movie {

	@Id
	private Integer id;
	private String name;
	private String director;
}


Define the repository

Now let us define the repository.
package com.stackstalk;

import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface MovieRepository extends JpaRepository<Movie, Integer> {

}

Define the GraphQL query resolver

Now let us implement the GraphQL query resolver which serves the queries from the client. GraphQLQueryResolver is a marker interface, and the framework looks for method signatures matching the GraphQL schema. It is also feasible to write multiple query resolvers with a logical grouping of queries. In this example, we implement two query methods, "get a movie by id" and "get all movies", matching the schema.
package com.stackstalk;

import java.util.List;
import java.util.Optional;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.coxautodev.graphql.tools.GraphQLQueryResolver;

@Component
public class MovieQueryResolver implements GraphQLQueryResolver {
	
	@Autowired
	private MovieRepository movieRepository;
	
	public Optional<Movie> movieById(Integer id) {
		return movieRepository.findById(id);
	}
	
	public List<Movie> movies() {
		return movieRepository.findAll();
	}
}

Define the GraphQL mutation resolver

Now let us implement the GraphQL mutation resolver which serves the writes from the client. In this example, we implement the add movie method which matches the mutation type defined in the schema.
package com.stackstalk;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.coxautodev.graphql.tools.GraphQLMutationResolver;

@Component
public class MovieMutationResolver implements GraphQLMutationResolver {
	
	@Autowired
	private MovieRepository movieRepository;
	
	public Movie addMovie(Movie input) {
		Movie movie = new Movie(input.getId(), input.getName(), input.getDirector());
		return movieRepository.save(movie);
	}
}


Define the application

Finally let us define the application.
package com.stackstalk;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class MovieServiceApplication {

	public static void main(String[] args) {
		SpringApplication.run(MovieServiceApplication.class, args);
	}
}

Putting it all together and testing

Now, let us test the application and examine.

Using the GraphQL IDE

In the project dependencies we have already included the GraphiQL dependency. This is an IDE which can be leveraged to perform GraphQL queries and mutations. To launch the IDE, use the URL http://localhost:8080/ in the browser.
If testing with Postman, use the endpoint "http://localhost:8080/graphql" to perform the GraphQL queries.
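Outside the IDE, GraphQL requests are plain HTTP POSTs with a JSON body containing a query field. A curl example for the movies query (assuming the default /graphql endpoint on localhost:8080):
curl -X POST http://localhost:8080/graphql \
  -H "Content-Type: application/json" \
  -d '{"query": "{ movies { id name director } }"}'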

Add a Movie

This mutation is used to add a new movie.

GraphQL query
mutation {
  addMovie(movieInput: {
    id: 1
    name: "Scream"
    director: "Matt Bettinelli-Olpin, Tyler Gillett"
  }) {
    id
    name
    director
  }
}
Output
{
  "data": {
    "addMovie": {
      "id": "1",
      "name": "Scream",
      "director": "Matt Bettinelli-Olpin, Tyler Gillett"
    }
  }
}

Query a movie by identifier

This query is used to get a specific movie by identifier.

GraphQL query
query {
  movieById(id: 1) {
    id
    name
    director
  }
}
Output
{
  "data": {
    "movieById": {
      "id": "1",
      "name": "Scream",
      "director": "Matt Bettinelli-Olpin, Tyler Gillett"
    }
  }
}

Query all movies

This query is used to get a list of all movies.

GraphQL query
query {
  movies {
    id
    name
    director
  }
}
Output
{
  "data": {
    "movies": [
      {
        "id": "1",
        "name": "Scream",
        "director": "Matt Bettinelli-Olpin, Tyler Gillett"
      },
      {
        "id": "2",
        "name": "Spider-Man: No Way Home",
        "director": "Jon Watts "
      }
    ]
  }
}
Get access to the full source code of this example in GitHub.

In conclusion, when using a REST API we typically get back a complete dataset; referred to as over-fetching, we cannot limit the fields a REST API returns. In GraphQL, using the query language, the request can be customized to exactly what is needed, down to specific fields within each entity. With Spring Boot it is pretty easy to implement GraphQL with the available library support.

Refer to this link for more Java Tutorials.

Monday, January 24, 2022

Server-Sent Events with Spring WebFlux


In this article we will review the concepts of server-sent events and work on an example using WebFlux. Before getting into this article it would be important to understand basic concepts of reactive programming from the following links.
  • Reactive Programming with Spring WebFlux
  • Spring WebFlux Example using Annotation based programming
  • Spring WebFlux Example using Functional Programming

Server-Sent Events Introduction

Server-Sent Events (SSE) enable a client to receive automatic updates from a server over an HTTP connection without having to poll. They are commonly used to push message updates or continuous data streams. In SSE, events flow from server to client over a one-way communication channel. Examples include publishing notifications to browser clients and pushing updates such as stock prices or game scores to all subscribed clients.

LogHandler example

In this example, we will implement a simple log handler service using Spring WebFlux. We expose a "/stream" endpoint and demonstrate the working of Server-Sent Events (SSE) using Spring WebFlux.

Spring Boot dependencies

Create a Spring starter project and include Spring Reactive Web (WebFlux), Spring Data MongoDB Reactive, Project Lombok and the test modules as dependencies in the pom.xml. For more details on Project Lombok refer to this article "Introduction to Project Lombok".
<dependencies>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
	</dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-webflux</artifactId>
	</dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-test</artifactId>
		<scope>test</scope>
	</dependency>
	<dependency>
		<groupId>io.projectreactor</groupId>
		<artifactId>reactor-test</artifactId>
		<scope>test</scope>
	</dependency>
	<dependency>
		<groupId>org.projectlombok</groupId>
		<artifactId>lombok</artifactId>
		<version>1.18.22</version>
		<scope>provided</scope>
	</dependency>		
</dependencies>

Configure application.properties

Update the application.properties to specify the Mongo database. We are using a local Mongo database on port 27017 and the database would be "logs".
spring.data.mongodb.host=localhost
spring.data.mongodb.port=27017
spring.data.mongodb.database=logs

Define the data object

We start with the LogInfo POJO. We use the Lombok @Data annotation, which generates the getters and setters needed.
package com.stackstalk;

import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
@Document
public class LogInfo {
	@Id 
	private String logId;
	private Long logTimestamp;
	private String logMessage;
}

Define the repository

Now let us define the repository. Note that here we use the Mongo-specific repository with reactive support.
package com.stackstalk;

import org.springframework.data.mongodb.repository.ReactiveMongoRepository;

public interface LogInfoRepository extends ReactiveMongoRepository<LogInfo, String> {

}

Define the Handler Function

In WebFlux.fn, an HTTP request is handled with a HandlerFunction: a function that takes ServerRequest and returns a delayed ServerResponse (i.e. Mono<ServerResponse>).

Sinks are constructs through which Reactive Streams signals can be programmatically pushed, with Flux or Mono semantics. These standalone sinks expose tryEmit methods that return an EmitResult enum, allowing the caller to fail atomically in case the attempted signal is inconsistent with the spec and/or the state of the sink.

There are multiple Sinks categories:
  • many().multicast(): a sink that will transmit only newly pushed data to its subscribers, honoring their backpressure
  • many().unicast(): same as above, with the twist that data pushed before the first subscriber registers is buffered.
  • many().replay(): a sink that will replay a specified history size of pushed data to new subscribers then continue pushing new data live.
  • one(): a sink that will play a single element to its subscribers
  • empty(): a sink that will play a terminal signal only to its subscribers (error or complete)

In this example, we will use a replay sink which will replay a specified history size of pushed data to new subscribers then continue pushing new data live.
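A minimal, standalone sketch of the replay behavior (class and event names are illustrative, not part of the service):
package com.stackstalk;

import reactor.core.publisher.Sinks;

public class ReplaySinkDemo {

	public static void main(String[] args) {
		// Replay sink that keeps the full history for late subscribers
		Sinks.Many<String> sink = Sinks.many().replay().all();

		sink.tryEmitNext("event-1");
		sink.tryEmitNext("event-2");

		// A late subscriber still receives event-1 and event-2, then any live events
		sink.asFlux().subscribe(e -> System.out.println("Received: " + e));

		sink.tryEmitNext("event-3");
	}
}
The handler below applies the same idea, emitting each incoming log message into the sink and exposing the sink as a Flux on the stream endpoint.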
package com.stackstalk;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;

import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

@Component
public class LogInfoHandler {

	@Autowired
	private LogInfoRepository logRepository;
	
	Sinks.Many<LogInfo> logInfoSink = Sinks.many().replay().all();
	
	public Mono<ServerResponse> addLogInfo(ServerRequest serverRequest) {
		return serverRequest.bodyToMono(LogInfo.class)
				.doOnNext(logInfo -> {
					logInfoSink.tryEmitNext(logInfo);
				})
				.flatMap(l -> {
					return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(logRepository.save(l), LogInfo.class);
				}).log();
	}
	
	public Mono<ServerResponse> getLogStream(ServerRequest serverRequest) {
		return ServerResponse.ok().contentType(MediaType.APPLICATION_NDJSON)
				.body(logInfoSink.asFlux(), LogInfo.class).log();
	}
}

Define the Router Function

Incoming requests are routed to a handler function with a RouterFunction: a function that takes ServerRequest and returns a delayed HandlerFunction (i.e. Mono<HandlerFunction>). When the router function matches, a handler function is returned; otherwise an empty Mono. RouterFunction is the equivalent of a @RequestMapping annotation, but with the major difference that router functions provide not just data, but also behavior.
package com.stackstalk;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;

@Configuration
public class LogInfoRouter {
	
	@Bean
	public RouterFunction<ServerResponse> routes(LogInfoHandler handler) {
		return RouterFunctions.route(RequestPredicates.POST("/logs"), handler::addLogInfo)
				.andRoute(RequestPredicates.GET("/stream"), handler::getLogStream);
	}
}

Define the application

Finally let us define the application.
package com.stackstalk;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class LogHandlerApplication {

	public static void main(String[] args) {
		SpringApplication.run(LogHandlerApplication.class, args);
	}

}

Putting it all together and testing

Now, let us test the application and examine the output. On starting the application, observe that the Netty server starts on port 8080 and that the reactive MongoDB repository is picked up.
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.6.2)

2022-01-24 15:37:19.428  INFO 30335 --- [           main] com.stackstalk.LogHandlerApplication     : Starting LogHandlerApplication using Java 16.0.2 on FAKEUSER-M-F1VD with PID 30335 (/Users/fakeuser/Documents/workspace-spring-tool-suite-4-4.12.0.RELEASE/LogHandler/target/classes started by fakeuser in /Users/fakeuser/Documents/workspace-spring-tool-suite-4-4.12.0.RELEASE/LogHandler)
2022-01-24 15:37:19.430  INFO 30335 --- [           main] com.stackstalk.LogHandlerApplication     : No active profile set, falling back to default profiles: default
2022-01-24 15:37:19.821  INFO 30335 --- [           main] .s.d.r.c.RepositoryConfigurationDelegate : Bootstrapping Spring Data Reactive MongoDB repositories in DEFAULT mode.
2022-01-24 15:37:19.949  INFO 30335 --- [           main] .s.d.r.c.RepositoryConfigurationDelegate : Finished Spring Data repository scanning in 124 ms. Found 1 Reactive MongoDB repository interfaces.
2022-01-24 15:37:20.372  INFO 30335 --- [           main] org.mongodb.driver.cluster               : Cluster created with settings {hosts=[localhost:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms'}
2022-01-24 15:37:20.611  INFO 30335 --- [localhost:27017] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:1, serverValue:2}] to localhost:27017
2022-01-24 15:37:20.611  INFO 30335 --- [localhost:27017] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:2, serverValue:1}] to localhost:27017
2022-01-24 15:37:20.612  INFO 30335 --- [localhost:27017] org.mongodb.driver.cluster               : Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, minWireVersion=0, maxWireVersion=7, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=120133302}
2022-01-24 15:37:20.961  INFO 30335 --- [           main] o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port 8080
2022-01-24 15:37:20.968  INFO 30335 --- [           main] com.stackstalk.LogHandlerApplication     : Started LogHandlerApplication in 1.871 seconds (JVM running for 2.632)

Open an HTTP connection on the /stream endpoint

This API is used to listen for Server-Sent Events (SSE).
curl --location --request GET 'http://localhost:8080/stream' 
The HTTP connection stays open; whenever a new log message is added, the event is pushed to the client. Open multiple connections and observe the replay behavior each time a new subscription is made.
{"logId":"1","logTimestamp":1642649553432,"logMessage":"Hello. This is my first message"}

Add a log message with a POST on /logs

This API is used to add a new log message.
curl --location --request POST 'http://localhost:8080/logs' \
> --header 'Content-Type: application/json' \
> --data-raw '{
>     "logId": "1",
>     "logTimestamp": 1642649553432,
>     "logMessage": "Hello. This is my first message"
> }'
Returns the log message added.
{"logId":"1","logTimestamp":1642649553432,"logMessage":"Hello. This is my first message"}
In the logs observe usage of reactive signals with onSubscribe, request, onNext (once) and onComplete.

Conclusion

Implementing Server-Sent Events (SSE) on the server side with reactive stream signals is straightforward with Spring WebFlux, and Sinks provide the flexibility and abstraction to handle a variety of requirements with ease. Refer to this link for more Java Tutorials. Get access to the full source code of this example in GitHub.

Monday, January 17, 2022

Reactive Programming with Spring WebFlux

 January 17, 2022     Java, Microservices     No comments   

This article will focus on how to do reactive programming with Spring WebFlux. It starts with introductory concepts around reactive programming, reactive streams specification, project reactor and Spring WebFlux. We will work on an example to build a simple loan info microservice to demonstrate the concepts.

Reactive Programming

The reactive programming paradigm is about asynchronous and non-blocking applications. It is a good fit for applications that handle large streams of data and need resiliency under high load. In this model, data is pushed to the clients as and when it becomes available.

Handling streams of data whose volume is unknown (e.g. live data) requires special care in an asynchronous system. The most prominent issue is that resource consumption needs to be controlled so that a fast data source does not overwhelm the stream destination; this control is referred to as backpressure.


Reactive Streams Specification

Reference: http://www.reactive-streams.org/
Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. The scope of Reactive Streams is to find a minimal set of interfaces, methods and protocols that will describe the necessary operations and entities to achieve the goal—asynchronous streams of data with non-blocking back pressure.

Reactive streams interfaces.
Processor<T,​R>
A Processor represents a processing stage—which is both a Subscriber and a Publisher and obeys the contracts of both.
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
Publisher<T>
A Publisher is a provider of a potentially unbounded number of sequenced elements, publishing them according to the demand received from its Subscriber(s).
public interface Publisher<T> {
  
    public void subscribe(Subscriber<? super T> s);
}
Subscriber<T>
Will receive call to Subscriber.onSubscribe(Subscription) once after passing an instance of Subscriber to Publisher.subscribe(Subscriber).
public interface Subscriber<T> {

    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}
Subscription
A Subscription represents a one-to-one lifecycle of a Subscriber subscribing to a Publisher.
public interface Subscription {

    public void request(long n);
    public void cancel();
}
Reactive Streams Publisher Subscriber Flow
The reactive streams publisher-subscriber flow starts with a subscribe() call towards the publisher to start streaming the data, which creates a new Subscription. The subscriber then signals demand to the publisher for a number of elements using the request() method. onNext() is the data notification sent from the publisher, and onComplete() is the successful terminal state.
 

Project Reactor

Reference: https://projectreactor.io/

Project Reactor is a library (reactor-core) based on the Reactive Streams specification, for building non-blocking applications on the JVM. Reactor is fully non-blocking and provides efficient demand management in the form of managing "backpressure".
Reactor introduces composable reactive types that implement Publisher but also provide a rich vocabulary of operators: Flux and Mono. 
  • Flux object represents a reactive sequence of 0..N items 
  • Mono object represents a single-value-or-empty (0..1) result

Flux

A Flux is a reactive type, a standard Publisher that represents an asynchronous sequence of 0 to N emitted items, optionally terminated by either a completion signal or an error. A simple example below demonstrates the usage and signals of Flux.
package com.stackstalk;

import java.util.List;

import reactor.core.publisher.Flux;

public class ReactorTestMain {
	
	public Flux<String> loanTypesFlux() {
		return Flux.fromIterable(List.of("Car Loan", "Education Loan", "Personal Loan")).log();
	}

	public static void main(String[] args) {
		ReactorTestMain reactorTestMain = new ReactorTestMain();
		reactorTestMain.loanTypesFlux().subscribe(name -> {
			System.out.println(name);
		});
	}
}
Output:
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(Car Loan)
Car Loan
[ INFO] (main) | onNext(Education Loan)
Education Loan
[ INFO] (main) | onNext(Personal Loan)
Personal Loan
[ INFO] (main) | onComplete()
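The request(unbounded) signal above means the subscriber asked for all elements at once. For contrast, here is a small illustrative sketch of bounded demand using Reactor's BaseSubscriber (class name and numbers are arbitrary, not part of the article's project):
package com.stackstalk;

import org.reactivestreams.Subscription;

import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

public class BackpressureDemo {

	public static void main(String[] args) {
		Flux.range(1, 5).log()
				.subscribe(new BaseSubscriber<Integer>() {
					@Override
					protected void hookOnSubscribe(Subscription subscription) {
						// Ask for only two elements up front instead of an unbounded amount
						request(2);
					}

					@Override
					protected void hookOnNext(Integer value) {
						System.out.println("Received " + value);
						// Pull the remaining elements one at a time
						request(1);
					}
				});
	}
}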

Mono

A Mono is a reactive type that represents a single value or empty (0..1) result. It is a specialized Publisher that emits at most one item via the onNext signal and then terminates with an onComplete signal. A simple example below demonstrates the usage and signals of Mono.
package com.stackstalk;

import reactor.core.publisher.Mono;

public class ReactorTestMain {
	
	public Mono<String> strMono() {
		return Mono.just("Hello World!!").log();
	}

	public static void main(String[] args) {
		ReactorTestMain reactorTestMain = new ReactorTestMain();
		reactorTestMain.strMono().subscribe(name -> {
			System.out.println(name);
		});
	}
}
Output:
[ INFO] (main) | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(Hello World!!)
Hello World!!
[ INFO] (main) | onComplete()
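Both reactive types also expose the rich operator vocabulary mentioned earlier, such as map and filter. A quick illustrative sketch chaining a few operators (class name is arbitrary):
package com.stackstalk;

import reactor.core.publisher.Flux;

public class OperatorDemoMain {

	public static void main(String[] args) {
		Flux.just("Car Loan", "Education Loan", "Personal Loan")
				.filter(name -> name.startsWith("E"))   // keep only matching loan types
				.map(String::toUpperCase)               // transform each element
				.subscribe(System.out::println);        // prints EDUCATION LOAN
	}
}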

Spring WebFlux

Reference: https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html

The original web framework included in the Spring Framework is Spring MVC, built for the Servlet API and servlet containers. Spring WebFlux, added later, is the reactive-stack web framework: it is fully non-blocking, supports reactive streams back pressure and runs on non-blocking servers such as Netty and Undertow. Spring WebFlux is built on top of Project Reactor.

One of the key differences between Spring MVC and Spring WebFlux is the concurrency model.
  • In Spring MVC (and servlet applications in general), it is assumed that applications can block the current thread, (for example, for remote calls). For this reason, servlet containers use a large thread pool to absorb potential blocking during request handling.
  • In Spring WebFlux (and non-blocking servers in general), it is assumed that applications do not block. Therefore, non-blocking servers use a small, fixed-size thread pool (event loop workers) to handle requests.

Spring WebFlux provides two programming models. 
  • Annotated Controllers: Consistent with Spring MVC and based on the same annotations from the spring-web module. Both Spring MVC and WebFlux controllers support reactive return types.
  • Functional Endpoints: Lambda-based, lightweight, and functional programming model. This is a small library or a set of utilities that an application can use to route and handle requests.
We will see each of these programming models with an example.

Spring WebFlux Example using Annotation based programming
Spring WebFlux Example using Functional Programming
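As a quick taste of the annotation-based model before diving into the linked articles, a minimal hypothetical controller returning a reactive type might look like this (not part of the examples above):
package com.stackstalk;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Flux;

@RestController
public class GreetingController {

	// WebFlux subscribes to the returned Flux and streams the elements to the client
	@GetMapping("/greetings")
	public Flux<String> greetings() {
		return Flux.just("Hello", "Bonjour", "Hola");
	}
}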

Conclusion


In this section we looked at reactive programming in general and covered the two programming models offered by Spring WebFlux, namely the annotation-based approach and the functional endpoints approach.

When to use Spring MVC or Spring WebFlux is a common question.
  • If there is a Spring MVC application that works fine, there is no need to change it. Imperative programming is the easiest way to write, understand, and debug code. If the application has blocking dependencies, it is better to stay with Spring MVC.
  • Spring WebFlux is a good fit for use cases that require ease of streaming up or down. If there is a need for a lightweight, functional web framework for use with Java 8 lambdas, we can use the Spring WebFlux functional web endpoints. In a microservice architecture, we can always have a mix of applications using Spring MVC controllers, Spring WebFlux controllers or Spring WebFlux functional endpoints.

Spring WebFlux Example using Functional Programming

 January 17, 2022     Java, Microservices     No comments   

This article focuses on implementing a Spring WebFlux reactive programming example using the functional model. In the functional programming model, functions are used to route and handle requests, and contracts are designed for immutability. It is an alternative to the annotation-based programming model. Introductory concepts around reactive programming and Spring WebFlux are explained in Reactive Programming with Spring WebFlux.

LoanInfoService example

In this example, we will implement a simple loan type handling service using Spring WebFlux. We expose a "/loans" endpoint and demonstrate the usage of functional model and reactive types Flux and Mono.

Spring Boot dependencies

Create a Spring starter project and include Spring Reactive Web (WebFlux), Spring Data MongoDB Reactive, Project Lombok and the test modules as dependencies in the pom.xml. For more details on Project Lombok refer to this article "Introduction to Project Lombok".
<dependencies>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
	</dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-webflux</artifactId>
	</dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-test</artifactId>
		<scope>test</scope>
	</dependency>
	<dependency>
		<groupId>io.projectreactor</groupId>
		<artifactId>reactor-test</artifactId>
		<scope>test</scope>
	</dependency>
	<dependency>
		<groupId>org.projectlombok</groupId>
		<artifactId>lombok</artifactId>
		<version>1.18.22</version>
		<scope>provided</scope>
	</dependency>		
</dependencies>

Configure application.properties

Update the application.properties to specify the Mongo database. We are using a local Mongo database on port 27017 and the database would be "loans".
spring.data.mongodb.host=localhost
spring.data.mongodb.port=27017
spring.data.mongodb.database=loans

Define the data object

We start with the LoanInfo POJO. We use the Lombok @Data annotation, which generates the getters and setters needed.
package com.stackstalk;

import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
@Document
public class LoanInfo {

	@Id
	private String loanId;
	private String loanType;
	private Float interestRate;
	private Integer maxTermInMonths;
	private Float processingFee;
}

Define the repository

Now let us define the repository. Note that here we use the Mongo-specific repository with reactive support.
package com.stackstalk;

import org.springframework.data.mongodb.repository.ReactiveMongoRepository;

public interface LoanInfoRepository extends ReactiveMongoRepository<LoanInfo, String> {

}

Define the Handler Function

In WebFlux.fn, an HTTP request is handled with a HandlerFunction: a function that takes ServerRequest and returns a delayed ServerResponse (i.e. Mono<ServerResponse>). HandlerFunction is the equivalent of the body of a @RequestMapping method in the annotation-based programming model.
package com.stackstalk;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;

import reactor.core.publisher.Mono;

@Component
public class LoanInfoHandler {

	@Autowired
	private LoanInfoRepository loanRepository;
	
	public Mono<ServerResponse> addLoanInfo(ServerRequest serverRequest) {
		return serverRequest.bodyToMono(LoanInfo.class)
				.flatMap(l -> {
					return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(loanRepository.save(l), LoanInfo.class);
				}).log();
	}

	public Mono<ServerResponse> getAllLoans(ServerRequest serverRequest) {
		return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(loanRepository.findAll(), 
				LoanInfo.class).log();	
	}

	public Mono<ServerResponse> updateLoanInfo(ServerRequest serverRequest) {
		return serverRequest.bodyToMono(LoanInfo.class)
				.flatMap(l -> {
					return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(loanRepository.save(l), LoanInfo.class);
				}).log();
	}

	public Mono<ServerResponse> deleteLoanInfo(ServerRequest serverRequest) {
		// Return the delete publisher as the response completion signal so it is actually
		// subscribed; calling deleteById() without subscribing would be a no-op.
		return ServerResponse.ok().build(loanRepository.deleteById(serverRequest.pathVariable("loanId")));
	}
}

Define the Router Function

Incoming requests are routed to a handler function with a RouterFunction: a function that takes ServerRequest and returns a delayed HandlerFunction (i.e. Mono<HandlerFunction>). When the router function matches, a handler function is returned; otherwise an empty Mono. RouterFunction is the equivalent of a @RequestMapping annotation, but with the major difference that router functions provide not just data, but also behavior.
package com.stackstalk;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;

@Configuration
public class LoanInfoRouter {
	
	@Bean
	public RouterFunction<ServerResponse> routes(LoanInfoHandler handler) {
		return RouterFunctions.route(RequestPredicates.GET("/loans"), handler::getAllLoans)
				.andRoute(RequestPredicates.POST("/loans"), handler::addLoanInfo)
				.andRoute(RequestPredicates.PUT("/loans"), handler::updateLoanInfo)
				.andRoute(RequestPredicates.DELETE("/loans/{loanId}"), handler::deleteLoanInfo);
	}
}

Define the application

Finally let us define the application.
package com.stackstalk;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class LoanInfoServiceApplication {

	public static void main(String[] args) {
		SpringApplication.run(LoanInfoServiceApplication.class, args);
	}

}

Putting it all together and testing

Now, let us test the application and examine the output. On starting the application, observe that the Netty server starts on port 8080 and that the reactive MongoDB repository is picked up.
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.6.2)

2022-01-17 18:41:14.149  INFO 13172 --- [           main] c.stackstalk.LoanInfoServiceApplication  : Starting LoanInfoServiceApplication using Java 16.0.2 on MYHOST-M-ABCD with PID 13172 (/Users/itsme/Documents/workspace-spring-tool-suite-4-4.12.0.RELEASE/BookInfoService/target/classes started by itsme in /Users/itsme/Documents/workspace-spring-tool-suite-4-4.12.0.RELEASE/LoanInfoService)
2022-01-17 18:41:14.151  INFO 13172 --- [           main] c.stackstalk.LoanInfoServiceApplication  : No active profile set, falling back to default profiles: default
2022-01-17 18:41:14.565  INFO 13172 --- [           main] .s.d.r.c.RepositoryConfigurationDelegate : Bootstrapping Spring Data Reactive MongoDB repositories in DEFAULT mode.
2022-01-17 18:41:14.689  INFO 13172 --- [           main] .s.d.r.c.RepositoryConfigurationDelegate : Finished Spring Data repository scanning in 119 ms. Found 1 Reactive MongoDB repository interfaces.
2022-01-17 18:41:15.136  INFO 13172 --- [           main] org.mongodb.driver.cluster               : Cluster created with settings {hosts=[localhost:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms'}
2022-01-17 18:41:15.492  INFO 13172 --- [localhost:27017] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:2, serverValue:96}] to localhost:27017
2022-01-17 18:41:15.492  INFO 13172 --- [localhost:27017] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:1, serverValue:95}] to localhost:27017
2022-01-17 18:41:15.492  INFO 13172 --- [localhost:27017] org.mongodb.driver.cluster               : Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, minWireVersion=0, maxWireVersion=7, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=235134928}
2022-01-17 18:41:15.667  INFO 13172 --- [           main] o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port 8080
2022-01-17 18:41:15.675  INFO 13172 --- [           main] c.stackstalk.LoanInfoServiceApplication  : Started LoanInfoServiceApplication in 1.848 seconds (JVM running for 2.549)

Add a loan type with POST on /loans

This API is used to add a new loan type.
curl --location --request POST 'http://localhost:8080/loans' \
> --header 'Content-Type: application/json' \
> --data-raw '{
>     "loanId": "4",
>     "loanType": "Education Loan",
>     "interestRate": 5.5,
>     "maxTermInMonths": 60,
>     "processingFee": 20
> }'
Returns the loan type added.
{"loanId":"4","loanType":"Education Loan","interestRate":5.5,"maxTermInMonths":60,"processingFee":20.0}
In the logs observe usage of reactive signals with onSubscribe, request, onNext (once) and onComplete.
2022-01-17 19:09:42.690  INFO 18563 --- [ntLoopGroup-3-3] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:3, serverValue:108}] to localhost:27017
2022-01-17 19:10:49.384  INFO 18563 --- [ctor-http-nio-3] reactor.Mono.FlatMap.2                   : | onSubscribe([Fuseable] MonoFlatMap.FlatMapMain)
2022-01-17 19:10:49.385  INFO 18563 --- [ctor-http-nio-3] reactor.Mono.FlatMap.2                   : | request(unbounded)
2022-01-17 19:10:49.424  INFO 18563 --- [ctor-http-nio-3] reactor.Mono.FlatMap.2                   : | onNext(org.springframework.web.reactive.function.server.DefaultEntityResponseBuilder$DefaultEntityResponse@7afee938)
2022-01-17 19:10:49.448  INFO 18563 --- [ctor-http-nio-3] reactor.Mono.FlatMap.2                   : | onComplete()

Query all loan types using GET on /loans

This API is used to query all added loan types.
curl --location --request GET 'http://localhost:8080/loans'
Returns the list of loan types added.
[{"loanId":"3","loanType":"Education Loan","interestRate":5.6,"maxTermInMonths":60,"processingFee":20.0},
 {"loanId":"4","loanType":"Education Loan","interestRate":5.5,"maxTermInMonths":60,"processingFee":20.0}]
In the logs observe usage of reactive signals with onSubscribe, request, onNext (multiple) and onComplete.
2022-01-17 19:09:42.561  INFO 18563 --- [ctor-http-nio-3] reactor.Mono.MapFuseable.1               : | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
2022-01-17 19:09:42.564  INFO 18563 --- [ctor-http-nio-3] reactor.Mono.MapFuseable.1               : | request(unbounded)
2022-01-17 19:09:42.564  INFO 18563 --- [ctor-http-nio-3] reactor.Mono.MapFuseable.1               : | onNext(org.springframework.web.reactive.function.server.DefaultEntityResponseBuilder$DefaultEntityResponse@8061ac0)
2022-01-17 19:09:42.666  INFO 18563 --- [ctor-http-nio-3] reactor.Mono.MapFuseable.1               : | onComplete()
2022-01-17 19:09:42.690  INFO 18563 --- [ntLoopGroup-3-3] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:3, serverValue:108}] to localhost:27017
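The remaining routes can be exercised the same way; for example, deleting a loan type by its identifier (the id here is illustrative):
curl --location --request DELETE 'http://localhost:8080/loans/4'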
Get access to the full source code of this example in GitHub.
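The test dependencies included earlier (spring-boot-starter-test, reactor-test) can also drive these routes without curl. A minimal hedged sketch using WebTestClient (the test class name is illustrative and it assumes a local MongoDB is reachable):
package com.stackstalk;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.web.reactive.server.WebTestClient;

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class LoanInfoRouterTest {

	@Autowired
	private WebTestClient webTestClient;

	@Test
	void getAllLoansReturnsOk() {
		// Verifies that the GET /loans route is wired and responds successfully
		webTestClient.get().uri("/loans")
				.exchange()
				.expectStatus().isOk();
	}
}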