Spring Web Reactive | 5. RSocket | 5.2. RSocketRequester
RSocketRequester は、RSocketリクエストを実行するための使いやすいAPIを提供する。
低レベルのデータバッファではなく、データオブジェクトとメタデータオブジェクトを受け取り、返す。
クライアントからリクエストを作る場合にも、サーバーからリクエストを作る場合にも、対称的に使用できる。
5.2.1. クライアントRequester
クライアントで RSocketRequester を取得するには、サーバーへ接続する。
これには、接続設定を含むRSocket SETUP フレームの送信が含まれる。
RSocketRequester は、SETUPフレームの接続設定を含めて io.rsocket.core.RSocketConnector を準備するためのビルダーを提供する。
デフォルトで接続する最も基本的な方法は次のとおりである。
Java
RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000);
URI url = URI.create("https://example.org:8080/rsocket");
RSocketRequester requester = RSocketRequester.builder().webSocket(url);
Kotlin
val requester = RSocketRequester.builder().tcp("localhost", 7000)
URI url = URI.create("https://example.org:8080/rsocket");
val requester = RSocketRequester.builder().webSocket(url)
上記のコードはすぐには接続しない。 リクエストが行われると、共有接続が透過的に確立されて使用される。
接続設定
RSocketRequester.Builder は、初期 SETUP フレームを定義するために次の項目を提供する。
dataMimeType(MimeType): 接続上のデータのMIMEタイプを設定する。metadataMimeType(MimeType): 接続上のメタデータのMIMEタイプを設定する。setupData(Object):SETUPに含めるデータ。setupRoute(String, Object...):SETUPに含めるルートメタデータ。setupMetadata(Object, MimeType):SETUPに含める追加メタデータ。
データの場合、デフォルトのMIMEタイプは最初に構成されたDecoderから導出される。 メタデータの場合、デフォルトのMIMEタイプは Composite Metadata であり、リクエストごとに複数のメタデータ値とMIMEタイプのペアを許可する。 通常、これらをすべて変更する必要はない。
SETUPフレームのデータとメタデータは任意である。
サーバー側では、@ConnectMapping メソッドを使用して、接続開始と SETUP フレームの内容を処理できる。
メタデータは接続レベルのセキュリティに使用できる。
Strategies
RSocketRequester.Builder は RSocketStrategies を受け取り、Requesterを構成する。
これを使って、データおよびメタデータ値のシリアライズとデシリアライズに使うエンコーダーとデコーダーを提供する。
デフォルトでは、String、byte[]、ByteBuffer 用の spring-core 基本コーデックのみが登録される。
spring-web を追加すると、次のように他の機能も登録できる。
Java
RSocketStrategies strategies = RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.build();
RSocketRequester requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000);
Kotlin
val strategies = RSocketStrategies.builder()
.encoders { it.add(Jackson2CborEncoder()) }
.decoders { it.add(Jackson2CborDecoder()) }
.build()
val requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000)
RSocketStrategies は再利用を前提に設計されている。
同じアプリケーション内のクライアントとサーバーなど、一部のシナリオではSpring設定として宣言するのがよい。
クライアントResponder
RSocketRequester.Builder を使用して、サーバーからのリクエストに対するResponderを構成できる。
サーバーで使用されるものと同じインフラストラクチャをもとに、クライアントでもアノテーション付きハンドラーとResponderを使用できる。 ただし、次のようにプログラムで登録する。
Java
RSocketStrategies strategies = RSocketStrategies.builder()
.routeMatcher(new PathPatternRouteMatcher()) // (1)
.build();
SocketAcceptor responder =
RSocketMessageHandler.responder(strategies, new ClientHandler()); // (2)
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(responder)) // (3)
.tcp("localhost", 7000);
Kotlin
val strategies = RSocketStrategies.builder()
.routeMatcher(PathPatternRouteMatcher()) // (1)
.build()
val responder =
RSocketMessageHandler.responder(strategies, new ClientHandler()); // (2)
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(responder) } // (3)
.tcp("localhost", 7000)
- (1)
spring-webが存在する場合、効率的なルートマッチングのためにPathPatternRouteMatcherを使用する。 - (2)
@MessageMappingおよび、または@ConnectMappingメソッドを持つクラスからResponderを作成する。 - (3) Responderを登録する。
上記はクライアントResponderをプログラムで登録するために設計されたユーティリティにすぎない。
クライアントResponderをSpring設定で構成する別のシナリオでは、RSocketMessageHandler をSpring Beanとして宣言し、次のように適用できる。
Java
ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(handler.responder()))
.tcp("localhost", 7000);
Kotlin
import org.springframework.beans.factory.getBean
val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(handler.responder()) }
.tcp("localhost", 7000)
この場合、RSocketMessageHandler で setHandlerPredicate を使用し、クライアントResponderを検出する別の戦略へ切り替える必要がある場合もある。
たとえば、デフォルトの @Controller を基準にする代わりに、@RSocketClientResponder のようなカスタムアノテーションを使う。
これは、同一アプリケーションでクライアントとサーバーを併用する場合や、複数のクライアントを使用する場合に必要になる。
プログラミングモデルの詳細は、アノテーション付きResponder を参照する。
Advanced
RSocketRequesterBuilder は、基盤となる io.rsocket.core.RSocketConnector を公開するコールバックを提供する。
これにより、keep-alive間隔、セッション再開、インターセプターなどの高度なオプションを扱える。
次のように、そのレベルでオプションを構成できる。
Java
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> {
// ...
})
.tcp("localhost", 7000);
Kotlin
val requester = RSocketRequester.builder()
.rsocketConnector {
//...
}
.tcp("localhost", 7000)
5.2.2. サーバーRequester
サーバーでは、接続済みクライアントに対してリクエストを行うために、そのクライアントのRequesterを取得できる。
アノテーション付きResponder では、@ConnectMapping と @MessageMapping メソッドが RSocketRequester 引数をサポートする。
これを使用して接続のRequesterへアクセスする。
@ConnectMapping メソッドは本質的に SETUP フレームハンドラーであり、リクエストを開始する前に処理を完了しなければならない点に注意する。
そのため、最初のリクエストはその処理から切り離す必要がある。
例は次のとおりである。
Java
@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
requester.route("status").data("5")
.retrieveFlux(StatusReport.class)
.subscribe(bar -> { // (1)
// ...
});
return ... // (2)
}
- (1) 処理とは独立して、リクエストを非同期に開始する。
- (2) 処理を実行し、完了したら
Mono<Void>を返す。
Kotlin
@ConnectMapping
suspend fun handle(requester: RSocketRequester) {
GlobalScope.launch {
requester.route("status").data("5").retrieveFlow<StatusReport>().collect { // (1)
// ...
}
}
/// ... // (2)
}
- (1) 処理とは独立して、リクエストを非同期に開始する。
- (2) suspend関数として処理する。
5.2.3. Requests
クライアント または サーバー のRequesterを取得したら、次のようにリクエストを作成できる。
Java
ViewBox viewBox = ... ;
Flux<AirportLocation> locations = requester.route("locate.radars.within") // (1)
.data(viewBox) // (2)
.retrieveFlux(AirportLocation.class); // (3)
Kotlin
val viewBox: ViewBox = ...
val locations = requester.route("locate.radars.within") // (1)
.data(viewBox) // (2)
.retrieveFlow<AirportLocation>() // (3)
- (1) リクエストメッセージのメタデータに含めるルートを指定する。
- (2) リクエストメッセージのデータを提供する。
- (3) 期待するレスポンスを宣言する。
相互作用タイプは、入力と出力のカーディナリティから暗黙的に決定される。
上記の例は、1つの値を送信し、値のストリームを受信するため Request-Stream である。
ほとんどの場合、入力と出力の選択がRSocketの相互作用タイプおよびResponderが期待する入出力と一致していれば、これを意識する必要はない。
不正な組み合わせの代表例は多対一である。
data(Object) メソッドは、Flux や Mono を含むReactive Streams Publisher と、ReactiveAdapterRegistry に登録された他の値プロデューサーを受け入れる。
同じ型の値を生成する Flux などの複数値 Publisher では、すべての要素で型検査と Encoder 検索を行わないよう、オーバーロードされた data メソッドのいずれかを使うことを検討する。
data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);
data(Object) のステップは任意である。
データを送信しないリクエストでは省略する。
Java
Mono<AirportLocation> location = requester.route("find.radar.EWR"))
.retrieveMono(AirportLocation.class);
Kotlin
import org.springframework.messaging.rsocket.retrieveAndAwait
val location = requester.route("find.radar.EWR")
.retrieveAndAwait<AirportLocation>()
デフォルトである Composite Metadata を使用し、値が登録済みの Encoder によってサポートされている場合、追加のメタデータ値を追加できる。
例は次のとおりである。
Java
String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");
Flux<AirportLocation> locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlux(AirportLocation.class);
Kotlin
import org.springframework.messaging.rsocket.retrieveFlow
val requester: RSocketRequester = ...
val securityToken: String = ...
val viewBox: ViewBox = ...
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")
val locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlow<AirportLocation>()
Fire-and-Forget の場合は、Mono<Void> を返す send() メソッドを使用する。
この Mono はメッセージが正常に送信されたことだけを示し、処理されたことを示すものではない点に注意する。
Metadata-Push の場合は、戻り値として Mono<Void> を指定し、sendMetadata() メソッドを使用する。