RSocket Complete Stack: Building Reactive Microservices with Spring Boot

A comprehensive guide to implementing message-passing architecture with RSocket and Spring Boot, covering interaction models, load balancing, resumability, and RPC patterns for microservices at scale.

Gonnect Team
January 14, 2024 · 14 min read
Tags: RSocket, Spring Boot, Netifi, Reactive Streams, Project Reactor

Introduction

As microservices architectures mature, the limitations of traditional event-driven communication become apparent. Infrastructure costs, error handling complexity, message replay challenges, and protocol overhead all contribute to operational friction. RSocket offers an alternative: a binary application protocol designed specifically for microservices communication at scale.

This article explores a complete RSocket implementation using Spring Boot, demonstrating how message-passing architecture can address the challenges of traditional approaches while enabling reactive, resilient communication between services.

Key Insight: A message-passing architecture built on RSocket is designed for microservices communication at scale, reducing deployment overhead while maintaining performance.

Why RSocket?

[Diagram: Event-Driven Architecture]

Challenges with Traditional Approaches

| Challenge | Traditional Event-Driven | RSocket Solution |
|---|---|---|
| Infrastructure Cost | Message brokers, queues | Direct peer-to-peer |
| Error Handling | Complex acknowledgment patterns | Built-in error propagation |
| Message Replay | Requires external storage | Resumability built-in |
| Protocol Overhead | JSON/XML serialization | Binary, efficient framing |
| Backpressure | Manual implementation | Native reactive streams |
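
To make the "binary, efficient framing" row concrete, here is a minimal sketch of the 6-byte frame header described in the RSocket protocol specification: a 31-bit stream ID followed by a 6-bit frame type and 10 flag bits. The class and method names are illustrative assumptions, not part of any RSocket library.

```java
import java.nio.ByteBuffer;

// Illustrative only: these names are hypothetical, not RSocket library API.
final class FrameHeaderSketch {

    // Frame type codes as listed in the RSocket protocol specification.
    static final int REQUEST_RESPONSE = 0x04;
    static final int REQUEST_FNF = 0x05;
    static final int REQUEST_STREAM = 0x06;
    static final int REQUEST_CHANNEL = 0x07;

    // Flag bit signalling that the frame carries metadata.
    static final int FLAG_METADATA = 0x100;

    /** Encodes the header: 32-bit stream ID (top bit zero), then 6-bit type and 10-bit flags. */
    static byte[] encode(int streamId, int frameType, int flags) {
        if (streamId < 0) {
            throw new IllegalArgumentException("stream ID must fit in 31 bits");
        }
        ByteBuffer buf = ByteBuffer.allocate(6); // ByteBuffer is big-endian by default
        buf.putInt(streamId);
        buf.putShort((short) (((frameType & 0x3F) << 10) | (flags & 0x3FF)));
        return buf.array();
    }

    static int streamId(byte[] header) {
        return ByteBuffer.wrap(header).getInt(0);
    }

    static int frameType(byte[] header) {
        return (ByteBuffer.wrap(header).getShort(4) & 0xFFFF) >>> 10;
    }

    static int flags(byte[] header) {
        return ByteBuffer.wrap(header).getShort(4) & 0x3FF;
    }
}
```

Every frame therefore spends six bytes on routing it to the right stream and handler, which is the basis for the overhead comparison against text-based headers.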

RSocket Interaction Models

RSocket supports four interaction models, each suited for different use cases:

┌─────────────────────────────────────────────────────────────────┐
│                  RSocket Interaction Models                     │
└─────────────────────────────────────────────────────────────────┘

1. Fire-and-Forget (1:0)
   Client ────────► Server
   (No response expected)

2. Request-Response (1:1)
   Client ─────────► Server
          ◄─────────
   (Single request, single response)

3. Request-Stream (1:N)
   Client ─────────► Server
          ◄─────────
          ◄─────────
          ◄─────────
   (Single request, stream of responses)

4. Channel (N:N)
   Client ─────────► Server
          ◄─────────
          ─────────►
          ◄─────────
   (Bidirectional streaming)
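
As a rough guide to choosing between the four models, the sketch below picks one from two facts about a call: whether the request side is a stream, and how many responses the caller expects. The enum and its `select` method are hypothetical helpers for illustration, not RSocket or Spring API.

```java
// Hypothetical helper for illustration; not part of RSocket or Spring.
enum InteractionModel {
    FIRE_AND_FORGET,
    REQUEST_RESPONSE,
    REQUEST_STREAM,
    CHANNEL;

    /** How many responses the caller expects. */
    enum Responses { NONE, ONE, MANY }

    /** Chooses the interaction model from the request and response cardinality. */
    static InteractionModel select(boolean requestIsStream, Responses responses) {
        if (requestIsStream) {
            return CHANNEL; // a stream of requests always needs a channel
        }
        switch (responses) {
            case NONE:
                return FIRE_AND_FORGET;
            case ONE:
                return REQUEST_RESPONSE;
            default:
                return REQUEST_STREAM;
        }
    }
}
```

For example, `InteractionModel.select(false, InteractionModel.Responses.MANY)` yields `REQUEST_STREAM`, which is the shape of a subscription to order updates.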

Architecture Overview

[Diagram: Microservices Architecture]

The project is organized into focused modules:

rsocket-complete-stack/
├── interaction-model/      # DDD interaction patterns
├── load-balancing/         # Client-side load balancing
├── resumability/           # Connection recovery
├── rpc/                    # Remote procedure calls
├── spring-boot-requester/  # Producer/Client
└── spring-boot-responder/  # Consumer/Server

Module Deep Dive

1. Interaction Model - DDD Communication Patterns

This module demonstrates Domain-Driven Design interaction patterns with RSocket:

// Domain Events
public class OrderCreatedEvent {
    private final String orderId;
    private final String customerId;
    private final List<OrderItem> items;
    private final Instant timestamp;

    // Constructor, getters...
}

// RSocket Controller (Responder)
@Controller
public class OrderController {

    private final OrderService orderService;
    private final Sinks.Many<OrderCreatedEvent> orderEventSink;

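    public OrderController(OrderService orderService) {
        this.orderService = orderService;
        // autoCancel=false keeps the sink alive after the last subscriber cancels,
        // so clients that connect later still receive events
        // (Queues is reactor.util.concurrent.Queues)
        this.orderEventSink = Sinks.many().multicast()
            .onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
    }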

    // Fire-and-Forget: Receive order without response
    @MessageMapping("orders.create.fire-forget")
    public Mono<Void> createOrderFireAndForget(OrderRequest request) {
        return orderService.createOrder(request)
            .doOnSuccess(order ->
                orderEventSink.tryEmitNext(
                    new OrderCreatedEvent(order.getId(),
                        order.getCustomerId(),
                        order.getItems())))
            .then();
    }

    // Request-Response: Create order and return result
    @MessageMapping("orders.create")
    public Mono<Order> createOrder(OrderRequest request) {
        return orderService.createOrder(request);
    }

    // Request-Stream: Get order updates
    @MessageMapping("orders.stream")
    public Flux<OrderCreatedEvent> streamOrders() {
        return orderEventSink.asFlux();
    }

    // Channel: Bidirectional order processing
    @MessageMapping("orders.channel")
    public Flux<OrderStatus> processOrders(Flux<OrderRequest> requests) {
        return requests
            .flatMap(request -> orderService.createOrder(request))
            .map(order -> new OrderStatus(order.getId(), "PROCESSING"))
            .delayElements(Duration.ofMillis(100));
    }
}

2. Load Balancing - Distributed Request Handling

RSocket supports client-side load balancing for resilient communication:

@Configuration
public class RSocketLoadBalancerConfig {

    @Bean
    public LoadbalanceStrategy loadbalanceStrategy() {
        // Round-robin load balancing across available servers
        // (io.rsocket.loadbalance.RoundRobinLoadbalanceStrategy)
        return new RoundRobinLoadbalanceStrategy();
    }

    @Bean
    public RSocketRequester loadBalancedRequester(
            RSocketRequester.Builder builder,
            LoadbalanceStrategy strategy) {

        // Define multiple server endpoints
        List<LoadbalanceTarget> servers = Arrays.asList(
            LoadbalanceTarget.from("server1",
                TcpClientTransport.create("localhost", 7001)),
            LoadbalanceTarget.from("server2",
                TcpClientTransport.create("localhost", 7002)),
            LoadbalanceTarget.from("server3",
                TcpClientTransport.create("localhost", 7003))
        );

        return builder
            .rsocketConnector(connector -> connector
                .reconnect(Retry.backoff(10, Duration.ofSeconds(1))))
            .transports(Flux.just(servers), strategy);
    }
}

// Load-balanced client service
@Service
public class OrderClient {

    private final RSocketRequester requester;

    public OrderClient(RSocketRequester requester) {
        this.requester = requester;
    }

    public Mono<Order> createOrder(OrderRequest request) {
        // Request is automatically load-balanced
        return requester
            .route("orders.create")
            .data(request)
            .retrieveMono(Order.class);
    }

    public Flux<OrderCreatedEvent> streamOrders() {
        return requester
            .route("orders.stream")
            .retrieveFlux(OrderCreatedEvent.class);
    }
}
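
The idea behind round-robin selection can be sketched in a few lines of plain Java. This is a simplified stand-in for illustration, not the rsocket-java implementation; `RoundRobinSketch` and its `select` method are assumed names.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative stand-in for a round-robin strategy; not the rsocket-java implementation.
final class RoundRobinSketch<T> {
    private final AtomicInteger next = new AtomicInteger();

    /** Picks the next target, cycling through the list in order; safe for concurrent callers. */
    T select(List<T> targets) {
        if (targets.isEmpty()) {
            throw new IllegalStateException("no targets available");
        }
        // floorMod keeps the index valid even after the counter overflows to negative values
        int index = Math.floorMod(next.getAndIncrement(), targets.size());
        return targets.get(index);
    }
}
```

Because the target list is passed on every call, the same selector keeps working when servers are added or removed between requests.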

3. Resumability - Connection Recovery

RSocket provides built-in session resumption for handling network interruptions:

@Configuration
public class RSocketResumabilityConfig {

    @Bean
    public CloseableChannel rSocketServer(
            RSocketMessageHandler handler) {

        // bind(...) yields the running server's CloseableChannel, not an RSocketServer
        return RSocketServer.create()
            .acceptor(handler.responder())
            .resume(new Resume()
                // How long session state is kept for a disconnected client
                .sessionDuration(Duration.ofMinutes(5))
                // Timeout applied to the resumed stream of stored frames
                // (Resume.retry(...) is a client-side setting, so it is not set here)
                .streamTimeout(Duration.ofSeconds(30)))
            .bind(TcpServerTransport.create("localhost", 7000))
            .block();
    }

    @Bean
    public RSocketRequester resumableRequester(
            RSocketRequester.Builder builder) {

        return builder
            .rsocketConnector(connector -> connector
                // Enable resumability on client
                .resume(new Resume()
                    .sessionDuration(Duration.ofMinutes(5))
                    .retry(Retry.backoff(5, Duration.ofSeconds(1))))
                // Automatic reconnection
                .reconnect(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1))
                    .maxBackoff(Duration.ofSeconds(30))))
            .tcp("localhost", 7000);
    }
}
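
To illustrate what session resumption has to track, the sketch below keeps sent frames until the peer confirms them and replays the unconfirmed tail after a reconnect. It is a deliberate simplification: positions here count frames, whereas the RSocket protocol counts bytes, and `ResumableStoreSketch` is a hypothetical class, not the library's frame store.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simplified illustration: positions count frames, whereas the RSocket protocol counts bytes.
final class ResumableStoreSketch {
    private final Deque<String> retained = new ArrayDeque<>();
    private long firstRetainedPosition = 0; // position of the oldest frame still stored
    private long nextPosition = 0;          // position the next sent frame will get

    /** Records a frame as sent and retains it until the peer confirms receipt. */
    void onSend(String frame) {
        retained.addLast(frame);
        nextPosition++;
    }

    /** Drops frames the peer has confirmed, for example via a keep-alive. */
    void onPeerPosition(long receivedCount) {
        while (firstRetainedPosition < receivedCount && !retained.isEmpty()) {
            retained.removeFirst();
            firstRetainedPosition++;
        }
    }

    /** Returns the frames to replay, given how many frames the peer reports having received. */
    List<String> resumeFrom(long receivedCount) {
        if (receivedCount < firstRetainedPosition || receivedCount > nextPosition) {
            throw new IllegalStateException(
                "cannot resume: position " + receivedCount + " is not retained");
        }
        onPeerPosition(receivedCount);
        return new ArrayList<>(retained);
    }
}
```

The rejection branch mirrors the case where a session cannot be resumed because the needed frames are no longer stored; the connection then has to be re-established from scratch.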

// Resilient streaming client
@Service
public class ResilientOrderClient {

    private static final Logger log = LoggerFactory.getLogger(ResilientOrderClient.class);
    private final RSocketRequester requester;

    public ResilientOrderClient(RSocketRequester requester) {
        this.requester = requester;
    }

    public Flux<OrderCreatedEvent> streamOrdersWithResumability() {
        return requester
            .route("orders.stream")
            .retrieveFlux(OrderCreatedEvent.class)
            .doOnSubscribe(s -> log.info("Starting order stream"))
            .doOnError(e -> log.warn("Stream error, will resubscribe: {}", e.getMessage()))
            .doOnComplete(() -> log.info("Stream completed"))
            // Fallback for errors that session resumption could not mask:
            // resubscribing starts a new stream rather than resuming the old one
            .retryWhen(Retry.backoff(10, Duration.ofSeconds(1)));
    }
    }
}
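
The retry settings above are easier to reason about with the delay schedule written out. The sketch below computes nominal exponential backoff delays (doubling from the minimum, capped at a maximum). It is an approximation for illustration: Reactor's `Retry.backoff` additionally applies random jitter, and `BackoffScheduleSketch` is an assumed name.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Illustrative calculation only; Reactor's Retry.backoff also applies random jitter.
final class BackoffScheduleSketch {

    /** Returns the nominal delay before each retry: minBackoff doubled per attempt, capped at maxBackoff. */
    static List<Duration> delays(int maxAttempts, Duration minBackoff, Duration maxBackoff) {
        List<Duration> result = new ArrayList<>();
        Duration next = minBackoff;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            result.add(next.compareTo(maxBackoff) > 0 ? maxBackoff : next);
            // Stop doubling once the cap is reached to avoid overflow on long schedules
            if (next.compareTo(maxBackoff) < 0) {
                next = next.multipliedBy(2);
            }
        }
        return result;
    }
}
```

With ten attempts, a one-second minimum, and no effective cap, the nominal delays run from 1 second up to 512 seconds, about 17 minutes in total, which is worth weighing against adding a `maxBackoff` as the reconnect configuration does.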

4. RPC - Remote Procedure Calls

Traditional RPC patterns implemented over RSocket:

// Service interface (shared between client and server)
public interface OrderService {
    Mono<Order> findById(String orderId);
    Mono<Order> create(OrderRequest request);
    Flux<Order> findByCustomer(String customerId);
    Mono<Void> delete(String orderId);
}

// Server-side implementation
@Service
public class OrderServiceImpl implements OrderService {

    private final OrderRepository repository;

    public OrderServiceImpl(OrderRepository repository) {
        this.repository = repository;
    }

    @Override
    public Mono<Order> findById(String orderId) {
        return repository.findById(orderId);
    }

    @Override
    public Mono<Order> create(OrderRequest request) {
        return repository.save(Order.fromRequest(request));
    }

    @Override
    public Flux<Order> findByCustomer(String customerId) {
        return repository.findByCustomerId(customerId);
    }

    @Override
    public Mono<Void> delete(String orderId) {
        return repository.deleteById(orderId);
    }
}

// RSocket RPC Controller
@Controller
public class OrderRpcController {

    private final OrderService orderService;

    public OrderRpcController(OrderService orderService) {
        this.orderService = orderService;
    }

    @MessageMapping("order.findById")
    public Mono<Order> findById(String orderId) {
        return orderService.findById(orderId);
    }

    @MessageMapping("order.create")
    public Mono<Order> create(OrderRequest request) {
        return orderService.create(request);
    }

    @MessageMapping("order.findByCustomer")
    public Flux<Order> findByCustomer(String customerId) {
        return orderService.findByCustomer(customerId);
    }

    @MessageMapping("order.delete")
    public Mono<Void> delete(String orderId) {
        return orderService.delete(orderId);
    }
}

// Client-side proxy
@Component
public class OrderServiceProxy implements OrderService {

    private final RSocketRequester requester;

    public OrderServiceProxy(RSocketRequester requester) {
        this.requester = requester;
    }

    @Override
    public Mono<Order> findById(String orderId) {
        return requester
            .route("order.findById")
            .data(orderId)
            .retrieveMono(Order.class);
    }

    @Override
    public Mono<Order> create(OrderRequest request) {
        return requester
            .route("order.create")
            .data(request)
            .retrieveMono(Order.class);
    }

    @Override
    public Flux<Order> findByCustomer(String customerId) {
        return requester
            .route("order.findByCustomer")
            .data(customerId)
            .retrieveFlux(Order.class);
    }

    @Override
    public Mono<Void> delete(String orderId) {
        return requester
            .route("order.delete")
            .data(orderId)
            .retrieveMono(Void.class);
    }
}
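
The hand-written proxy above repeats one pattern per method: derive a route from the method name and forward the argument. As a sketch of how that mapping could be generated, the example below uses a JDK dynamic proxy. Everything here is hypothetical: `RouteProxySketch`, the `OrderLookup` interface (which returns a plain `String` instead of a `Mono` to keep the example dependency-free), and the `transport` function that stands in for a call such as `requester.route(route).data(arg).retrieveMono(...)`.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.function.BiFunction;

// Hypothetical API used only for this example.
interface OrderLookup {
    String findById(String orderId);
}

// Hypothetical sketch: derives "<prefix>.<methodName>" routes via a JDK dynamic proxy.
final class RouteProxySketch {

    /** Creates a proxy whose methods call transport.apply(route, firstArgument). */
    @SuppressWarnings("unchecked")
    static <T> T create(Class<T> api, String prefix, BiFunction<String, Object, Object> transport) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getDeclaringClass() == Object.class) {
                // Answer toString/equals/hashCode with the interface's Class object
                return method.invoke(api, args);
            }
            Object argument = (args == null || args.length == 0) ? null : args[0];
            return transport.apply(prefix + "." + method.getName(), argument);
        };
        return (T) Proxy.newProxyInstance(api.getClassLoader(), new Class<?>[] {api}, handler);
    }
}
```

Calling `findById("42")` on a proxy created with the prefix `"order"` invokes the transport with the route `order.findById`, matching the naming used by the controller mappings. Whether generating proxies is worth it over explicit methods depends on how many services share the convention.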

5. Spring Boot Requester (Producer/Client)

Complete client application configuration:

@SpringBootApplication
public class RSocketRequesterApplication {

    public static void main(String[] args) {
        SpringApplication.run(RSocketRequesterApplication.class, args);
    }

    @Bean
    public RSocketRequester rSocketRequester(
            RSocketRequester.Builder builder,
            RSocketStrategies strategies) {

        return builder
            .rsocketStrategies(strategies)
            .rsocketConnector(connector -> connector
                .payloadDecoder(PayloadDecoder.ZERO_COPY)
                .reconnect(Retry.backoff(10, Duration.ofSeconds(1))))
            .tcp("localhost", 7000);
    }

    @Bean
    public RSocketStrategies rSocketStrategies() {
        return RSocketStrategies.builder()
            .encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
            .decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
            .build();
    }
}

// REST Controller exposing RSocket services
@RestController
@RequestMapping("/api/orders")
public class OrderRestController {

    private final RSocketRequester requester;

    public OrderRestController(RSocketRequester requester) {
        this.requester = requester;
    }

    @GetMapping("/{orderId}")
    public Mono<Order> getOrder(@PathVariable String orderId) {
        return requester
            .route("order.findById")
            .data(orderId)
            .retrieveMono(Order.class);
    }

    @PostMapping
    public Mono<Order> createOrder(@RequestBody OrderRequest request) {
        return requester
            .route("order.create")
            .data(request)
            .retrieveMono(Order.class);
    }

    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<OrderCreatedEvent> streamOrders() {
        return requester
            .route("orders.stream")
            .retrieveFlux(OrderCreatedEvent.class);
    }
}

6. Spring Boot Responder (Consumer/Server)

Complete server application configuration:

@SpringBootApplication
public class RSocketResponderApplication {

    public static void main(String[] args) {
        SpringApplication.run(RSocketResponderApplication.class, args);
    }
}

// Application configuration
@Configuration
public class RSocketServerConfig {

    @Bean
    public RSocketMessageHandler rSocketMessageHandler(
            RSocketStrategies strategies) {

        RSocketMessageHandler handler = new RSocketMessageHandler();
        handler.setRSocketStrategies(strategies);
        return handler;
    }

    @Bean
    public RSocketStrategies rSocketStrategies() {
        return RSocketStrategies.builder()
            .encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
            .decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
            .build();
    }
}

# application.yml
spring:
  rsocket:
    server:
      port: 7000
      transport: tcp

Building and Running

Build Commands

# Clone the repository
git clone https://github.com/mgorav/rsocket-complete-stack.git
cd rsocket-complete-stack

# Build all modules
./gradlew clean build

# Build specific module
./gradlew :interaction-model:build
./gradlew :spring-boot-responder:build
./gradlew :spring-boot-requester:build

Running from IDE

The modules are designed to be run from an IDE during development:

  1. Start the spring-boot-responder (server) first
  2. Start the spring-boot-requester (client)
  3. Access the REST endpoints to test RSocket communication

Testing the API

# Create an order (Request-Response)
curl -X POST http://localhost:8080/api/orders \
  -H "Content-Type: application/json" \
  -d '{"customerId": "CUST-001", "items": [{"productId": "PROD-001", "quantity": 2}]}'

# Get order by ID
curl http://localhost:8080/api/orders/ORDER-001

# Stream orders (Server-Sent Events); -N disables curl's output buffering
curl -N http://localhost:8080/api/orders/stream

Performance Characteristics

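| Metric | HTTP/REST | RSocket |
|---|---|---|
| Connection Setup | Per request (or pooled keep-alive) | Persistent |
| Protocol Overhead | High (text headers) | Low (binary) |
| Backpressure | No application-level support | Native |
| Multiplexing | HTTP/2 only | Built-in |
| Bidirectional | WebSockets needed | Native |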
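
Native backpressure means the receiver states how many items it is ready for and the sender never exceeds that demand. The sketch below shows the mechanism with the JDK's `java.util.concurrent.Flow` interfaces (Java 9+). It is a simplified, single-threaded illustration under assumed names (`DemandDrivenRange`, `BatchSubscriber`) and skips parts of the Reactive Streams rules, such as rejecting non-positive demand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Illustrative only: a synchronous publisher of 0..count-1 that emits strictly on demand.
final class DemandDrivenRange implements Flow.Publisher<Integer> {
    private final int count;

    DemandDrivenRange(int count) {
        this.count = count;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 0;
            private boolean done = false;

            @Override
            public void request(long n) {
                // Emit at most n items, never more than the subscriber asked for
                for (long i = 0; i < n && next < count && !done; i++) {
                    subscriber.onNext(next++);
                }
                if (next == count && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }
}

// Subscriber that receives nothing until pull(n) signals demand.
final class BatchSubscriber implements Flow.Subscriber<Integer> {
    final List<Integer> received = new ArrayList<>();
    boolean completed = false;
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription s) {
        this.subscription = s;
    }

    @Override
    public void onNext(Integer item) {
        received.add(item);
    }

    @Override
    public void onError(Throwable t) {
        throw new IllegalStateException(t);
    }

    @Override
    public void onComplete() {
        completed = true;
    }

    void pull(long n) {
        subscription.request(n);
    }
}
```

In RSocket the same demand signal crosses the network as a request-N frame, so a slow consumer throttles the remote producer instead of relying on buffers in between.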

Best Practices

1. Error Handling

@MessageMapping("orders.create")
public Mono<Order> createOrder(OrderRequest request) {
    // ApplicationErrorException (io.rsocket.exceptions) takes a message and an
    // optional cause, so the error code is folded into the message
    return orderService.createOrder(request)
        .onErrorMap(ValidationException.class,
            e -> new ApplicationErrorException("VALIDATION_ERROR: " + e.getMessage(), e))
        .onErrorMap(DuplicateException.class,
            e -> new ApplicationErrorException("DUPLICATE_ORDER: " + e.getMessage(), e));
}

2. Metadata for Routing

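// Client with metadata
public Mono<Order> createOrderWithMetadata(OrderRequest request, String traceId) {
    return requester
        .route("orders.create")
        .metadata(traceId, MimeTypeUtils.TEXT_PLAIN)
        .data(request)
        .retrieveMono(Order.class);
}

// Server extracting metadata
// The entry only becomes the "trace-id" header if an extractor is registered
// on the RSocketStrategies builder, for example:
//   .metadataExtractorRegistry(registry ->
//       registry.metadataToExtract(MimeTypeUtils.TEXT_PLAIN, String.class, "trace-id"))
@MessageMapping("orders.create")
public Mono<Order> createOrder(
        OrderRequest request,
        @Header("trace-id") String traceId) {
    log.info("Processing order with trace: {}", traceId);
    return orderService.createOrder(request);
}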
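
When the route and extra entries such as a trace ID travel together, they are typically packed as composite metadata. The sketch below encodes two entries following the layout in the RSocket composite metadata extension: a one-byte MIME marker, a 24-bit length, then the payload. The class name and the custom MIME type `message/x.trace-id` are assumptions for illustration, and real applications would rely on the library's codec rather than hand-rolled encoding.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Illustrative encoder for composite metadata entries; not a library API.
final class CompositeMetadataSketch {

    // Well-known MIME ID assigned to message/x.rsocket.routing.v0.
    static final int ROUTING_MIME_ID = 0x7E;

    /** Well-known MIME type: high bit set plus 7-bit ID, 24-bit length, payload. */
    static void writeWellKnown(ByteArrayOutputStream out, int mimeId, byte[] payload) {
        out.write(0x80 | (mimeId & 0x7F));
        writeLength24(out, payload.length);
        out.write(payload, 0, payload.length);
    }

    /** Custom MIME type: (length - 1) in 7 bits, ASCII MIME string, 24-bit length, payload. */
    static void writeCustom(ByteArrayOutputStream out, String mimeType, byte[] payload) {
        byte[] mime = mimeType.getBytes(StandardCharsets.US_ASCII);
        if (mime.length < 1 || mime.length > 128) {
            throw new IllegalArgumentException("MIME type length must be between 1 and 128");
        }
        out.write(mime.length - 1);
        out.write(mime, 0, mime.length);
        writeLength24(out, payload.length);
        out.write(payload, 0, payload.length);
    }

    /** Routing payload with a single tag: 1-byte length followed by the UTF-8 route. */
    static byte[] routingPayload(String route) {
        byte[] tag = route.getBytes(StandardCharsets.UTF_8);
        if (tag.length > 255) {
            throw new IllegalArgumentException("route must be at most 255 bytes");
        }
        byte[] result = new byte[tag.length + 1];
        result[0] = (byte) tag.length;
        System.arraycopy(tag, 0, result, 1, tag.length);
        return result;
    }

    private static void writeLength24(ByteArrayOutputStream out, int length) {
        out.write((length >>> 16) & 0xFF);
        out.write((length >>> 8) & 0xFF);
        out.write(length & 0xFF);
    }
}
```

Encoding the route `orders.create` this way takes 18 bytes in total, which shows how little framing the routing information needs.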

3. Security

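// Requires @EnableRSocketSecurity on a configuration class and,
// for jwt(), a ReactiveJwtDecoder bean
@Bean
public PayloadSocketAcceptorInterceptor authorization(
        RSocketSecurity security) {
    return security
        .authorizePayload(authorize -> authorize
            // Rules are evaluated in order; the first match wins
            .route("orders.admin.*").hasRole("ADMIN")
            .route("orders.*").authenticated()
            .anyRequest().permitAll())
        .jwt(Customizer.withDefaults())
        .build();
}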

Conclusion

RSocket provides a modern, efficient protocol for microservices communication that addresses many limitations of traditional approaches. It offers:

  • Multiple interaction models for different use cases
  • Built-in backpressure for flow control
  • Connection resumability for resilience
  • Binary protocol for efficiency

With these capabilities, organizations can build reactive, resilient microservices that scale effectively.

The rsocket-complete-stack repository provides comprehensive examples covering all major RSocket patterns, ready to be adapted for production use cases.


Further Reading