RSocket Complete Stack: Building Reactive Microservices with Spring Boot
A comprehensive guide to implementing message-passing architecture with RSocket and Spring Boot, covering interaction models, load balancing, resumability, and RPC patterns for microservices at scale.
Introduction
As microservices architectures mature, the limitations of traditional event-driven communication become apparent. Infrastructure costs, error handling complexity, message replay challenges, and protocol overhead all contribute to operational friction. RSocket offers an alternative: a binary protocol with Reactive Streams semantics, designed specifically for microservices communication at scale.
This article explores a complete RSocket implementation using Spring Boot, demonstrating how message-passing architecture can address the challenges of traditional approaches while enabling reactive, resilient communication between services.
Key Insight: A message-passing architecture built on RSocket lets services communicate directly rather than through a broker, which reduces deployment overhead without giving up asynchronous, flow-controlled messaging.
Why RSocket?
Challenges with Traditional Approaches
| Challenge | Traditional Event-Driven | RSocket Solution |
|---|---|---|
| Infrastructure Cost | Message brokers, queues | Direct peer-to-peer |
| Error Handling | Complex acknowledgment patterns | Built-in error propagation |
| Message Replay | Requires external storage | Resumability built-in |
| Protocol Overhead | JSON/XML serialization | Binary, efficient framing |
| Backpressure | Manual implementation | Native reactive streams |
RSocket Interaction Models
RSocket supports four interaction models, each suited for different use cases:
┌─────────────────────────────────────────────────────────────────┐
│ RSocket Interaction Models │
└─────────────────────────────────────────────────────────────────┘
1. Fire-and-Forget (1:0)
Client ────────► Server
(No response expected)
2. Request-Response (1:1)
Client ─────────► Server
◄─────────
(Single request, single response)
3. Request-Stream (1:N)
Client ─────────► Server
◄─────────
◄─────────
◄─────────
(Single request, stream of responses)
4. Channel (N:M)
Client ─────────► Server
◄─────────
─────────►
◄─────────
(Bidirectional streaming)
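On the wire, each of the four models starts with its own request frame type. As a minimal sketch, the class below packs the 6-byte frame header described in the RSocket protocol specification: a 31-bit stream ID, a 6-bit frame type, and 10 flag bits. The class and method names are illustrative assumptions and do not belong to any RSocket library.

```java
import java.nio.ByteBuffer;

/** Illustrative sketch of the RSocket frame header layout; not library code. */
final class FrameHeaderSketch {
    // Request frame types as listed in the RSocket protocol specification.
    static final int REQUEST_RESPONSE = 0x04;
    static final int REQUEST_FNF = 0x05; // fire-and-forget
    static final int REQUEST_STREAM = 0x06;
    static final int REQUEST_CHANNEL = 0x07;

    /** Packs stream ID (31 bits), frame type (6 bits) and flags (10 bits) into 6 bytes. */
    static byte[] encode(int streamId, int frameType, int flags) {
        if (streamId < 0) {
            throw new IllegalArgumentException("stream ID must fit in 31 bits");
        }
        if (frameType < 0 || frameType > 0x3F) {
            throw new IllegalArgumentException("frame type must fit in 6 bits");
        }
        if (flags < 0 || flags > 0x3FF) {
            throw new IllegalArgumentException("flags must fit in 10 bits");
        }
        ByteBuffer buffer = ByteBuffer.allocate(6); // big-endian by default
        buffer.putInt(streamId);
        buffer.putShort((short) ((frameType << 10) | flags));
        return buffer.array();
    }

    static int streamId(byte[] header) {
        return ByteBuffer.wrap(header).getInt(0) & 0x7FFFFFFF;
    }

    static int frameType(byte[] header) {
        return (ByteBuffer.wrap(header).getShort(4) & 0xFFFF) >>> 10;
    }

    static int flags(byte[] header) {
        return ByteBuffer.wrap(header).getShort(4) & 0x03FF;
    }

    public static void main(String[] args) {
        byte[] header = encode(1, REQUEST_STREAM, 0);
        System.out.printf("type=0x%02X streamId=%d%n", frameType(header), streamId(header));
    }
}
```

The whole header is six bytes regardless of the interaction model, which is where the low framing overhead comes from.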
Architecture Overview
The project is organized into focused modules:
rsocket-complete-stack/
├── interaction-model/ # DDD interaction patterns
├── load-balancing/ # Client-side load balancing
├── resumability/ # Connection recovery
├── rpc/ # Remote procedure calls
├── spring-boot-requester/ # Producer/Client
└── spring-boot-responder/ # Consumer/Server
Module Deep Dive
1. Interaction Model - DDD Communication Patterns
This module demonstrates Domain-Driven Design interaction patterns with RSocket:
// Domain Events
public class OrderCreatedEvent {
private final String orderId;
private final String customerId;
private final List<OrderItem> items;
private final Instant timestamp;
// Constructor, getters...
}
// RSocket Controller (Responder)
@Controller
public class OrderController {
private final OrderService orderService;
private final Sinks.Many<OrderCreatedEvent> orderEventSink;
public OrderController(OrderService orderService) {
this.orderService = orderService;
// autoCancel=false keeps the sink usable after the last subscriber disconnects
this.orderEventSink = Sinks.many().multicast().onBackpressureBuffer(256, false);
}
// Fire-and-Forget: Receive order without response
@MessageMapping("orders.create.fire-forget")
public Mono<Void> createOrderFireAndForget(OrderRequest request) {
return orderService.createOrder(request)
.doOnSuccess(order ->
orderEventSink.tryEmitNext(
new OrderCreatedEvent(order.getId(),
order.getCustomerId(),
order.getItems(),
Instant.now())))
.then();
}
// Request-Response: Create order and return result
@MessageMapping("orders.create")
public Mono<Order> createOrder(OrderRequest request) {
return orderService.createOrder(request);
}
// Request-Stream: Get order updates
@MessageMapping("orders.stream")
public Flux<OrderCreatedEvent> streamOrders() {
return orderEventSink.asFlux();
}
// Channel: Bidirectional order processing
@MessageMapping("orders.channel")
public Flux<OrderStatus> processOrders(Flux<OrderRequest> requests) {
return requests
.flatMap(request -> orderService.createOrder(request))
.map(order -> new OrderStatus(order.getId(), "PROCESSING"))
.delayElements(Duration.ofMillis(100));
}
}
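The `orders.stream` and `orders.channel` handlers return a `Flux`, so how fast items flow is decided by the subscriber's demand, which RSocket carries across the network in REQUEST_N frames. To make that mechanism concrete, here is a minimal, single-threaded sketch built on the JDK's `java.util.concurrent.Flow` interfaces instead of Reactor. `RangePublisher` and `CollectingSubscriber` are hypothetical names introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Illustrative, single-threaded sketch of demand-driven delivery. */
final class DemandSketch {

    /** Emits the integers 1..count, but never more than the subscriber has requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private long demand;
                private boolean emitting;
                private boolean done;

                @Override
                public void request(long n) {
                    if (done) {
                        return;
                    }
                    if (n <= 0) {
                        done = true;
                        subscriber.onError(new IllegalArgumentException("demand must be positive"));
                        return;
                    }
                    // Accumulate demand, saturating instead of overflowing.
                    demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n;
                    if (emitting) {
                        return; // re-entrant call from onNext: the outer loop picks up the demand
                    }
                    emitting = true;
                    while (!done && demand > 0 && next <= count) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (!done && next > count) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Collects items and lets the caller decide when to ask for more. */
    static final class CollectingSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        boolean completed;
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
        }

        @Override
        public void onError(Throwable error) {
            throw new IllegalStateException(error);
        }

        @Override
        public void onComplete() {
            completed = true;
        }

        void request(long n) {
            subscription.request(n);
        }
    }

    public static void main(String[] args) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        new RangePublisher(5).subscribe(subscriber);
        subscriber.request(2); // only two items may be delivered now
        System.out.println(subscriber.received);
        subscriber.request(10); // the remaining items arrive, then completion
        System.out.println(subscriber.received + " completed=" + subscriber.completed);
    }
}
```

Nothing is emitted until `request` is called, and a slow consumer that asks for two items receives exactly two. Reactor's `Flux` follows the same contract, with thread safety and operator fusion that this sketch leaves out.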
2. Load Balancing - Distributed Request Handling
RSocket supports client-side load balancing for resilient communication:
@Configuration
public class RSocketLoadBalancerConfig {
@Bean
public LoadbalanceStrategy loadbalanceStrategy() {
// Round-robin load balancing across available servers
return new RoundRobinLoadbalanceStrategy();
}
@Bean
public RSocketRequester loadBalancedRequester(
RSocketRequester.Builder builder,
LoadbalanceStrategy strategy) {
// Define multiple server endpoints
List<LoadbalanceTarget> servers = Arrays.asList(
LoadbalanceTarget.from("server1",
TcpClientTransport.create("localhost", 7001)),
LoadbalanceTarget.from("server2",
TcpClientTransport.create("localhost", 7002)),
LoadbalanceTarget.from("server3",
TcpClientTransport.create("localhost", 7003))
);
return builder
.rsocketConnector(connector -> connector
.reconnect(Retry.backoff(10, Duration.ofSeconds(1))))
.transports(Flux.just(servers), strategy);
}
}
// Load-balanced client service
@Service
public class OrderClient {
private final RSocketRequester requester;
public OrderClient(RSocketRequester requester) {
this.requester = requester;
}
public Mono<Order> createOrder(OrderRequest request) {
// Request is automatically load-balanced
return requester
.route("orders.create")
.data(request)
.retrieveMono(Order.class);
}
public Flux<OrderCreatedEvent> streamOrders() {
return requester
.route("orders.stream")
.retrieveFlux(OrderCreatedEvent.class);
}
}
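Round-robin selection itself is simple: keep a counter and walk the target list in order, wrapping around at the end. The sketch below shows that idea in plain Java. It is an illustration of the technique under the assumption of a fixed target list, not the implementation used by the RSocket library, and `RoundRobinSketch` is a hypothetical name.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative round-robin selector over a fixed list of targets. */
final class RoundRobinSketch<T> {
    private final List<T> targets;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinSketch(List<T> targets) {
        if (targets.isEmpty()) {
            throw new IllegalArgumentException("at least one target is required");
        }
        this.targets = List.copyOf(targets);
    }

    /** Returns the next target; floorMod keeps the index valid after counter overflow. */
    T next() {
        int index = Math.floorMod(counter.getAndIncrement(), targets.size());
        return targets.get(index);
    }

    public static void main(String[] args) {
        RoundRobinSketch<String> selector =
                new RoundRobinSketch<>(List.of("server1", "server2", "server3"));
        for (int i = 0; i < 4; i++) {
            System.out.println(selector.next());
        }
    }
}
```

A real load balancer also has to react to targets appearing and disappearing, which is why the requester is given a `Flux` of target lists and not a static list.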
3. Resumability - Connection Recovery
RSocket provides built-in session resumption for handling network interruptions:
@Configuration
public class RSocketResumabilityConfig {
@Bean
public CloseableChannel rSocketServer(
RSocketMessageHandler handler) {
// bind(...) yields the running server channel, not the RSocketServer builder
return RSocketServer.create()
.acceptor(handler.responder())
.resume(new Resume()
// How long a disconnected session stays resumable
.sessionDuration(Duration.ofMinutes(5))
// Timeout applied to streams (the frame store itself is set via storeFactory)
.streamTimeout(Duration.ofSeconds(30)))
// Resume#retry is a client-side setting, so it is not configured here
.bind(TcpServerTransport.create("localhost", 7000))
.block();
}
@Bean
public RSocketRequester resumableRequester(
RSocketRequester.Builder builder) {
return builder
.rsocketConnector(connector -> connector
// Enable resumability on client
.resume(new Resume()
.sessionDuration(Duration.ofMinutes(5))
.retry(Retry.backoff(5, Duration.ofSeconds(1))))
// Automatic reconnection
.reconnect(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1))
.maxBackoff(Duration.ofSeconds(30))))
.tcp("localhost", 7000);
}
}
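Conceptually, resumption works because each side counts the bytes of resumable frames it has sent and received, and buffers sent frames until the peer reports having received them. After a reconnect, the peer states its last received position and the sender replays everything after it. The class below is a minimal sketch of that bookkeeping; `ResumeStoreSketch` and its methods are hypothetical and far simpler than the frame store in rsocket-java.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Illustrative sketch of position-based frame buffering for session resumption. */
final class ResumeStoreSketch {
    private final Deque<byte[]> frames = new ArrayDeque<>();
    private long firstAvailable; // byte position of the oldest buffered frame
    private long sent;           // total bytes of frames handed to the transport

    /** Buffers a frame that was just sent so it can be replayed later. */
    void onFrameSent(byte[] frame) {
        frames.addLast(frame);
        sent += frame.length;
    }

    /** Drops frames the peer has fully received, based on its reported position. */
    void release(long remoteReceived) {
        while (!frames.isEmpty()
                && firstAvailable + frames.peekFirst().length <= remoteReceived) {
            firstAvailable += frames.removeFirst().length;
        }
    }

    /** Returns the frames to replay after a reconnect, or fails if resumption is impossible. */
    List<byte[]> replayFrom(long remoteReceived) {
        if (remoteReceived < firstAvailable || remoteReceived > sent) {
            throw new IllegalStateException("position " + remoteReceived + " cannot be resumed");
        }
        release(remoteReceived);
        if (firstAvailable != remoteReceived) {
            throw new IllegalStateException("position falls inside a frame");
        }
        return new ArrayList<>(frames);
    }

    int bufferedFrames() {
        return frames.size();
    }

    public static void main(String[] args) {
        ResumeStoreSketch store = new ResumeStoreSketch();
        store.onFrameSent(new byte[10]);
        store.onFrameSent(new byte[20]);
        store.onFrameSent(new byte[30]);
        store.release(10); // peer confirmed the first frame
        System.out.println("frames to replay: " + store.replayFrom(30).size());
    }
}
```

The session duration setting bounds how long such a buffer is kept for a disconnected peer; once a position has been released, a request to resume from before it has to be rejected.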
// Resilient streaming client
@Service
public class ResilientOrderClient {
private static final Logger log = LoggerFactory.getLogger(ResilientOrderClient.class);
private final RSocketRequester requester;
public ResilientOrderClient(RSocketRequester requester) {
this.requester = requester;
}
public Flux<OrderCreatedEvent> streamOrdersWithResumability() {
return requester
.route("orders.stream")
.retrieveFlux(OrderCreatedEvent.class)
.doOnSubscribe(s -> log.info("Starting order stream"))
.doOnError(e -> log.warn("Stream failed, resubscribing: {}", e.getMessage()))
.doOnComplete(() -> log.info("Stream completed"))
// Client-side retry for any unrecoverable errors
.retryWhen(Retry.backoff(10, Duration.ofSeconds(1)));
}
}
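To see what a backoff policy such as `Retry.backoff(5, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(30))` implies for timing, the sketch below computes the nominal delay sequence: each delay doubles the previous one until it reaches the cap. This is a simplified model under the assumption of no jitter; Reactor additionally randomizes each delay, so real delays will differ. `BackoffSketch` is a hypothetical helper.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Illustrative computation of capped exponential backoff delays, without jitter. */
final class BackoffSketch {

    /** Returns the nominal delay before each retry attempt. */
    static List<Duration> delays(int maxAttempts, Duration minBackoff, Duration maxBackoff) {
        List<Duration> result = new ArrayList<>();
        Duration delay = minBackoff;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            // Never wait longer than the cap.
            result.add(delay.compareTo(maxBackoff) > 0 ? maxBackoff : delay);
            // Double for the next attempt, saturating at the cap to avoid overflow.
            delay = delay.compareTo(maxBackoff) >= 0 ? maxBackoff : delay.multipliedBy(2);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Duration> delays = delays(7, Duration.ofSeconds(1), Duration.ofSeconds(30));
        delays.forEach(d -> System.out.println(d.getSeconds() + "s"));
    }
}
```

With a 1-second minimum and a 30-second cap, the cap is reached on the sixth attempt, so raising the attempt count beyond that point adds waiting time linearly, not exponentially.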
4. RPC - Remote Procedure Calls
Traditional RPC patterns implemented over RSocket:
// Service interface (shared between client and server)
public interface OrderService {
Mono<Order> findById(String orderId);
Mono<Order> create(OrderRequest request);
Flux<Order> findByCustomer(String customerId);
Mono<Void> delete(String orderId);
}
// Server-side implementation
@Service
public class OrderServiceImpl implements OrderService {
private final OrderRepository repository;
public OrderServiceImpl(OrderRepository repository) {
this.repository = repository;
}
@Override
public Mono<Order> findById(String orderId) {
return repository.findById(orderId);
}
@Override
public Mono<Order> create(OrderRequest request) {
return repository.save(Order.fromRequest(request));
}
@Override
public Flux<Order> findByCustomer(String customerId) {
return repository.findByCustomerId(customerId);
}
@Override
public Mono<Void> delete(String orderId) {
return repository.deleteById(orderId);
}
}
// RSocket RPC Controller
@Controller
public class OrderRpcController {
private final OrderService orderService;
public OrderRpcController(OrderService orderService) {
this.orderService = orderService;
}
@MessageMapping("order.findById")
public Mono<Order> findById(String orderId) {
return orderService.findById(orderId);
}
@MessageMapping("order.create")
public Mono<Order> create(OrderRequest request) {
return orderService.create(request);
}
@MessageMapping("order.findByCustomer")
public Flux<Order> findByCustomer(String customerId) {
return orderService.findByCustomer(customerId);
}
@MessageMapping("order.delete")
public Mono<Void> delete(String orderId) {
return orderService.delete(orderId);
}
}
// Client-side proxy
@Component
public class OrderServiceProxy implements OrderService {
private final RSocketRequester requester;
public OrderServiceProxy(RSocketRequester requester) {
this.requester = requester;
}
@Override
public Mono<Order> findById(String orderId) {
return requester
.route("order.findById")
.data(orderId)
.retrieveMono(Order.class);
}
@Override
public Mono<Order> create(OrderRequest request) {
return requester
.route("order.create")
.data(request)
.retrieveMono(Order.class);
}
@Override
public Flux<Order> findByCustomer(String customerId) {
return requester
.route("order.findByCustomer")
.data(customerId)
.retrieveFlux(Order.class);
}
@Override
public Mono<Void> delete(String orderId) {
return requester
.route("order.delete")
.data(orderId)
.retrieveMono(Void.class);
}
}
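Every proxy method above follows the same pattern: the route is a prefix plus the method name, and the argument becomes the payload. That regularity can be captured once with a JDK dynamic proxy, sketched below. To stay runnable without Spring, the sketch replaces the requester with a plain `BiFunction` and returns `String` where the real interface returns `Mono` or `Flux`; `RouteProxySketch`, `OrderLookup`, and the `transport` parameter are all hypothetical.

```java
import java.lang.reflect.Proxy;
import java.util.function.BiFunction;

/** Illustrative dynamic proxy that derives a route name from the invoked method. */
final class RouteProxySketch {

    /** Hypothetical, simplified stand-in for the shared service interface. */
    interface OrderLookup {
        String findById(String orderId);

        String delete(String orderId);
    }

    /**
     * Creates a proxy whose methods call transport with "prefix.methodName" and the first argument.
     * In a real client, transport would delegate to the requester's route(...).data(...) chain.
     */
    static <T> T create(Class<T> api, String prefix, BiFunction<String, Object, Object> transport) {
        Object proxy = Proxy.newProxyInstance(
                api.getClassLoader(),
                new Class<?>[] {api},
                (self, method, args) -> {
                    // equals, hashCode and toString are also routed to the handler.
                    if (method.getDeclaringClass() == Object.class) {
                        switch (method.getName()) {
                            case "equals":
                                return self == args[0];
                            case "hashCode":
                                return System.identityHashCode(self);
                            default:
                                return api.getSimpleName() + "Proxy(" + prefix + ")";
                        }
                    }
                    Object payload = (args == null || args.length == 0) ? null : args[0];
                    return transport.apply(prefix + "." + method.getName(), payload);
                });
        return api.cast(proxy);
    }

    public static void main(String[] args) {
        OrderLookup orders = create(OrderLookup.class, "order",
                (route, payload) -> route + " <- " + payload);
        System.out.println(orders.findById("42"));
    }
}
```

The hand-written proxy remains the more explicit choice when routes do not follow a naming convention or when different methods need different payload handling.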
5. Spring Boot Requester (Producer/Client)
Complete client application configuration:
@SpringBootApplication
public class RSocketRequesterApplication {
public static void main(String[] args) {
SpringApplication.run(RSocketRequesterApplication.class, args);
}
@Bean
public RSocketRequester rSocketRequester(
RSocketRequester.Builder builder,
RSocketStrategies strategies) {
return builder
.rsocketStrategies(strategies)
.rsocketConnector(connector -> connector
.payloadDecoder(PayloadDecoder.ZERO_COPY)
.reconnect(Retry.backoff(10, Duration.ofSeconds(1))))
.tcp("localhost", 7000);
}
@Bean
public RSocketStrategies rSocketStrategies() {
return RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.build();
}
}
// REST Controller exposing RSocket services
@RestController
@RequestMapping("/api/orders")
public class OrderRestController {
private final RSocketRequester requester;
public OrderRestController(RSocketRequester requester) {
this.requester = requester;
}
@GetMapping("/{orderId}")
public Mono<Order> getOrder(@PathVariable String orderId) {
return requester
.route("order.findById")
.data(orderId)
.retrieveMono(Order.class);
}
@PostMapping
public Mono<Order> createOrder(@RequestBody OrderRequest request) {
return requester
.route("order.create")
.data(request)
.retrieveMono(Order.class);
}
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<OrderCreatedEvent> streamOrders() {
return requester
.route("orders.stream")
.retrieveFlux(OrderCreatedEvent.class);
}
}
6. Spring Boot Responder (Consumer/Server)
Complete server application configuration:
@SpringBootApplication
public class RSocketResponderApplication {
public static void main(String[] args) {
SpringApplication.run(RSocketResponderApplication.class, args);
}
}
// Application configuration
@Configuration
public class RSocketServerConfig {
@Bean
public RSocketMessageHandler rSocketMessageHandler(
RSocketStrategies strategies) {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.setRSocketStrategies(strategies);
return handler;
}
@Bean
public RSocketStrategies rSocketStrategies() {
return RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.build();
}
}
# application.yml
spring:
rsocket:
server:
port: 7000
transport: tcp
Building and Running
Build Commands
# Clone the repository
git clone https://github.com/mgorav/rsocket-complete-stack.git
cd rsocket-complete-stack
# Build all modules
./gradlew clean build
# Build specific module
./gradlew :interaction-model:build
./gradlew :spring-boot-responder:build
./gradlew :spring-boot-requester:build
Running from IDE
The modules are designed to be run from an IDE during development:
- Start the spring-boot-responder (server) first
- Start the spring-boot-requester (client)
- Access the REST endpoints to test RSocket communication
Testing the API
# Create an order (Request-Response)
curl -X POST http://localhost:8080/api/orders \
-H "Content-Type: application/json" \
-d '{"customerId": "CUST-001", "items": [{"productId": "PROD-001", "quantity": 2}]}'
# Get order by ID
curl http://localhost:8080/api/orders/ORDER-001
# Stream orders (Server-Sent Events)
curl http://localhost:8080/api/orders/stream
Performance Characteristics
| Metric | HTTP/REST | RSocket |
|---|---|---|
| Connection Setup | Per request, or pooled with keep-alive | Single persistent connection |
| Protocol Overhead | High (headers) | Low (binary) |
| Backpressure | Transport-level flow control only | Native, per stream |
| Multiplexing | HTTP/2 only | Built-in |
| Bidirectional | WebSockets needed | Native |
Best Practices
1. Error Handling
@MessageMapping("orders.create")
public Mono<Order> createOrder(OrderRequest request) {
return orderService.createOrder(request)
// ApplicationErrorException takes a single message, so the code is prefixed to it
.onErrorMap(ValidationException.class,
e -> new ApplicationErrorException("VALIDATION_ERROR: " + e.getMessage(), e))
.onErrorMap(DuplicateException.class,
e -> new ApplicationErrorException("DUPLICATE_ORDER: " + e.getMessage(), e));
}
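An RSocket ERROR frame carries a numeric protocol-level error code (0x00000201 for application errors) and a UTF-8 message, so an application-level code such as `VALIDATION_ERROR` has to travel inside the message. One possible convention, sketched below, is to prefix the message with the code and split it again on the client. The `CODE: detail` format and the `ErrorCodeSketch` helper are assumptions made for this example, not something RSocket or Spring prescribes.

```java
/** Illustrative helper for a "CODE: detail" error message convention. */
final class ErrorCodeSketch {
    static final String SEPARATOR = ": ";
    static final String UNKNOWN = "UNKNOWN";

    /** Builds the message sent to the peer. */
    static String format(String code, String detail) {
        return code + SEPARATOR + detail;
    }

    /**
     * Splits a received message into {code, detail}.
     * Messages without an UPPER_SNAKE_CASE prefix are reported with the code UNKNOWN.
     */
    static String[] parse(String message) {
        int index = message.indexOf(SEPARATOR);
        if (index <= 0 || !message.substring(0, index).matches("[A-Z][A-Z0-9_]*")) {
            return new String[] {UNKNOWN, message};
        }
        return new String[] {
            message.substring(0, index),
            message.substring(index + SEPARATOR.length())
        };
    }

    public static void main(String[] args) {
        String wire = format("VALIDATION_ERROR", "quantity must be positive");
        String[] parsed = parse(wire);
        System.out.println(parsed[0] + " / " + parsed[1]);
    }
}
```

On the client, the message of the received exception can be passed to `parse` inside an `onErrorMap` step to restore a typed exception per code.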
2. Metadata for Routing
// Client with metadata
public Mono<Order> createOrderWithMetadata(OrderRequest request, String traceId) {
return requester
.route("orders.create")
.metadata(traceId, MimeTypeUtils.TEXT_PLAIN)
.data(request)
.retrieveMono(Order.class);
}
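// Server extracting metadata. @Header resolves only if a metadata extractor is
// registered for this MIME type under the name "trace-id"
// (RSocketStrategies.Builder#metadataExtractorRegistry).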
@MessageMapping("orders.create")
public Mono<Order> createOrder(
OrderRequest request,
@Header("trace-id") String traceId) {
log.info("Processing order with trace: {}", traceId);
return orderService.createOrder(request);
}
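The route passed to `.route("orders.create")` is itself sent as metadata. In the RSocket routing metadata extension (`message/x.rsocket.routing.v0`), it is encoded as a sequence of tags, each written as a one-byte unsigned length followed by the UTF-8 bytes of the tag. The sketch below encodes and decodes that layout in plain Java; `RoutingMetadataSketch` is a hypothetical name, and the composite-metadata wrapping that carries several entries (such as a route plus a trace ID) is left out.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Illustrative encoder/decoder for length-prefixed routing tags. */
final class RoutingMetadataSketch {

    /** Writes each tag as one length byte followed by its UTF-8 bytes. */
    static byte[] encode(List<String> tags) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (String tag : tags) {
            byte[] utf8 = tag.getBytes(StandardCharsets.UTF_8);
            if (utf8.length > 255) {
                throw new IllegalArgumentException("tag exceeds 255 bytes: " + tag);
            }
            out.write(utf8.length); // only the low 8 bits are written
            out.write(utf8, 0, utf8.length);
        }
        return out.toByteArray();
    }

    /** Reads tags back until the buffer is exhausted. */
    static List<String> decode(byte[] metadata) {
        List<String> tags = new ArrayList<>();
        int position = 0;
        while (position < metadata.length) {
            int length = metadata[position++] & 0xFF; // unsigned length byte
            if (position + length > metadata.length) {
                throw new IllegalArgumentException("truncated tag at offset " + position);
            }
            tags.add(new String(metadata, position, length, StandardCharsets.UTF_8));
            position += length;
        }
        return tags;
    }

    public static void main(String[] args) {
        byte[] encoded = encode(List.of("orders.create"));
        System.out.println(encoded.length + " bytes, tags=" + decode(encoded));
    }
}
```

A 13-character route costs 14 bytes on the wire, compared with the request line and headers an equivalent HTTP call would need.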
3. Security
@Bean
public PayloadSocketAcceptorInterceptor authorization(
RSocketSecurity security) {
return security
.authorizePayload(authorize -> authorize
.route("orders.admin.*").hasRole("ADMIN")
.route("orders.*").authenticated()
.anyRequest().permitAll())
.jwt(Customizer.withDefaults())
.build();
}
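The order of the rules matters because the first matching rule decides the outcome, which is why the admin pattern is listed before the broader `orders.*` pattern. The sketch below demonstrates that first-match behavior with plain prefix checks. It is a deliberate simplification: Spring's route pattern matching is richer than `startsWith`, and `FirstMatchSketch` and the route `orders.admin.purge` are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative first-match rule evaluation using simple route prefixes. */
final class FirstMatchSketch {

    /** Returns the access level of the first rule whose prefix matches the route. */
    static String decide(Map<String, String> rulesInOrder, String route) {
        for (Map.Entry<String, String> rule : rulesInOrder.entrySet()) {
            if (route.startsWith(rule.getKey())) {
                return rule.getValue();
            }
        }
        return "permitAll"; // fallback, comparable to anyRequest().permitAll()
    }

    public static void main(String[] args) {
        // LinkedHashMap preserves insertion order, so rules are checked as declared.
        Map<String, String> specificFirst = new LinkedHashMap<>();
        specificFirst.put("orders.admin.", "ADMIN");
        specificFirst.put("orders.", "authenticated");

        Map<String, String> broadFirst = new LinkedHashMap<>();
        broadFirst.put("orders.", "authenticated");
        broadFirst.put("orders.admin.", "ADMIN");

        System.out.println(decide(specificFirst, "orders.admin.purge"));
        System.out.println(decide(broadFirst, "orders.admin.purge"));
    }
}
```

With the broad rule first, the admin route is reachable by any authenticated caller, so the more specific rule should always be declared earlier.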
Conclusion
RSocket provides a modern, efficient protocol for microservices communication that addresses many limitations of traditional approaches. It combines:
- Multiple interaction models for different use cases
- Built-in backpressure for flow control
- Connection resumability for resilience
- A binary protocol for efficiency
Together, these let teams build reactive, resilient microservices that scale effectively.
The rsocket-complete-stack repository provides comprehensive examples covering all major RSocket patterns, ready to be adapted for production use cases.