Spring Web Reactive | 3. WebSocket | 3.2. WebSocket API
The Spring Framework provides a WebSocket API that you can use to create client-side and server-side applications that handle WebSocket messages.
3.2.1. Server
To create a WebSocket server, you can first create a WebSocketHandler. The following example shows how to do that.
Java
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
public class MyWebSocketHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
// ...
}
}
Kotlin
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession
class MyWebSocketHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
// ...
}
}
Next, you can map it to a URL.
Java
@Configuration
class WebConfig {
@Bean
public HandlerMapping handlerMapping() {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/path", new MyWebSocketHandler());
int order = -1; // before annotated controllers
return new SimpleUrlHandlerMapping(map, order);
}
}
Kotlin
@Configuration
class WebConfig {
@Bean
fun handlerMapping(): HandlerMapping {
val map = mapOf("/path" to MyWebSocketHandler())
val order = -1 // before annotated controllers
return SimpleUrlHandlerMapping(map, order)
}
}
If you use WebFlux configuration, nothing more is required. Otherwise, if you are not using WebFlux configuration, you must declare a WebSocketHandlerAdapter as follows.
Java
@Configuration
class WebConfig {
// ...
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
}
Kotlin
@Configuration
class WebConfig {
// ...
@Bean
fun handlerAdapter() = WebSocketHandlerAdapter()
}
3.2.2. WebSocketHandler
The handle method of WebSocketHandler takes a WebSocketSession and returns Mono<Void> to indicate that application handling of the session is complete. The session is handled through two streams: one for received messages and one for sent messages. The following table shows the two methods that handle those streams.
| WebSocketSession method | Description |
|---|---|
Flux<WebSocketMessage> receive() |
Provides access to the inbound message stream and completes when the connection is closed. |
Mono<Void> send(Publisher<WebSocketMessage>) |
Takes a source of outbound messages, writes the messages, and returns a Mono<Void> that completes when the source completes and writing is done. |
WebSocketHandler must compose the inbound and outbound streams into a unified flow and return a Mono<Void> that reflects completion of that flow. Depending on application requirements, the unified flow completes in the following cases.
- When the inbound or outbound message stream completes.
- When the send stream is infinite and the outbound stream completes, that is, when the connection is closed.
- When the
closemethod ofWebSocketSessionis called.
When inbound and outbound message streams are composed together, Reactive Streams does not need to check whether the connection is open to signal termination activity. The inbound stream receives a completion or error signal, and the outbound stream receives a cancellation signal.
The most basic implementation of a handler processes the inbound stream. The following example shows such an implementation.
Java
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
return session.receive() // (1)
.doOnNext(message -> {
// ... // (2)
})
.concatMap(message -> {
// ... // (3)
})
.then(); // (4)
}
}
Kotlin
class ExampleHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
return session.receive() // (1)
.doOnNext {
// ... // (2)
}
.concatMap {
// ... // (3)
}
.then() // (4)
}
}
- (1) Access the inbound message stream.
- (2) Do something with each message.
- (3) Perform a nested asynchronous operation that uses the message content.
- (4) Return a
Mono<Void>that completes when receiving ends.
For nested asynchronous operations, you must call
message.retain()on underlying servers that use pooled data buffers, such as Netty. Otherwise, the data buffer may be released before the data is read. For background, see Data Buffers and Codecs.
The following implementation combines the receive stream and the send stream.
Java
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<WebSocketMessage> output = session.receive() // (1)
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
.map(value -> session.textMessage("Echo " + value)); // (2)
return session.send(output); // (3)
}
}
Kotlin
class ExampleHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
val output = session.receive() // (1)
.doOnNext {
// ...
}
.concatMap {
// ...
}
.map { session.textMessage("Echo $it") } // (2)
return session.send(output) // (3)
}
}
- (1) Process the received message stream.
- (2) Create outgoing messages and build the combined flow.
- (3) Return a
Mono<Void>that does not complete while messages are being received.
As shown in the following example, the receive stream and send stream can be handled independently and combined only when they complete.
Java
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Mono<Void> input = session.receive() // (1)
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
.then();
Flux<String> source = ... ;
Mono<Void> output = session.send(source.map(session::textMessage)); // (2)
return Mono.zip(input, output).then(); // (3)
}
}
Kotlin
class ExampleHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
val input = session.receive() // (1)
.doOnNext {
// ...
}
.concatMap {
// ...
}
.then()
val source: Flux<String> = ...
val output = session.send(source.map(session::textMessage)) // (2)
return Mono.zip(input, output).then() // (3)
}
}
- (1) Process the received message stream.
- (2) Send outgoing messages.
- (3) Combine the streams and return a
Mono<Void>that completes when either stream ends.
3.2.3. DataBuffer
DataBuffer is WebFlux’s representation of a byte buffer. The Spring Core part explains this in detail in the Data Buffers and Codecs section. The important point to understand is that, on some servers such as Netty, byte buffers are pooled and reference-counted and must be released after they are consumed to avoid memory leaks.
When running on Netty, if an application retains input data buffers so they are not released, it must use DataBufferUtils.retain(dataBuffer) and then use DataBufferUtils.release(dataBuffer) when the buffer is consumed.
3.2.4. Handshake
WebSocketHandlerAdapter delegates to WebSocketService. By default, this is an instance of HandshakeWebSocketService, which performs basic checks for WebSocket requests and uses a RequestUpgradeStrategy for the server in use. Support currently includes Reactor Netty, Tomcat, Jetty, and Undertow.
HandshakeWebSocketService provides a sessionAttributePredicate property that lets you set a Predicate<String> and extract attributes from WebSession to insert them into the attributes of WebSocketSession.
3.2.5. Server Configuration
Each server’s RequestUpgradeStrategy provides settings specific to the underlying WebSocket server engine. If you use WebFlux Java configuration, you can specify the same properties shown in the corresponding section of WebFlux configuration. Otherwise, if you do not use WebFlux configuration, use the following.
Java
@Configuration
class WebConfig {
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter(webSocketService());
}
@Bean
public WebSocketService webSocketService() {
TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
strategy.setMaxSessionIdleTimeout(0L);
return new HandshakeWebSocketService(strategy);
}
}
Kotlin
@Configuration
class WebConfig {
@Bean
fun handlerAdapter() =
WebSocketHandlerAdapter(webSocketService())
@Bean
fun webSocketService(): WebSocketService {
val strategy = TomcatRequestUpgradeStrategy().apply {
setMaxSessionIdleTimeout(0L)
}
return HandshakeWebSocketService(strategy)
}
}
Check the server upgrade strategy to see the available options. Currently, only Tomcat and Jetty expose such options.
3.2.6. CORS
The easiest way to configure CORS and restrict access to a WebSocket endpoint is to implement CorsConfigurationSource on the WebSocketHandler and return a CorsConfiguration that includes allowed origins, headers, and other details. If that is not possible, you can also set the corsConfigurations property of SimpleUrlHandler to specify CORS settings by URL pattern. If both are specified, they are combined with the combine method of CorsConfiguration.
3.2.7. Client
Spring WebFlux provides a WebSocketClient abstraction with implementations for Reactor Netty, Tomcat, Jetty, Undertow, and standard Java, that is, JSR-356.
The Tomcat client is effectively an extension of standard Java with a few additional features for
WebSocketSessionhandling, using Tomcat-specific APIs to pause receiving messages for back pressure.
To start a WebSocket session, create an instance of the client and use the execute method.
Java
WebSocketClient client = new ReactorNettyWebSocketClient();
URI url = new URI("ws://localhost:8080/path");
client.execute(url, session ->
session.receive()
.doOnNext(System.out::println)
.then());
Kotlin
val client = ReactorNettyWebSocketClient()
val url = URI("ws://localhost:8080/path")
client.execute(url) { session ->
session.receive()
.doOnNext(::println)
.then()
}
Some clients, such as Jetty, implement Lifecycle and must be stopped and started before use. All clients have constructor options for the underlying WebSocket client configuration.