3.2. WebSocket API

Same as in the Servlet stack

The Spring Framework provides a WebSocket API that you can use to create client-side and server-side applications that handle WebSocket messages.

3.2.1. Server

Same as in the Servlet stack

To create a WebSocket server, you can first create a WebSocketHandler. The following example shows how to do that.

Java

import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Mono;

public class MyWebSocketHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // ...
    }
}

Kotlin

import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession
import reactor.core.publisher.Mono

class MyWebSocketHandler : WebSocketHandler {

    override fun handle(session: WebSocketSession): Mono<Void> {
        // ...
    }
}

Next, you can map it to a URL.

Java

@Configuration
class WebConfig {

    @Bean
    public HandlerMapping handlerMapping() {
        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/path", new MyWebSocketHandler());
        int order = -1; // before annotated controllers

        return new SimpleUrlHandlerMapping(map, order);
    }
}

Kotlin

@Configuration
class WebConfig {

    @Bean
    fun handlerMapping(): HandlerMapping {
        val map = mapOf("/path" to MyWebSocketHandler())
        val order = -1 // before annotated controllers

        return SimpleUrlHandlerMapping(map, order)
    }
}

If you use the WebFlux configuration, nothing more is required. Otherwise, you must declare a WebSocketHandlerAdapter, as follows.

Java

@Configuration
class WebConfig {

    // ...

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

Kotlin

@Configuration
class WebConfig {

    // ...

    @Bean
    fun handlerAdapter() = WebSocketHandlerAdapter()
}

3.2.2. WebSocketHandler

The handle method of WebSocketHandler takes a WebSocketSession and returns Mono<Void> to indicate that application handling of the session is complete. The session is handled through two streams: one for received messages and one for sent messages. The following table shows the two methods that handle those streams.

WebSocketSession method | Description
Flux<WebSocketMessage> receive() | Provides access to the inbound message stream and completes when the connection is closed.
Mono<Void> send(Publisher<WebSocketMessage>) | Takes a source of outbound messages, writes the messages, and returns a Mono<Void> that completes when the source completes and writing is done.

WebSocketHandler must compose the inbound and outbound streams into a unified flow and return a Mono<Void> that reflects completion of that flow. Depending on application requirements, the unified flow completes in the following cases.

  • When either the inbound or the outbound message stream completes.
  • When the inbound stream completes (that is, the connection is closed) while the outbound stream is infinite.
  • At a chosen point, when the close method of WebSocketSession is called.

When inbound and outbound message streams are composed together, there is no need to check whether the connection is open, because Reactive Streams signals the end of activity. The inbound stream receives a completion or error signal, and the outbound stream receives a cancellation signal.
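The third case, closing at a chosen point, can be sketched as follows. This is a minimal illustration rather than a recommended protocol: the "bye" sentinel and the class name CloseOnByeHandler are assumptions made for the example. It relies on Reactor's takeUntil to end the inbound flow and on WebSocketSession.close(CloseStatus) to close the session.

```java
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Mono;

// Hypothetical handler: stops reading once the peer sends "bye", then closes the session.
class CloseOnByeHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        return session.receive()
                .map(WebSocketMessage::getPayloadAsText)   // read the payload right away, no retain needed
                .takeUntil("bye"::equals)                  // complete after the (assumed) sentinel message
                .then()                                    // wait for the inbound flow to end
                .then(Mono.defer(() -> session.close(CloseStatus.NORMAL))); // close at the chosen point
    }
}
```

The returned Mono<Void> completes after the close has been issued, which is what tells the framework that handling of the session is finished.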

The most basic implementation of a handler processes the inbound stream. The following example shows such an implementation.

Java

class ExampleHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        return session.receive()              // (1)
                .doOnNext(message -> {
                    // ...                    // (2)
                })
                .concatMap(message -> {
                    // ...                    // (3)
                })
                .then();                      // (4)
    }
}

Kotlin

class ExampleHandler : WebSocketHandler {

    override fun handle(session: WebSocketSession): Mono<Void> {
        return session.receive()              // (1)
                .doOnNext {
                    // ...                    // (2)
                }
                .concatMap {
                    // ...                    // (3)
                }
                .then()                       // (4)
    }
}
  • (1) Access the inbound message stream.
  • (2) Do something with each message.
  • (3) Perform a nested asynchronous operation that uses the message content.
  • (4) Return a Mono<Void> that completes when receiving ends.

For nested asynchronous operations, you must call message.retain() on underlying servers that use pooled data buffers, such as Netty. Otherwise, the data buffer may be released before the data is read. For background, see Data Buffers and Codecs.
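A minimal sketch of that retain-then-release pattern follows. It assumes a nested operation that reads the payload only later; the Mono.delay call stands in for a real asynchronous call, and the class name RetainingHandler is made up for the example.

```java
import java.time.Duration;

import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Mono;

// Hypothetical handler: keeps each message alive until a deferred read has finished.
class RetainingHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        return session.receive()
                .map(WebSocketMessage::retain)                        // take an extra reference before going async
                .concatMap(message ->
                        Mono.delay(Duration.ofMillis(10))             // placeholder for a real asynchronous call
                                .map(tick -> message.getPayloadAsText())  // payload is read after the delay
                                .doFinally(signal -> message.release()))  // give the reference back on any outcome
                .then();
    }
}
```

If the payload can be extracted synchronously (for example, with getPayloadAsText() in a map step) before any asynchronous work starts, the retain and release calls are not needed.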

The following implementation combines the receive stream and the send stream.

Java

class ExampleHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {

        Flux<WebSocketMessage> output = session.receive()                 // (1)
                .doOnNext(message -> {
                    // ...
                })
                .concatMap(message -> {
                    // ...
                })
                .map(value -> session.textMessage("Echo " + value));      // (2)

        return session.send(output);                                      // (3)
    }
}

Kotlin

class ExampleHandler : WebSocketHandler {

    override fun handle(session: WebSocketSession): Mono<Void> {

        val output = session.receive()                       // (1)
                .doOnNext {
                    // ...
                }
                .concatMap {
                    // ...
                }
                .map { session.textMessage("Echo $it") }     // (2)

        return session.send(output)                          // (3)
    }
}
  • (1) Process the received message stream.
  • (2) Create outgoing messages and build the combined flow.
  • (3) Return a Mono<Void> that does not complete while messages are being received.

As shown in the following example, the receive stream and send stream can be handled independently and combined only when they complete.

Java

class ExampleHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {

        Mono<Void> input = session.receive()                                 // (1)
                .doOnNext(message -> {
                    // ...
                })
                .concatMap(message -> {
                    // ...
                })
                .then();

        Flux<String> source = ... ;
        Mono<Void> output = session.send(source.map(session::textMessage));  // (2)

        return Mono.zip(input, output).then();                               // (3)
    }
}

Kotlin

class ExampleHandler : WebSocketHandler {

    override fun handle(session: WebSocketSession): Mono<Void> {

        val input = session.receive()                                     // (1)
                .doOnNext {
                    // ...
                }
                .concatMap {
                    // ...
                }
                .then()

        val source: Flux<String> = ...
        val output = session.send(source.map(session::textMessage))       // (2)

        return Mono.zip(input, output).then()                             // (3)
    }
}
  • (1) Process the received message stream.
  • (2) Send outgoing messages.
  • (3) Combine the streams and return a Mono<Void> that completes when either stream ends.

3.2.3. DataBuffer

DataBuffer is the representation of a byte buffer in WebFlux. The Spring Core part of the reference covers it in detail in the Data Buffers and Codecs section. The important point to understand is that, on some servers such as Netty, byte buffers are pooled and reference-counted and must be released after they are consumed to avoid memory leaks.

When running on Netty, an application that wants to hold on to input data buffers must call DataBufferUtils.retain(dataBuffer) so that they are not released, and then call DataBufferUtils.release(dataBuffer) once the buffers are consumed.
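To make the reference-counting rule concrete, here is a toy model in plain Java. It is not Spring's DataBuffer or Netty's ByteBuf: ToyPooledBuffer and RetainReleaseDemo are hypothetical classes that only mimic the counting behavior (single-threaded, no real pool) to show why a deferred read needs a retain.

```java
import java.nio.charset.StandardCharsets;

// Toy stand-in for a pooled, reference-counted buffer (illustration only).
final class ToyPooledBuffer {
    private final byte[] bytes;
    private int refCount = 1; // the server holds the initial reference

    ToyPooledBuffer(String text) {
        this.bytes = text.getBytes(StandardCharsets.UTF_8);
    }

    ToyPooledBuffer retain() {
        if (refCount == 0) throw new IllegalStateException("already released");
        refCount++;
        return this;
    }

    void release() {
        if (refCount == 0) throw new IllegalStateException("already released");
        refCount--; // at zero, a real buffer would go back to the pool
    }

    String read() {
        if (refCount == 0) throw new IllegalStateException("buffer was returned to the pool");
        return new String(bytes, StandardCharsets.UTF_8);
    }
}

final class RetainReleaseDemo {
    // Simulates reading a buffer after the server has already released its own reference.
    static String readLater(ToyPooledBuffer buffer, boolean retainFirst) {
        if (retainFirst) {
            buffer.retain();              // the application takes its own reference
        }
        buffer.release();                 // the server releases after dispatching the message
        try {
            return buffer.read();         // the deferred read
        } catch (IllegalStateException ex) {
            return "unreadable";          // without retain, the data is gone
        } finally {
            if (retainFirst) {
                buffer.release();         // balance the retain to avoid a leak
            }
        }
    }
}
```

Calling readLater with retainFirst set to true returns the original text, while calling it with false returns "unreadable", which mirrors the failure mode of holding an unretained pooled buffer.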

3.2.4. Handshake

Same as in the Servlet stack

WebSocketHandlerAdapter delegates to WebSocketService. By default, this is an instance of HandshakeWebSocketService, which performs basic checks for WebSocket requests and uses a RequestUpgradeStrategy for the server in use. Support currently includes Reactor Netty, Tomcat, Jetty, and Undertow.

HandshakeWebSocketService exposes a sessionAttributePredicate property. Set it to a Predicate<String> to select, by name, which WebSession attributes are extracted and inserted into the attributes of the WebSocketSession.
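A configuration along these lines could set the predicate when the WebFlux configuration is not in use. The "user." attribute prefix and the class name WebSocketAttributeConfig are assumptions for the example. The no-argument HandshakeWebSocketService constructor detects the RequestUpgradeStrategy from the classpath, so a supported server must be present.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.socket.server.WebSocketService;
import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;

@Configuration
class WebSocketAttributeConfig {

    @Bean
    public WebSocketService webSocketService() {
        HandshakeWebSocketService service = new HandshakeWebSocketService();
        // Copy only WebSession attributes whose names start with the (assumed) "user." prefix.
        service.setSessionAttributePredicate(name -> name.startsWith("user."));
        return service;
    }

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter(webSocketService());
    }
}
```

A handler can then read the copied values through session.getAttributes().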

3.2.5. Server Configuration

Same as in the Servlet stack

The RequestUpgradeStrategy for each server exposes configuration specific to the underlying WebSocket server engine. If you use the WebFlux Java configuration, you can customize these properties as shown in the corresponding section of the WebFlux configuration. Otherwise, use the following.

Java

@Configuration
class WebConfig {

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter(webSocketService());
    }

    @Bean
    public WebSocketService webSocketService() {
        TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
        strategy.setMaxSessionIdleTimeout(0L);
        return new HandshakeWebSocketService(strategy);
    }
}

Kotlin

@Configuration
class WebConfig {

    @Bean
    fun handlerAdapter() =
            WebSocketHandlerAdapter(webSocketService())

    @Bean
    fun webSocketService(): WebSocketService {
        val strategy = TomcatRequestUpgradeStrategy().apply {
            setMaxSessionIdleTimeout(0L)
        }
        return HandshakeWebSocketService(strategy)
    }
}

Check the server upgrade strategy to see the available options. Currently, only Tomcat and Jetty expose such options.

3.2.6. CORS

Same as in the Servlet stack

The easiest way to configure CORS and restrict access to a WebSocket endpoint is to have your WebSocketHandler implement CorsConfigurationSource and return a CorsConfiguration with allowed origins, headers, and other details. If that is not possible, you can also set the corsConfigurations property of SimpleUrlHandlerMapping to specify CORS settings by URL pattern. If both are specified, they are combined by using the combine method of CorsConfiguration.
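The first option might look like the following sketch. The origin https://example.com and the class name OriginRestrictedHandler are placeholders. The handler implements the reactive CorsConfigurationSource from org.springframework.web.cors.reactive.

```java
import org.springframework.web.cors.CorsConfiguration;
import org.springframework.web.cors.reactive.CorsConfigurationSource;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

// Hypothetical handler that also supplies its own CORS rules.
class OriginRestrictedHandler implements WebSocketHandler, CorsConfigurationSource {

    @Override
    public CorsConfiguration getCorsConfiguration(ServerWebExchange exchange) {
        CorsConfiguration config = new CorsConfiguration();
        config.addAllowedOrigin("https://example.com"); // placeholder origin
        return config;
    }

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // Drain inbound messages; real handling is omitted in this sketch.
        return session.receive().then();
    }
}
```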

3.2.7. Client

Spring WebFlux provides a WebSocketClient abstraction with implementations for Reactor Netty, Tomcat, Jetty, Undertow, and standard Java, that is, JSR-356.

The Tomcat client is effectively an extension of the standard Java one, with some extra functionality in WebSocketSession handling that uses a Tomcat-specific API to suspend receiving messages for back pressure.

To start a WebSocket session, create an instance of the client and use the execute method.

Java

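WebSocketClient client = new ReactorNettyWebSocketClient();

// URI.create avoids the checked URISyntaxException thrown by new URI(...)
URI url = URI.create("ws://localhost:8080/path");
// execute returns a Mono<Void>; the connection is opened only when it is subscribed
client.execute(url, session ->
        session.receive()
                .doOnNext(System.out::println)
                .then());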

Kotlin

val client = ReactorNettyWebSocketClient()

val url = URI("ws://localhost:8080/path")
client.execute(url) { session ->
    session.receive()
            .doOnNext(::println)
            .then()
}

Some clients, such as Jetty, implement Lifecycle and must be started before use and stopped when they are no longer needed. All clients have constructor options related to configuration of the underlying WebSocket client.
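As a sketch of that lifecycle handling, the following assumes a Spring Framework 5.x style JettyWebSocketClient with Jetty on the classpath and a server listening at the placeholder URL. The class name JettyClientExample and the 30-second timeout are arbitrary choices for the example.

```java
import java.net.URI;
import java.time.Duration;

import org.springframework.web.reactive.socket.client.JettyWebSocketClient;

class JettyClientExample {

    public static void main(String[] args) {
        JettyWebSocketClient client = new JettyWebSocketClient();
        client.start();                                   // Lifecycle: start before use
        try {
            client.execute(URI.create("ws://localhost:8080/path"),
                            session -> session.receive()
                                    .doOnNext(message -> System.out.println(message.getPayloadAsText()))
                                    .then())
                    .block(Duration.ofSeconds(30));       // subscribe and wait; throws if the timeout elapses
        } finally {
            client.stop();                                // Lifecycle: stop when done
        }
    }
}
```

When the client is declared as a Spring bean, the application context normally drives start and stop through the Lifecycle contract, so the explicit calls are mainly needed when the client is created by hand.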