Spring Web Reactive | 5. RSocket | 5.2. RSocketRequester
RSocketRequester provides a fluent API for executing RSocket requests.
It accepts and returns data and metadata objects instead of low-level data buffers.
It can be used symmetrically, either to make requests from a client or to make requests from a server.
5.2.1. Client Requester
To obtain an RSocketRequester on the client, connect to a server.
This includes sending an RSocket SETUP frame with connection settings.
RSocketRequester provides a builder that prepares io.rsocket.core.RSocketConnector, including the connection settings for the SETUP frame.
The following is the most basic way to connect with defaults.
Java
RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000);
URI url = URI.create("https://example.org:8080/rsocket");
RSocketRequester requester = RSocketRequester.builder().webSocket(url);
Kotlin
val requester = RSocketRequester.builder().tcp("localhost", 7000)
URI url = URI.create("https://example.org:8080/rsocket");
val requester = RSocketRequester.builder().webSocket(url)
The code above does not connect immediately. When a request is made, a shared connection is established transparently and then used.
Connection Setup
RSocketRequester.Builder provides the following options to define the initial SETUP frame.
dataMimeType(MimeType): Sets the MIME type for data on the connection.metadataMimeType(MimeType): Sets the MIME type for metadata on the connection.setupData(Object): Data to include inSETUP.setupRoute(String, Object...): Route metadata to include inSETUP.setupMetadata(Object, MimeType): Additional metadata to include inSETUP.
For data, the default MIME type is derived from the first configured decoder. For metadata, the default MIME type is composite metadata, which allows multiple metadata value and MIME type pairs per request. Usually, there is no need to change all of these settings.
Data and metadata in the SETUP frame are optional.
On the server side, @ConnectMapping methods can be used to handle the beginning of a connection and the content of the SETUP frame.
Metadata can be used for connection-level security.
Strategies
RSocketRequester.Builder accepts RSocketStrategies to configure the requester.
Use this to provide encoders and decoders for data and metadata value serialization and deserialization.
By default, only the basic spring-core codecs for String, byte[], and ByteBuffer are registered.
If spring-web is added, you can register other features as follows.
Java
RSocketStrategies strategies = RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.build();
RSocketRequester requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000);
Kotlin
val strategies = RSocketStrategies.builder()
.encoders { it.add(Jackson2CborEncoder()) }
.decoders { it.add(Jackson2CborDecoder()) }
.build()
val requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000)
RSocketStrategies is designed for reuse.
In some scenarios, such as when the client and server are in the same application, it is better to declare it in Spring configuration.
Client Responders
You can use RSocketRequester.Builder to configure responders for requests from the server.
You can use annotated handlers and responders on the client, based on the same infrastructure used on the server, but register them programmatically as follows.
Java
RSocketStrategies strategies = RSocketStrategies.builder()
.routeMatcher(new PathPatternRouteMatcher()) // (1)
.build();
SocketAcceptor responder =
RSocketMessageHandler.responder(strategies, new ClientHandler()); // (2)
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(responder)) // (3)
.tcp("localhost", 7000);
Kotlin
val strategies = RSocketStrategies.builder()
.routeMatcher(PathPatternRouteMatcher()) // (1)
.build()
val responder =
RSocketMessageHandler.responder(strategies, new ClientHandler()); // (2)
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(responder) } // (3)
.tcp("localhost", 7000)
- (1) If
spring-webis present, usePathPatternRouteMatcherfor efficient route matching. - (2) Create a responder from a class with
@MessageMappingand/or@ConnectMappingmethods. - (3) Register the responder.
Note that the preceding code is only a utility designed for programmatic registration of client responders.
For alternative scenarios where client responders are configured through Spring configuration, declare RSocketMessageHandler as a Spring Bean and apply it as follows.
Java
ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(handler.responder()))
.tcp("localhost", 7000);
Kotlin
import org.springframework.beans.factory.getBean
val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(handler.responder()) }
.tcp("localhost", 7000)
In the preceding case, you may need to use setHandlerPredicate on RSocketMessageHandler to switch to another strategy for detecting client responders.
For example, you can use a custom annotation such as @RSocketClientResponder instead of the default @Controller basis.
This is needed when the same application uses both client and server roles, or when it uses multiple clients.
For details about the programming model, see annotated responders.
Advanced
RSocketRequesterBuilder provides a callback that exposes the underlying io.rsocket.core.RSocketConnector.
This gives access to advanced options such as keep-alive intervals, session resumption, and interceptors.
You can configure options at that level as follows.
Java
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> {
// ...
})
.tcp("localhost", 7000);
Kotlin
val requester = RSocketRequester.builder()
.rsocketConnector {
//...
}
.tcp("localhost", 7000)
5.2.2. Server Requester
On the server, you can obtain a requester for the connected client and use it to make requests to that client.
In annotated responders, @ConnectMapping and @MessageMapping methods support an RSocketRequester argument.
Use it to access the requester for the connection.
Note that an @ConnectMapping method is essentially a SETUP frame handler and must complete before requests are started.
Therefore, the first request should be decoupled from that handling.
For example:
Java
@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
requester.route("status").data("5")
.retrieveFlux(StatusReport.class)
.subscribe(bar -> { // (1)
// ...
});
return ... // (2)
}
- (1) Start the request asynchronously, independently of the handling.
- (2) Perform handling and return
Mono<Void>when complete.
Kotlin
@ConnectMapping
suspend fun handle(requester: RSocketRequester) {
GlobalScope.launch {
requester.route("status").data("5").retrieveFlow<StatusReport>().collect { // (1)
// ...
}
}
/// ... // (2)
}
- (1) Start the request asynchronously, independently of the handling.
- (2) Perform the handling as a suspending function.
5.2.3. Requests
Once you have a requester from the client or the server, you can make requests as follows.
Java
ViewBox viewBox = ... ;
Flux<AirportLocation> locations = requester.route("locate.radars.within") // (1)
.data(viewBox) // (2)
.retrieveFlux(AirportLocation.class); // (3)
Kotlin
val viewBox: ViewBox = ...
val locations = requester.route("locate.radars.within") // (1)
.data(viewBox) // (2)
.retrieveFlow<AirportLocation>() // (3)
- (1) Specify the route to include in the request message metadata.
- (2) Provide the request message data.
- (3) Declare the expected response.
The interaction type is determined implicitly from the cardinality of input and output.
The example above is Request-Stream, because one value is sent and a stream of values is received.
In most cases, you do not need to think about this as long as the input and output selection matches the RSocket interaction type and what the responder expects.
The only invalid combination is many-to-one.
The data(Object) method accepts Reactive Streams Publisher instances such as Flux and Mono, as well as other value producers registered in ReactiveAdapterRegistry.
For a multi-value Publisher, such as a Flux that produces values of the same type, consider using one of the overloaded data methods to avoid element type checks and Encoder lookup for every element.
data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);
The data(Object) step is optional.
Omit it for requests that do not send data.
Java
Mono<AirportLocation> location = requester.route("find.radar.EWR"))
.retrieveMono(AirportLocation.class);
Kotlin
import org.springframework.messaging.rsocket.retrieveAndAwait
val location = requester.route("find.radar.EWR")
.retrieveAndAwait<AirportLocation>()
If composite metadata is used, which is the default, and the value is supported by a registered Encoder, you can add additional metadata values.
For example:
Java
String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");
Flux<AirportLocation> locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlux(AirportLocation.class);
Kotlin
import org.springframework.messaging.rsocket.retrieveFlow
val requester: RSocketRequester = ...
val securityToken: String = ...
val viewBox: ViewBox = ...
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")
val locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlow<AirportLocation>()
For Fire-and-Forget, use the send() method, which returns Mono<Void>.
Note that the Mono only indicates that the message was sent successfully, not that it was processed.
For Metadata-Push, use the sendMetadata() method and specify Mono<Void> as the return value.