Working with RSocket

Over the last few years at Sentia, we have embraced reactive programming more and more. While creating microservices that retrieve and expose data from their data sources in a reactive manner, I started to wonder how we could extend the reactive chain of events across multiple services. When I attended the Spring I/O conference in Barcelona last year, this same challenge was addressed and RSocket was mentioned. After that introduction to RSocket, I decided to do some research myself.

So what is RSocket?

RSocket is an application-level protocol that supports Reactive Streams semantics across network boundaries. It is a binary protocol that runs on top of transports such as TCP, WebSockets and Aeron. As opposed to HTTP, another frequently used application-level protocol, it supports multiple interaction models, such as streaming and push in addition to plain request / response, all over a single connection. RSocket also supports session resumption, which allows streams to resume across different transport connections. This is particularly useful when connections drop, switch and reconnect frequently.

But what about HTTP? Why would we want to replace it in the first place? First of all, it only supports the request / response interaction model. Whenever you want to do something different, such as streaming, it quickly becomes challenging and starts to feel like a hack on top of HTTP. HTTP is also text based (at least in its 1.x versions), which of course makes it easy for humans to read and debug, but it becomes less efficient in a large-scale distributed application landscape where the vast majority of messages will only ever be read by machines. So why not optimize for that usage?
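To make the "binary" argument a bit more tangible, here is a minimal sketch of how compact a frame header can be. The layout (a 31-bit stream ID followed by a 6-bit frame type and 10 bits of flags) and the frame type values reflect my understanding of the RSocket protocol specification. The class FrameHeaderSketch and its methods are hypothetical and not part of any RSocket driver.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration only: packs a 6-byte header by hand.
// Real applications leave framing to the RSocket driver.
final class FrameHeaderSketch {

  // Frame types of the four interaction models (assumed from the protocol spec).
  static final int REQUEST_RESPONSE = 0x04;
  static final int REQUEST_FNF = 0x05;
  static final int REQUEST_STREAM = 0x06;
  static final int REQUEST_CHANNEL = 0x07;

  // Encodes stream ID (31 bits), frame type (6 bits) and flags (10 bits).
  static byte[] encode(int streamId, int frameType, int flags) {
    if (streamId < 0) {
      throw new IllegalArgumentException("stream ID must fit in 31 bits");
    }
    if (frameType < 0 || frameType > 0x3F) {
      throw new IllegalArgumentException("frame type must fit in 6 bits");
    }
    ByteBuffer buffer = ByteBuffer.allocate(6); // big-endian by default
    buffer.putInt(streamId);
    buffer.putShort((short) ((frameType << 10) | (flags & 0x3FF)));
    return buffer.array();
  }

  // Reads the stream ID back from the first four bytes.
  static int streamId(byte[] header) {
    return ByteBuffer.wrap(header).getInt(0) & 0x7FFFFFFF;
  }

  // Reads the frame type back from the upper 6 bits of the last two bytes.
  static int frameType(byte[] header) {
    int typeAndFlags = ByteBuffer.wrap(header).getShort(4) & 0xFFFF;
    return typeAndFlags >>> 10;
  }

  public static void main(String[] args) {
    byte[] header = encode(1, REQUEST_STREAM, 0);
    System.out.println("header size in bytes: " + header.length);
  }
}
```

Six bytes identify the stream and the kind of request, whereas a text-based request line and headers typically take far more. The payload and metadata follow the header in the same binary form.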

RSocket comes with drivers in different programming languages, such as Java, JavaScript, .NET, C++ and Kotlin. Drivers for other programming languages are currently under development by the RSocket community. For Java, RSocket has been made easy to implement by using the Spring Framework. Together with Spring Boot, it will be a matter of seconds to set up your first RSocket application.

RSocket using Spring Boot

To start with RSocket in a Spring Boot (2.4.0) application all you need to do is add the following dependency:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>

And add the following property to your application.properties:

spring.rsocket.server.port=7000

This will start a standalone RSocket server on port 7000, using TCP as the default transport. You can also enable RSocket support in an existing reactive web application (e.g. WebFlux) with the following configuration. Note that you must be using the Reactor Netty web server for this to work:

spring.rsocket.server.mapping-path=/rsocket
spring.rsocket.server.transport=websocket

This setup only works over the WebSocket transport and reduces overhead by running a single web server. You can then reach RSocket at ws://<server>:<port>/rsocket.

Different interaction models

Now that we know how to set up RSocket in a Spring Boot application, it is time to look at the different interaction models: what are they and what are they used for? RSocket has four interaction models:

Request / Response

This interaction model is most likely one you are already familiar with, as it is the incredibly common interaction model that HTTP offers as well. With request / response, as the name states, you send a request and you get back one response. With Spring Boot you can easily create an RSocket request / response message route using the @MessageMapping annotation in a @Controller class. What really makes it request / response is your method signature: a single-value asynchronous type such as Mono<T> as the return type, and an optional method parameter (annotated with @Payload) that is either an explicit value or a single-value asynchronous type as well:

@Controller
public class RSocketController {

  @MessageMapping("article.find")
  public Mono<Article> findArticleByIdentifier(@Payload String identifier) { }
}

Invoking this message route is almost as easy as creating it. For this we need an instance of RSocketRequester. We can create one using the RSocketRequester.Builder, which is automatically available through Spring Boot's auto-configuration. Using the RSocketRequester we can retrieve the article by making an RSocket request to the “article.find” message route and passing the identifier as the message payload.

public class RSocketClient {

  private final RSocketRequester requester;

  public RSocketClient(RSocketRequester.Builder requesterBuilder) {
    requester = requesterBuilder.tcp("localhost", 7000);
  }

  public Mono<Article> findArticleByIdentifier(String identifier) {
    return requester.route("article.find")
       .data(identifier)
       .retrieveMono(Article.class);
  }
}

Request / Stream

The next interaction model is request / stream. It is very similar to request / response, except that it returns a stream of events instead of a single event. The method signature is therefore slightly different, in the sense that it needs a multi-value asynchronous type such as Flux<T> as the return type. Just as with request / response, the method parameter (@Payload) is optional and can be omitted when it is not needed.

@Controller
public class RSocketController {
  // snip

  @MessageMapping("articles.all")
  public Flux<Article> findArticles() { }
}

The method signature above is enough to tell Spring to make this message route available using the request / stream interaction model. Invoking it with the RSocketRequester is almost identical to the previous example, except that here we call .retrieveFlux(Article.class) instead.

public class RSocketClient {
  // snip

  public Flux<Article> findArticles() {
    return requester.route("articles.all")
       .retrieveFlux(Article.class);
  }
}
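Under the hood, a stream like this relies on Reactive Streams demand signalling: the consumer tells the producer how many elements it is ready to receive. The sketch below illustrates that idea using only the JDK's java.util.concurrent.Flow API. It does not involve RSocket or Spring, and the class RequestStreamSketch and its collect method are hypothetical names chosen for this example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of demand-driven streaming with the JDK Flow API.
final class RequestStreamSketch {

  // Publishes the given titles and collects them one element at a time.
  static List<String> collect(List<String> titles) {
    List<String> received = new CopyOnWriteArrayList<>();
    CountDownLatch done = new CountDownLatch(1);

    try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
      publisher.subscribe(new Flow.Subscriber<String>() {
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
          this.subscription = subscription;
          subscription.request(1); // signal demand for the first element
        }

        @Override
        public void onNext(String item) {
          received.add(item);
          subscription.request(1); // ask for the next element only when ready
        }

        @Override
        public void onError(Throwable throwable) {
          done.countDown();
        }

        @Override
        public void onComplete() {
          done.countDown();
        }
      });
      titles.forEach(publisher::submit);
    } // closing the publisher completes the stream once all items are delivered

    try {
      done.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return received;
  }

  public static void main(String[] args) {
    System.out.println(collect(List.of("first article", "second article")));
  }
}
```

The point of RSocket is that this same request(n) style of back-pressure keeps working when the publisher and the subscriber live in different services.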

Fire and forget

Another interaction model that RSocket offers is called ‘fire and forget’. With fire and forget you can make requests to other services without having to wait for any response. This is particularly handy when invoking a different service, e.g. to delete or update something, when you are not interested in the response and don’t want to wait until the response has been committed.

When creating a method signature for a fire and forget route using Spring Boot, you have to make sure your return type is either void or Mono<Void>, like this:

@Controller
public class RSocketController {
  // snip

  @MessageMapping("article.delete.{identifier}")
  public Mono<Void> deleteArticleByIdentifier(@DestinationVariable("identifier") String identifier) { }
}

What is new in the example above is that Spring also lets you use variables in message routes. This is an alternative to sending the identifier in the message payload, as we’ve seen in the request / response example. Invoking a fire and forget route and using a destination variable are both very easy with the RSocketRequester. Using .route("article.delete.{identifier}", identifier) we can pass the route variables directly as varargs. The other difference is that we call .send() instead of .retrieveMono(Article.class) or .retrieveFlux(Article.class).

public class RSocketClient {
  // snip

  public Mono<Void> deleteArticleByIdentifier(String identifier) {
    return requester.route("article.delete.{identifier}", identifier)
       .send();
  }
}
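Conceptually, the route variables are substituted into the route template, in order, before the request is sent. The sketch below shows that substitution in plain Java. It is not Spring's actual implementation, and the class RouteTemplateSketch and its expand method are hypothetical.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration of expanding "{name}" placeholders in a route.
final class RouteTemplateSketch {

  private static final Pattern VARIABLE = Pattern.compile("\\{[^{}]+\\}");

  // Replaces each placeholder with the next value, in order of appearance.
  static String expand(String template, Object... variables) {
    Matcher matcher = VARIABLE.matcher(template);
    StringBuilder route = new StringBuilder();
    int index = 0;
    while (matcher.find()) {
      if (index >= variables.length) {
        throw new IllegalArgumentException("No value for " + matcher.group());
      }
      String value = String.valueOf(variables[index++]);
      matcher.appendReplacement(route, Matcher.quoteReplacement(value));
    }
    matcher.appendTail(route);
    return route.toString();
  }

  public static void main(String[] args) {
    // prints "article.delete.42"
    System.out.println(expand("article.delete.{identifier}", "42"));
  }
}
```

On the server side, the @DestinationVariable annotation performs the reverse step and extracts the value from the matched route.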

Channel

The last interaction model that RSocket offers is called ‘channel’. With a channel you pass a stream of events from client to server and from server to client. With channels you can, for example, send and receive two-way chat messages or synchronize data. In the example below I’ll show how a channel can be used to save a stream of events (articles).

The method signature for a request channel is very similar to the others, except that in this case you have a multi-value asynchronous type such as Flux<T> as both the input parameter and the return type.

@Controller
public class RSocketController {
  // snip

  @MessageMapping("articles.save-all")
  public Flux<Article> saveArticles(Flux<Article> articles) { }
}

Calling channel message routes is just as easy as with any of the three other interaction models. The only difference is that the data object must be of type org.reactivestreams.Publisher and you have to pass the class of its elements, e.g. .data(articles, Article.class):

public class RSocketClient {
  // snip

  public Flux<Article> saveArticles(Flux<Article> articles) {
    return requester.route("articles.save-all")
       .data(articles, Article.class)
       .retrieveFlux(Article.class);
  }
}

In the example above, articles is a Flux (a stream of N Article elements) that is streamed to the “articles.save-all” route, and the articles are streamed back to the client.
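To summarize the four examples: the interaction model of a message route follows from how many values go in and how many come out. The sketch below captures that rule of thumb as I understand it from the Spring documentation. The class InteractionModelSketch, its enums and the resolve method are hypothetical and only serve as a summary.

```java
// Hypothetical summary of how a handler signature maps to an interaction model.
final class InteractionModelSketch {

  // NONE: no payload or void / Mono<Void>; ONE: a value or Mono<T>; MANY: Flux<T>.
  enum Cardinality { NONE, ONE, MANY }

  enum InteractionModel { FIRE_AND_FORGET, REQUEST_RESPONSE, REQUEST_STREAM, CHANNEL }

  static InteractionModel resolve(Cardinality input, Cardinality output) {
    if (input == Cardinality.MANY) {
      // A Flux<T> input always means a channel, whatever comes back.
      return InteractionModel.CHANNEL;
    }
    switch (output) {
      case NONE:
        // Assumption: a void handler can also be called as request / response.
        return InteractionModel.FIRE_AND_FORGET;
      case ONE:
        return InteractionModel.REQUEST_RESPONSE;
      default:
        return InteractionModel.REQUEST_STREAM;
    }
  }

  public static void main(String[] args) {
    // article.find: one identifier in, one Mono<Article> out
    System.out.println(resolve(Cardinality.ONE, Cardinality.ONE));
    // articles.all: nothing in, a Flux<Article> out
    System.out.println(resolve(Cardinality.NONE, Cardinality.MANY));
    // articles.save-all: Flux<Article> in, Flux<Article> out
    System.out.println(resolve(Cardinality.MANY, Cardinality.MANY));
  }
}
```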

Conclusion

In this blog post we’ve seen what RSocket is and what it can offer in a reactive landscape. Although it is a relatively new application protocol compared to HTTP, it has great potential. It is already very easy to implement for service-to-service communication, even across different programming languages, thanks to the available drivers. What is also interesting to explore in the future is how RSocket handles security and how it can be deployed in a cloud-based distributed landscape, covering load balancing, service discovery and failure mitigation. I hope I’ve got you interested in RSocket by explaining its basics.

Dennis Nijssen