StackStalk
  • Home
  • Java
    • Java Collection
    • Spring Boot Collection
  • Python
    • Python Collection
  • C++
    • C++ Collection
    • Progamming Problems
    • Algorithms
    • Data Structures
    • Design Patterns
  • General
    • Tips and Tricks

Monday, January 24, 2022

Server-Sent Events with Spring WebFlux

 January 24, 2022     Java, Microservices     No comments   

In this article we will review the concepts of server-sent events and work on an example using WebFlux. Before getting into this article it would be important to understand basic concepts of reactive programming from the following links.
  • Reactive Programming with Spring WebFlux
  • Spring WebFlux Example using Annotation based programming
  • Spring WebFlux Example using Functional Programming

Server-Sent Events Introduction

Server-Sent Events (SSE) enables a client to receive automatic updates from a server using a HTTP connection without requiring to poll. These are commonly used to send message updates or continuous data streams. In SSE, events flow from server to client and is one way communication channel. Examples of SSE includes publishing notifications to the browser client, updates like stock price, game scores to all subscribed clients etc.

LogHandler example

In this example, we will implement a simple log handler service using Spring WebFlux. We expose a "/stream" endpoint and demonstrate the working of Server-Sent Events (SSE) using Spring WebFlux.

Spring Boot dependencies

Create a Spring starter project and include the Spring Reactive Web support (WebFlux), Spring Data MongoDB Reactive, Project Lombak and test modules as dependencies to the pom.xml. For more details on Project Lombak refer to this article "Introduction to Project Lombak".
<dependencies>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
	</dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-webflux</artifactId>
	</dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-test</artifactId>
		<scope>test</scope>
	</dependency>
	<dependency>
		<groupId>io.projectreactor</groupId>
		<artifactId>reactor-test</artifactId>
		<scope>test</scope>
	</dependency>
	<dependency>
		<groupId>org.projectlombok</groupId>
		<artifactId>lombok</artifactId>
		<version>1.18.22</version>
		<scope>provided</scope>
	</dependency>		
</dependencies>

Configure application.properties

Update the application.properties to specify the Mongo database. We are using a local Mongo database on port 27017 and the database would be "logs".
spring.data.mongodb.host=localhost
spring.data.mongodb.port=27017
spring.data.mongodb.database=logs

Define the data object

We start with the LogInfo POJO. We use the Lombok @Data annotation which generate the getters and setters needed.
package com.stackstalk;

import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
@Document
public class LogInfo {
	@Id 
	private String logId;
	private Long logTimestamp;
	private String logMessage;
}

Define the repository

Now let us define the repository. Note that here we use the Mongo specific repositiry with reactive support.
package com.stackstalk;

import org.springframework.data.mongodb.repository.ReactiveMongoRepository;

public interface LogInfoRepository extends ReactiveMongoRepository<LogInfo, String> {

}

Define the Handler Function

In WebFlux.fn, an HTTP request is handled with a HandlerFunction: a function that takes ServerRequest and returns a delayed ServerResponse (i.e. Mono<ServerResponse>).

Sinks are constructs through which Reactive Streams signals can be programmatically pushed, with Flux or Mono semantics. These standalone sinks expose tryEmit methods that return an EmitResult enum, allowing to atomically fail in case the attempted signal is inconsistent with the spec and/or the state of the sink.

There are multiple Sinks categories:
  • many().multicast(): a sink that will transmit only newly pushed data to its subscribers, honoring their backpressure
  • many().unicast(): same as above, with the twist that data pushed before the first subscriber registers is buffered.
  • many().replay(): a sink that will replay a specified history size of pushed data to new subscribers then continue pushing new data live.
  • one(): a sink that will play a single element to its subscribers
  • empty(): a sink that will play a terminal signal only to its subscribers (error or complete)

In this example, we will use a replay sink which will replay a specified history size of pushed data to new subscribers then continue pushing new data live.
package com.stackstalk;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;

import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

@Component
public class LogInfoHandler {

	@Autowired
	private LogInfoRepository logRepository;
	
	Sinks.Many<LogInfo> logInfoSink = Sinks.many().replay().all();
	
	public Mono<ServerResponse> addLogInfo(ServerRequest serverRequest) {
		return serverRequest.bodyToMono(LogInfo.class)
				.doOnNext(logInfo -> {
					logInfoSink.tryEmitNext(logInfo);
				})
				.flatMap(l -> {
					return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(logRepository.save(l), LogInfo.class);
				}).log();
	}
	
	public Mono<ServerResponse> getLogStream(ServerRequest serverRequest) {
		return ServerResponse.ok().contentType(MediaType.APPLICATION_NDJSON)
				.body(logInfoSink.asFlux(), LogInfo.class).log();
	}
}

Define the Router Function

Incoming requests are routed to a handler function with a RouterFunction: a function that takes ServerRequest and returns a delayed HandlerFunction (i.e. Mono<HandlerFunction>). When the router function matches, a handler function is returned; otherwise an empty Mono. RouterFunction is the equivalent of a @RequestMapping annotation, but with the major difference that router functions provide not just data, but also behavior.
package com.stackstalk;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;

@Configuration
public class LogInfoRouter {
	
	@Bean
	public RouterFunction<ServerResponse> routes(LogInfoHandler handler) {
		return RouterFunctions.route(RequestPredicates.POST("/logs"), handler::addLogInfo)
				.andRoute(RequestPredicates.GET("/stream"), handler::getLogStream);
	}
}

Define the application

Finally let us define the application.
package com.stackstalk;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class LogHandlerApplication {

	public static void main(String[] args) {
		SpringApplication.run(LogHandlerApplication.class, args);
	}

}

Putting it all together and testing

Now, let us test the application and examine. On starting the application observe that Netty server is being started on port 8080. Also see the usage of reactive MongoDB repository.
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
[32m :: Spring Boot :: [39m              [2m (v2.6.2)[0;39m

[2m2022-01-24 15:37:19.428[0;39m [32m INFO[0;39m [35m30335[0;39m [2m---[0;39m [2m[           main][0;39m [36mcom.stackstalk.LogHandlerApplication    [0;39m [2m:[0;39m Starting LogHandlerApplication using Java 16.0.2 on FAKEUSER-M-F1VD with PID 30335 (/Users/fakeuser/Documents/workspace-spring-tool-suite-4-4.12.0.RELEASE/LogHandler/target/classes started by fakeuser in /Users/fakeuser/Documents/workspace-spring-tool-suite-4-4.12.0.RELEASE/LogHandler)
[2m2022-01-24 15:37:19.430[0;39m [32m INFO[0;39m [35m30335[0;39m [2m---[0;39m [2m[           main][0;39m [36mcom.stackstalk.LogHandlerApplication    [0;39m [2m:[0;39m No active profile set, falling back to default profiles: default
[2m2022-01-24 15:37:19.821[0;39m [32m INFO[0;39m [35m30335[0;39m [2m---[0;39m [2m[           main][0;39m [36m.s.d.r.c.RepositoryConfigurationDelegate[0;39m [2m:[0;39m Bootstrapping Spring Data Reactive MongoDB repositories in DEFAULT mode.
[2m2022-01-24 15:37:19.949[0;39m [32m INFO[0;39m [35m30335[0;39m [2m---[0;39m [2m[           main][0;39m [36m.s.d.r.c.RepositoryConfigurationDelegate[0;39m [2m:[0;39m Finished Spring Data repository scanning in 124 ms. Found 1 Reactive MongoDB repository interfaces.
[2m2022-01-24 15:37:20.372[0;39m [32m INFO[0;39m [35m30335[0;39m [2m---[0;39m [2m[           main][0;39m [36morg.mongodb.driver.cluster              [0;39m [2m:[0;39m Cluster created with settings {hosts=[localhost:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms'}
[2m2022-01-24 15:37:20.611[0;39m [32m INFO[0;39m [35m30335[0;39m [2m---[0;39m [2m[localhost:27017][0;39m [36morg.mongodb.driver.connection           [0;39m [2m:[0;39m Opened connection [connectionId{localValue:1, serverValue:2}] to localhost:27017
[2m2022-01-24 15:37:20.611[0;39m [32m INFO[0;39m [35m30335[0;39m [2m---[0;39m [2m[localhost:27017][0;39m [36morg.mongodb.driver.connection           [0;39m [2m:[0;39m Opened connection [connectionId{localValue:2, serverValue:1}] to localhost:27017
[2m2022-01-24 15:37:20.612[0;39m [32m INFO[0;39m [35m30335[0;39m [2m---[0;39m [2m[localhost:27017][0;39m [36morg.mongodb.driver.cluster              [0;39m [2m:[0;39m Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, minWireVersion=0, maxWireVersion=7, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=120133302}
[2m2022-01-24 15:37:20.961[0;39m [32m INFO[0;39m [35m30335[0;39m [2m---[0;39m [2m[           main][0;39m [36mo.s.b.web.embedded.netty.NettyWebServer [0;39m [2m:[0;39m Netty started on port 8080
[2m2022-01-24 15:37:20.968[0;39m [32m INFO[0;39m [35m30335[0;39m [2m---[0;39m [2m[           main][0;39m [36mcom.stackstalk.LogHandlerApplication    [0;39m [2m:[0;39m Started LogHandlerApplication in 1.871 seconds (JVM running for 2.632)

Open a HTTP connection on the /stream endpoint

This API is used to listen for Server-Sent Events (SSE).
curl --location --request GET 'http://localhost:8080/stream' 
The HTTP connection doesn't close and returns the log messages whenever added. Whenever a new message is added the event is pushed to the client. Open multiple connections and observe the replay behavior whenever a new subscription is made.
{"logId":"1","logTimestamp":1642649553432,"logMessage":"Hello. This is my first message"}

Add a log message with a POST on /logs

This API is used to add a new log message.
curl --location --request POST 'http://localhost:8080/logs' \
> --header 'Content-Type: application/json' \
> --data-raw '{
>     "logId": "1",
>     "logTimestamp": 1642649553432,
>     "logMessage": "Hello. This is my first message"
> }'
Returns the log message added.
{"logId":"1","logTimestamp":1642649553432,"logMessage":"Hello. This is my first message"}
In the logs observe usage of reactive signals with onSubscribe, request, onNext (once) and onComplete.

Conclusion

Implementing Server-Sent Events (SSE) on the server side with reactive stream signals with Spring WebFlux is very easy. Sinks provide flexibility and abstraction to easily handle multiple requirements at ease. Refer to this link for more Java Tutorials. Get access to the full source code of this example in GitHub.
  • Share This:  
Newer Post Older Post Home

0 comments:

Post a Comment

Follow @StackStalk
Get new posts by email:
Powered by follow.it

Popular Posts

  • Avro Producer and Consumer with Python using Confluent Kafka
    In this article, we will understand Avro a popular data serialization format in streaming data applications and develop a simple Avro Produc...
  • Server-Sent Events with Spring WebFlux
    In this article we will review the concepts of server-sent events and work on an example using WebFlux. Before getting into this article it ...
  • Monitor Spring Boot App with Micrometer and Prometheus
    Modern distributed applications typically have multiple microservices working together. Ability to monitor and manage aspects like health, m...
  • Implement caching in a Spring Boot microservice using Redis
    In this article we will explore how to use Redis as a data cache for a Spring Boot microservice using PostgreSQL as the database. Idea is to...
  • Python FastAPI microservice with Okta and OPA
    Authentication (AuthN) and Authorization (AuthZ) is a common challenge when developing microservices. In this article, we will explore how t...
  • Spring Boot with Okta and OPA
    Authentication (AuthN) and Authorization (AuthZ) is a common challenge when developing microservices. In this article, we will explore how t...
  • Getting started with Kafka in Python
    This article will provide an overview of Kafka and how to get started with Kafka in Python with a simple example. What is Kafka? ...
  • Getting started in GraphQL with Spring Boot
    In this article we will explore basic concepts on GraphQL and look at how to develop a microservice in Spring Boot with GraphQL support. ...

Copyright © StackStalk