Spring Web Reactive | 3. WebSocket | 3.2. WebSocket API
Spring Frameworkは、WebSocketメッセージを処理するクライアント側およびサーバー側アプリケーションを作成するために使用できるWebSocket APIを提供します。
3.2.1. サーバー
WebSocketサーバーを作成するには、まずWebSocketHandlerを作成できます。次の例はその方法を示しています。
Java
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
public class MyWebSocketHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
// ...
}
}
Kotlin
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession
class MyWebSocketHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
// ...
}
}
次に、それをURLへマッピングできます。
Java
@Configuration
class WebConfig {
@Bean
public HandlerMapping handlerMapping() {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/path", new MyWebSocketHandler());
int order = -1; // before annotated controllers
return new SimpleUrlHandlerMapping(map, order);
}
}
Kotlin
@Configuration
class WebConfig {
@Bean
fun handlerMapping(): HandlerMapping {
val map = mapOf("/path" to MyWebSocketHandler())
val order = -1 // before annotated controllers
return SimpleUrlHandlerMapping(map, order)
}
}
WebFlux構成を使用している場合、それ以上何もする必要はありません。そうでない場合、つまりWebFlux構成を使用しない場合は、次のようにWebSocketHandlerAdapterを宣言する必要があります。
Java
@Configuration
class WebConfig {
// ...
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
}
Kotlin
@Configuration
class WebConfig {
// ...
@Bean
fun handlerAdapter() = WebSocketHandlerAdapter()
}
3.2.2. WebSocketHandler
WebSocketHandlerのhandleメソッドはWebSocketSessionを受け取り、Mono<Void>を返して、セッションのアプリケーション処理が完了したことを示します。セッションは、受信メッセージ用と送信メッセージ用の2つのストリームを通じて処理されます。次の表は、これらのストリームを処理する2つのメソッドを示しています。
| WebSocketSessionメソッド | 説明 |
|---|---|
Flux<WebSocketMessage> receive() |
インバウンドメッセージストリームへのアクセスを提供し、接続が閉じられると完了します。 |
Mono<Void> send(Publisher<WebSocketMessage>) |
アウトバウンドメッセージのソースを受け取り、メッセージを書き込み、ソースが完了して書き込みが完了すると完了するMono<Void>を返します。 |
WebSocketHandlerは、受信ストリームと送信ストリームを統合されたフローに構成し、そのフローの完了を反映するMono<Void>を返す必要があります。アプリケーション要件に応じて、統合フローは次の場合に完了します。
- 受信または送信メッセージストリームが完了したとき。
- 送信ストリームが無限で、アウトバウンドストリームが完了したとき、つまり接続が閉じられたとき。
WebSocketSessionのcloseメソッドを呼び出したとき。
インバウンドおよびアウトバウンドメッセージストリームが一緒に構成されている場合、Reactive Streamsは終了動作を通知するために接続が開いているかどうかを確認する必要はありません。インバウンドストリームは完了またはエラー信号を受け取り、アウトバウンドストリームはキャンセル信号を受け取ります。
ハンドラーの最も基本的な実装は、インバウンドストリームを処理することです。次の例はそのような実装を示しています。
Java
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
return session.receive() // (1)
.doOnNext(message -> {
// ... // (2)
})
.concatMap(message -> {
// ... // (3)
})
.then(); // (4)
}
}
Kotlin
class ExampleHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
return session.receive() // (1)
.doOnNext {
// ... // (2)
}
.concatMap {
// ... // (3)
}
.then() // (4)
}
}
- (1) インバウンドメッセージストリームにアクセスします。
- (2) 各メッセージに対して何かを行います。
- (3) メッセージ内容を使用するネストされた非同期処理を実行します。
- (4) 受信が終了すると完了する
Mono<Void>を返します。
ネストされた非同期処理の場合、Nettyなど、プールされたデータバッファを使用する基盤サーバーでは
message.retain()を呼び出す必要があります。そうしないと、データを読み取る前にデータバッファが解放される可能性があります。背景については、Data Buffers and Codecsを参照してください。
次の実装は、受信ストリームと送信ストリームを結合します。
Java
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<WebSocketMessage> output = session.receive() // (1)
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
.map(value -> session.textMessage("Echo " + value)); // (2)
return session.send(output); // (3)
}
}
Kotlin
class ExampleHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
val output = session.receive() // (1)
.doOnNext {
// ...
}
.concatMap {
// ...
}
.map { session.textMessage("Echo $it") } // (2)
return session.send(output) // (3)
}
}
- (1) 受信したメッセージストリームを処理します。
- (2) 送信するメッセージを作成し、結合されたフローを作ります。
- (3) メッセージを受信している間は完了しない
Mono<Void>を返します。
次の例のように、受信ストリームと送信ストリームは独立して処理し、完了したときだけ結合できます。
Java
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Mono<Void> input = session.receive() // (1)
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
.then();
Flux<String> source = ... ;
Mono<Void> output = session.send(source.map(session::textMessage)); // (2)
return Mono.zip(input, output).then(); // (3)
}
}
Kotlin
class ExampleHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
val input = session.receive() // (1)
.doOnNext {
// ...
}
.concatMap {
// ...
}
.then()
val source: Flux<String> = ...
val output = session.send(source.map(session::textMessage)) // (2)
return Mono.zip(input, output).then() // (3)
}
}
- (1) 受信メッセージストリームを処理します。
- (2) 送信メッセージを送ります。
- (3) ストリームを結合し、いずれかのストリームが終了すると完了する
Mono<Void>を返します。
3.2.3. DataBuffer
DataBufferはWebFluxにおけるバイトバッファの表現です。Spring Core部分では、Data Buffers and Codecsセクションで詳しく説明されています。理解すべき重要な点は、Nettyなど一部のサーバーでは、バイトバッファがプールされ参照カウントされており、メモリリークを避けるため、消費された後は必ず解放しなければならないことです。
Nettyで実行する場合、アプリケーションが入力データバッファを解放されないよう保持する場合は、DataBufferUtils.retain(dataBuffer)を使用し、バッファが消費されたときにDataBufferUtils.release(dataBuffer)を使用する必要があります。
3.2.4. ハンドシェイク(Handshake)
WebSocketHandlerAdapterはWebSocketServiceへ委譲します。デフォルトでは、これはHandshakeWebSocketServiceのインスタンスであり、WebSocketリクエストに対して基本的なチェックを行い、使用中のサーバーにRequestUpgradeStrategyを使用します。現在、Reactor Netty、Tomcat、Jetty、Undertowのサポートが含まれています。
HandshakeWebSocketServiceは、Predicate<String>を設定し、WebSessionから属性を抽出してWebSocketSessionの属性へ挿入できるsessionAttributePredicateプロパティを提供します。
3.2.5. サーバー構成
各サーバーのRequestUpgradeStrategyは、基盤となるWebSocketサーバーエンジンに固有の設定を提供します。WebFlux Java構成を使用する場合は、WebFlux構成の該当セクションに示されている同じプロパティを指定できます。そうでない場合、WebFlux構成を使用しない場合は次のようにします。
Java
@Configuration
class WebConfig {
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter(webSocketService());
}
@Bean
public WebSocketService webSocketService() {
TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
strategy.setMaxSessionIdleTimeout(0L);
return new HandshakeWebSocketService(strategy);
}
}
Kotlin
@Configuration
class WebConfig {
@Bean
fun handlerAdapter() =
WebSocketHandlerAdapter(webSocketService())
@Bean
fun webSocketService(): WebSocketService {
val strategy = TomcatRequestUpgradeStrategy().apply {
setMaxSessionIdleTimeout(0L)
}
return HandshakeWebSocketService(strategy)
}
}
サーバーアップグレード戦略を確認して、使用可能なオプションを確認してください。現在、TomcatとJettyのみがこれらのオプションを公開しています。
3.2.6. CORS
CORSを設定し、WebSocketエンドポイントへのアクセスを制限する最も簡単な方法は、WebSocketHandlerにCorsConfigurationSourceを実装し、許可されたオリジン、ヘッダー、その他の詳細を含むCorsConfigurationを返すことです。それができない場合は、SimpleUrlHandlerのcorsConfigurationsプロパティを設定して、URLパターンごとにCORS設定を指定することもできます。両方が指定されている場合は、CorsConfigurationのcombineメソッドを使用して結合されます。
3.2.7. クライアント
Spring WebFluxは、Reactor Netty、Tomcat、Jetty、Undertow、標準Java、つまりJSR-356の実装を提供するWebSocketClient抽象化を提供します。
Tomcatクライアントは実質的に、
WebSocketSession処理にいくつかの追加機能を持つ標準Javaの拡張であり、Tomcat固有のAPIを使用してバックプレッシャーのためにメッセージ受信を一時停止します。
WebSocketセッションを開始するには、クライアントのインスタンスを作成してexecuteメソッドを使用できます。
Java
WebSocketClient client = new ReactorNettyWebSocketClient();
URI url = new URI("ws://localhost:8080/path");
client.execute(url, session ->
session.receive()
.doOnNext(System.out::println)
.then());
Kotlin
val client = ReactorNettyWebSocketClient()
val url = URI("ws://localhost:8080/path")
client.execute(url) { session ->
session.receive()
.doOnNext(::println)
.then()
}
Jettyなど一部のクライアントはLifecycleを実装しており、使用前に停止および開始する必要があります。すべてのクライアントには、基盤となるWebSocketクライアント構成のためのコンストラクターオプションがあります。