5.2. RSocketRequester

편집일시: 2021-04-12 14:18 조회수: 178 댓글수: 0
`RSocketRequester`는 RSocket 요청을 실행하기 위한 능숙한 API를 제공하고, 낮은 레벨의 데이터 버퍼가 아닌 데이터와 메타 데이터 객체를 받아들이고 반환한다. 대칭적으로 사용하여 클라이언트에서 요청을 만들거나 서버에서 요청을 만들 수 있다. ## 5.2.1. 클라이언트 요청자(Client Requester) 클라이언트에서 `RSocketRequester`을 받으려면 서버에 연결한다. 여기에는 연결 설정을 포함한 RSocket `SETUP` 프레임의 전송이 포함된다. `RSocketRequester`는 SETUP 프레임의 연결 설정을 포함한 `io.rsocket.core.RSocketConnector`의 준비를 위한 빌더를 제공한다. 이는 디폴트로 연결하는 가장 기본적인 방법이다. Java ``` RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000); URI url = URI.create("https://example.org:8080/rsocket"); RSocketRequester requester = RSocketRequester.builder().webSocket(url); ``` Kotlin ``` val requester = RSocketRequester.builder().tcp("localhost", 7000) URI url = URI.create("https://example.org:8080/rsocket"); val requester = RSocketRequester.builder().webSocket(url) ``` 위는 즉시 연결되지 않는다. 요청이 이루어지면 공유 연결이 투명하게 확립되어 사용된다. ### 연결 설정 `RSocketRequester.Builder` 는 초기 `SETUP` 프레임을 정의하기 위해 다음을 제공한다. - `dataMimeType(MimeType)` - 연결 상의 데이터의 MIME 타입을 설정한다. - `metadataMimeType(MimeType)` - 연결의 메타 데이터의 MIME 타입을 설정한다. - `setupData(Object)` - `SETUP`에 포함할 데이터. - `setupRoute(String, Object…)` - `SETUP`에 포함할 메타 데이터의 루트. - `setupMetadata(Object, MimeType)` - `SETUP`에 포함하는 다른 메타 데이터. 데이터의 경우는 기본 MIME 타입은 처음 구성된 Decoder에서 파생한다. 메타 데이터의 경우 기본 MIME 유형은 [복합 메타 데이터(composite metadata)](https://github.com/rsocket/rsocket/blob/master/Extensions/CompositeMetadata.md)이며, 요청마다 여러 메타 데이터 값과 MIME 타입의 페어를 허가한다. 일반적으로 모두를 변경할 필요가 없다. SETUP 프레임의 데이터와 메타 데이터는 옵션이다. 서버 측에서[ `@ConnectMapping`](https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html#rsocket-annot-connectmapping) 메소드를 사용하여 연결 시작과 `SETUP` 프레임의 콘텐츠를 처리 할 수 있다. 메타 데이터는 연결 수준의 보안에 사용할 수 있다. ### 전략(Strategies) `RSocketRequester.Builder`는 `RSocketStrategies`를 수락하여 요청자를 구성한다. 이를 사용하여 데이터와 메타 데이터 값 (de) -serialization 용 인코더 및 디코더를 제공해야 한다. 기본적으로 `String`, `byte[]`, `ByteBuffer`용 `spring-core`의 기본 코덱만 등록된다. `spring-web`를 추가하면 다음과 같이 등록 할 수있는 기타 기능에 액세스 할 수 있다. Java ``` RSocketStrategies strategies = RSocketStrategies.builder() .encoders(encoders -> encoders.add(new Jackson2CborEncoder())) .decoders(decoders -> decoders.add(new Jackson2CborDecoder())) .build(); RSocketRequester requester = RSocketRequester.builder() .rsocketStrategies(strategies) .tcp("localhost", 7000); ``` Kotlin ``` val strategies = RSocketStrategies.builder() .encoders { it.add(Jackson2CborEncoder()) } .decoders { it.add(Jackson2CborDecoder()) } .build() val requester = RSocketRequester.builder() .rsocketStrategies(strategies) .tcp("localhost", 7000) ``` `RSocketStrategies`는 재사용을 위해 설계되어 있다. 일부 시나리오에서는 예를 들어 동일한 응용 프로그램의 클라이언트와 서버의 경우, Spring 설정으로 선언하는 것이 좋다. ### 클라이언트 응답자(Client Responders) `RSocketRequester.Builder`를 사용하여 서버에서 요청에 대한 응답자를 구성 할 수 있다. 서버에서 사용되는 것과 동일한 인프라를 기반으로 클라이언트의 응답과 어노테이션 처리기를 사용할 수 있지만, 다음과 같이 프로그램에 등록한다. Java ``` RSocketStrategies strategies = RSocketStrategies.builder() .routeMatcher(new PathPatternRouteMatcher()) // (1) .build(); SocketAcceptor responder = RSocketMessageHandler.responder(strategies, new ClientHandler()); // (2) RSocketRequester requester = RSocketRequester.builder() .rsocketConnector(connector -> connector.acceptor(responder)) // (3) .tcp("localhost", 7000); ``` Kotlin ``` val strategies = RSocketStrategies.builder() .routeMatcher(PathPatternRouteMatcher()) // (1) .build() val responder = RSocketMessageHandler.responder(strategies, new ClientHandler()); // (2) val requester = RSocketRequester.builder() .rsocketConnector { it.acceptor(responder) } // (3) .tcp("localhost", 7000) ``` - (1) `spring-web` 가 존재하는 경우, 효율적인 경로 매칭을 위해 `PathPatternRouteMatcher`를 사용한다. - (2) `@MessageMaping` 그리고/또는 `@ConnectMapping` 메소드를 사용하여 클래스에서 응답자을 만든다. - (3) 응답자을 등록한다. 상기 클라이언트 응답자의 프로그램에 의한 등록을 위해 설계된 도구에 지나지 않는다는 점에 유의해라. 클라이언트 응답자가 Spring 설정의 대체 시나리오의 경우 `RSocketMessageHandler`를 Spring Bean으로 선언하고 다음과 같이 적용 할 수 있다. Java ``` ApplicationContext context = ... ; RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class); RSocketRequester requester = RSocketRequester.builder() .rsocketConnector(connector -> connector.acceptor(handler.responder())) .tcp("localhost", 7000); ``` Kotlin ``` import org.springframework.beans.factory.getBean val context: ApplicationContext = ... val handler = context.getBean<RSocketMessageHandler>() val requester = RSocketRequester.builder() .rsocketConnector { it.acceptor(handler.responder()) } .tcp("localhost", 7000) ``` 위의 경우 `RSocketMessageHandler`에서 `setHandlerPredicate`를 사용하여 클라이언트 응답자를 검출하기 위한 다른 전략으로 전환해야 하는 경우도 있다. `@RSocketClientResponder` 등의 커스텀 어노테이션과 기본의 `@Controller`을 기반으로한다. 이것은 클라이언트와 서버 또는 동일한 응용 프로그램에서 여러 클라이언트를 사용하는 시나리오에 필요한다. 프로그래밍 모델에 대한 자세한 내용은 [어노테이션이 선언된 응답자](https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html#rsocket-annot-responders)를 참조해라. ### 확장(Advanced) `RSocketRequesterBuilder`는 기본이 되는`io.rsocket.core.RSocketConnector`를 공개하는 콜백을 제공하고, keep-alive 간격(intervals), 세션 재개, 인터셉터 등의 고급 옵션을 제공한다. 다음과 같이 그 레벨에 옵션을 구성 할 수 있다. Java ``` RSocketRequester requester = RSocketRequester.builder() .rsocketConnector(connector -> { // ... }) .tcp("localhost", 7000); ``` Kotlin ``` val requester = RSocketRequester.builder() .rsocketConnector { //... } .tcp("localhost", 7000) ``` ## 5.2.2 서버 요청자 서버에서 연결된 클라이언트에 요청을 할 서버에서 연결된 클라이언트 요청자를 얻을 수 있다. [어노테이션이 선언된 응답자](https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html#rsocket-annot-responders)는 `@ConnectMapping`와 `@MessageMapping` 메소드는 `RSocketRequester` 인수를 지원한다. 이것을 사용하여 연결 요청자에 액세스한다. `@ConnectMapping` 메서드는 본질적으로 `SETUP` 프레임 핸들러이며 요청을 시작하기 전에 처리해야 하는 점에 유의해라. 따라서 첫번째 요청은 처리에서 분리해야 한다. 예를 들면 아래와 같다. Java ``` @ConnectMapping Mono<Void> handle(RSocketRequester requester) { requester.route("status").data("5") .retrieveFlux(StatusReport.class) .subscribe(bar -> { // (1) // ... }); return ... // (2) } ``` - (1) 처리와 관계없이 요청을 비동기적으로 시작한다. - (2) 처리를 실행하고, 완료되면 `Mono<Void>`를 반환한다. Kotlin ``` @ConnectMapping suspend fun handle(requester: RSocketRequester) { GlobalScope.launch { requester.route("status").data("5").retrieveFlow<StatusReport>().collect { // (1) // ... } } /// ... // (2) } ``` - (1) 처리와 관계없이 요청을 비동기적으로 시작한다. - (2) 일시 중지 기능으로 처리한다. ## 5.2.3. Requests [클라이언트](https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html#rsocket-requester-client) 또는 [서버](https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html#rsocket-requester-server)의 요청자를 얻으면 다음과 같이 요청을 만들 수 있다. Java ``` ViewBox viewBox = ... ; Flux<AirportLocation> locations = requester.route("locate.radars.within") // (1) .data(viewBox) // (2) .retrieveFlux(AirportLocation.class); // (3) ``` Kotlin ``` val viewBox: ViewBox = ... val locations = requester.route("locate.radars.within") // (1) .data(viewBox) // (2) .retrieveFlow<AirportLocation>() // (3) ``` - (1) 요청 메시지의 메타 데이터에 포함 경로를 지정한다. - (2) 요청 메시지의 데이터를 제공한다. - (3) 예상되는 응답을 선언한다. 상호 작용 유형은 입력과 출력의 카디널리티에서 암묵적으로 결정된다. 위의 예는 하나의 값이 전송된 값의 스트림이 수신되기 때문에 `Request-Stream` 이다. 대부분의 경우, 입력과 출력의 선택이 RSocket 상호 작용의 유형과 응답자가 기대하는 입력과 출력의 종류와 일치하는 한 이에 대하여 생각할 필요가 없다. 잘못된 조합의 유일한 예는 "다대일"이다. `data(Object)` 메소드는 `Flux`와 `Mono` 포함하는 Reactive Streams `Publisher`와 `ReactiveAdapterRegistry`에 등록되어 있는 값의 다른 프로듀서도 받아 들이다. 동일한 유형의 값을 생성하는 `Flux` 등의 여러 값 `Publisher`의 경우 오버로드된 data 메서드 중 하나를 사용하여 모든 요소 유형 검사와 `Encoder` 조회를 회피하는 것을 고려해라. ``` data(Object producer, Class<?> elementClass); data(Object producer, ParameterizedTypeReference<?> elementTypeRef); ``` `data(Object)` 단계는 선택 사항이다. 데이터를 전송하지 요청의 경우는 생략한다. Java ``` Mono<AirportLocation> location = requester.route("find.radar.EWR")) .retrieveMono(AirportLocation.class); ``` Kotlin ``` import org.springframework.messaging.rsocket.retrieveAndAwait val location = requester.route("find.radar.EWR") .retrieveAndAwait<AirportLocation>() ``` [복합 메타 데이터(composite metadata)](https://github.com/rsocket/rsocket/blob/master/Extensions/CompositeMetadata.md) (디폴트)을 사용하여 값이 등록된 `Encoder`으로 지원되는 경우, 추가로 메타 데이터 값을 추가 할 수 있다. 예를 들면 다음과 같다. Java ``` String securityToken = ... ; ViewBox viewBox = ... ; MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0"); Flux<AirportLocation> locations = requester.route("locate.radars.within") .metadata(securityToken, mimeType) .data(viewBox) .retrieveFlux(AirportLocation.class); ``` Kotlin ``` import org.springframework.messaging.rsocket.retrieveFlow val requester: RSocketRequester = ... val securityToken: String = ... val viewBox: ViewBox = ... val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0") val locations = requester.route("locate.radars.within") .metadata(securityToken, mimeType) .data(viewBox) .retrieveFlow<AirportLocation>() ``` `Fire-and-Forget`의 경우 `Mono<Void>`를 반환하는 `send()` 메소드를 사용한다. `Mono`는 메시지가 성공적으로 전송된 것만을 보여주고 처리 된 것을 나타내는 것은 아니다는 것에 유의해라. `Metadata-Push`의 경우 `Mono<Void>`을 반환 값으로 지정하고 `sendMetadata()` 메소드를 사용한다.

이전 글 : 5.1. 개요