Spring Web Reactive | 3. WebSocket | 3.2. WebSocket API

Servletスタックと同じ

Spring Frameworkは、WebSocketメッセージを処理するクライアント側およびサーバー側アプリケーションを作成するために使用できるWebSocket APIを提供します。

3.2.1. サーバー

Servletスタックと同じ

WebSocketサーバーを作成するには、まずWebSocketHandlerを作成できます。次の例はその方法を示しています。

Java

import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;

public class MyWebSocketHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // ...
    }
}

Kotlin

import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession

class MyWebSocketHandler : WebSocketHandler {

    override fun handle(session: WebSocketSession): Mono<Void> {
        // ...
    }
}

次に、それをURLへマッピングできます。

Java

@Configuration
class WebConfig {

    @Bean
    public HandlerMapping handlerMapping() {
        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/path", new MyWebSocketHandler());
        int order = -1; // before annotated controllers

        return new SimpleUrlHandlerMapping(map, order);
    }
}

Kotlin

@Configuration
class WebConfig {

    @Bean
    fun handlerMapping(): HandlerMapping {
        val map = mapOf("/path" to MyWebSocketHandler())
        val order = -1 // before annotated controllers

        return SimpleUrlHandlerMapping(map, order)
    }
}

WebFlux構成を使用している場合、それ以上何もする必要はありません。そうでない場合、つまりWebFlux構成を使用しない場合は、次のようにWebSocketHandlerAdapterを宣言する必要があります。

Java

@Configuration
class WebConfig {

    // ...

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

Kotlin

@Configuration
class WebConfig {

    // ...

    @Bean
    fun handlerAdapter() =  WebSocketHandlerAdapter()
}

3.2.2. WebSocketHandler

WebSocketHandlerhandleメソッドはWebSocketSessionを受け取り、Mono<Void>を返して、セッションのアプリケーション処理が完了したことを示します。セッションは、受信メッセージ用と送信メッセージ用の2つのストリームを通じて処理されます。次の表は、これらのストリームを処理する2つのメソッドを示しています。

WebSocketSessionメソッド 説明
Flux<WebSocketMessage> receive() インバウンドメッセージストリームへのアクセスを提供し、接続が閉じられると完了します。
Mono<Void> send(Publisher<WebSocketMessage>) アウトバウンドメッセージのソースを受け取り、メッセージを書き込み、ソースが完了して書き込みが完了すると完了するMono<Void>を返します。

WebSocketHandlerは、受信ストリームと送信ストリームを統合されたフローに構成し、そのフローの完了を反映するMono<Void>を返す必要があります。アプリケーション要件に応じて、統合フローは次の場合に完了します。

  • 受信または送信メッセージストリームが完了したとき。
  • 送信ストリームが無限で、アウトバウンドストリームが完了したとき、つまり接続が閉じられたとき。
  • WebSocketSessioncloseメソッドを呼び出したとき。

インバウンドおよびアウトバウンドメッセージストリームが一緒に構成されている場合、Reactive Streamsは終了動作を通知するために接続が開いているかどうかを確認する必要はありません。インバウンドストリームは完了またはエラー信号を受け取り、アウトバウンドストリームはキャンセル信号を受け取ります。

ハンドラーの最も基本的な実装は、インバウンドストリームを処理することです。次の例はそのような実装を示しています。

Java

class ExampleHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        return session.receive()              // (1)
                .doOnNext(message -> {
                    // ...                    // (2)
                })
                .concatMap(message -> {
                    // ...                    // (3)
                })
                .then();                      // (4)
    }
}

Kotlin

class ExampleHandler : WebSocketHandler {

    override fun handle(session: WebSocketSession): Mono<Void> {
        return session.receive()              // (1)
                .doOnNext {
                    // ...                    // (2)
                }
                .concatMap {
                    // ...                    // (3)
                }
                .then()                       // (4)
    }
}
  • (1) インバウンドメッセージストリームにアクセスします。
  • (2) 各メッセージに対して何かを行います。
  • (3) メッセージ内容を使用するネストされた非同期処理を実行します。
  • (4) 受信が終了すると完了するMono<Void>を返します。

ネストされた非同期処理の場合、Nettyなど、プールされたデータバッファを使用する基盤サーバーではmessage.retain()を呼び出す必要があります。そうしないと、データを読み取る前にデータバッファが解放される可能性があります。背景については、Data Buffers and Codecsを参照してください。

次の実装は、受信ストリームと送信ストリームを結合します。

Java

class ExampleHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {

        Flux<WebSocketMessage> output = session.receive()                 // (1)
                .doOnNext(message -> {
                    // ...
                })
                .concatMap(message -> {
                    // ...
                })
                .map(value -> session.textMessage("Echo " + value));      // (2)

        return session.send(output);                                      // (3)
    }
}

Kotlin

class ExampleHandler : WebSocketHandler {

    override fun handle(session: WebSocketSession): Mono<Void> {

        val output = session.receive()                       // (1)
                .doOnNext {
                    // ...
                }
                .concatMap {
                    // ...
                }
                .map { session.textMessage("Echo $it") }     // (2)

        return session.send(output)                          // (3)
    }
}
  • (1) 受信したメッセージストリームを処理します。
  • (2) 送信するメッセージを作成し、結合されたフローを作ります。
  • (3) メッセージを受信している間は完了しないMono<Void>を返します。

次の例のように、受信ストリームと送信ストリームは独立して処理し、完了したときだけ結合できます。

Java

class ExampleHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {

        Mono<Void> input = session.receive()                                 // (1)
                .doOnNext(message -> {
                    // ...
                })
                .concatMap(message -> {
                    // ...
                })
                .then();

        Flux<String> source = ... ;
        Mono<Void> output = session.send(source.map(session::textMessage));  // (2)

        return Mono.zip(input, output).then();                               // (3)
    }
}

Kotlin

class ExampleHandler : WebSocketHandler {

    override fun handle(session: WebSocketSession): Mono<Void> {

        val input = session.receive()                                     // (1)
                .doOnNext {
                    // ...
                }
                .concatMap {
                    // ...
                }
                .then()

        val source: Flux<String> = ...
        val output = session.send(source.map(session::textMessage))       // (2)

        return Mono.zip(input, output).then()                            //  (3)
    }
}
  • (1) 受信メッセージストリームを処理します。
  • (2) 送信メッセージを送ります。
  • (3) ストリームを結合し、いずれかのストリームが終了すると完了するMono<Void>を返します。

3.2.3. DataBuffer

DataBufferはWebFluxにおけるバイトバッファの表現です。Spring Core部分では、Data Buffers and Codecsセクションで詳しく説明されています。理解すべき重要な点は、Nettyなど一部のサーバーでは、バイトバッファがプールされ参照カウントされており、メモリリークを避けるため、消費された後は必ず解放しなければならないことです。

Nettyで実行する場合、アプリケーションが入力データバッファを解放されないよう保持する場合は、DataBufferUtils.retain(dataBuffer)を使用し、バッファが消費されたときにDataBufferUtils.release(dataBuffer)を使用する必要があります。

3.2.4. ハンドシェイク(Handshake)

Servletスタックと同じ

WebSocketHandlerAdapterWebSocketServiceへ委譲します。デフォルトでは、これはHandshakeWebSocketServiceのインスタンスであり、WebSocketリクエストに対して基本的なチェックを行い、使用中のサーバーにRequestUpgradeStrategyを使用します。現在、Reactor Netty、Tomcat、Jetty、Undertowのサポートが含まれています。

HandshakeWebSocketServiceは、Predicate<String>を設定し、WebSessionから属性を抽出してWebSocketSessionの属性へ挿入できるsessionAttributePredicateプロパティを提供します。

3.2.5. サーバー構成

Servletスタックと同じ

各サーバーのRequestUpgradeStrategyは、基盤となるWebSocketサーバーエンジンに固有の設定を提供します。WebFlux Java構成を使用する場合は、WebFlux構成の該当セクションに示されている同じプロパティを指定できます。そうでない場合、WebFlux構成を使用しない場合は次のようにします。

Java

@Configuration
class WebConfig {

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter(webSocketService());
    }

    @Bean
    public WebSocketService webSocketService() {
        TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
        strategy.setMaxSessionIdleTimeout(0L);
        return new HandshakeWebSocketService(strategy);
    }
}

Kotlin

@Configuration
class WebConfig {

    @Bean
    fun handlerAdapter() =
            WebSocketHandlerAdapter(webSocketService())

    @Bean
    fun webSocketService(): WebSocketService {
        val strategy = TomcatRequestUpgradeStrategy().apply {
            setMaxSessionIdleTimeout(0L)
        }
        return HandshakeWebSocketService(strategy)
    }
}

サーバーアップグレード戦略を確認して、使用可能なオプションを確認してください。現在、TomcatとJettyのみがこれらのオプションを公開しています。

3.2.6. CORS

Servletスタックと同じ

CORSを設定し、WebSocketエンドポイントへのアクセスを制限する最も簡単な方法は、WebSocketHandlerCorsConfigurationSourceを実装し、許可されたオリジン、ヘッダー、その他の詳細を含むCorsConfigurationを返すことです。それができない場合は、SimpleUrlHandlercorsConfigurationsプロパティを設定して、URLパターンごとにCORS設定を指定することもできます。両方が指定されている場合は、CorsConfigurationcombineメソッドを使用して結合されます。

3.2.7. クライアント

Spring WebFluxは、Reactor Netty、Tomcat、Jetty、Undertow、標準Java、つまりJSR-356の実装を提供するWebSocketClient抽象化を提供します。

Tomcatクライアントは実質的に、WebSocketSession処理にいくつかの追加機能を持つ標準Javaの拡張であり、Tomcat固有のAPIを使用してバックプレッシャーのためにメッセージ受信を一時停止します。

WebSocketセッションを開始するには、クライアントのインスタンスを作成してexecuteメソッドを使用できます。

Java

WebSocketClient client = new ReactorNettyWebSocketClient();

URI url = new URI("ws://localhost:8080/path");
client.execute(url, session ->
        session.receive()
                .doOnNext(System.out::println)
                .then());

Kotlin

val client = ReactorNettyWebSocketClient()

        val url = URI("ws://localhost:8080/path")
        client.execute(url) { session ->
            session.receive()
                    .doOnNext(::println)
            .then()
        }

Jettyなど一部のクライアントはLifecycleを実装しており、使用前に停止および開始する必要があります。すべてのクライアントには、基盤となるWebSocketクライアント構成のためのコンストラクターオプションがあります。