3.2. WebSocket API

편집일시: 2021-04-12 14:13 조회수: 277 댓글수: 0
[서블릿 스택과 같은](https://docs.spring.io/spring-framework/docs/current/reference/html/web.html#websocket-server) Spring Framework는 WebSocket 메시지를 처리하는 클라이언트 측 및 서버 측 응용 프로그램을 만드는데 사용할 수 있는 WebSocket API를 제공한다. ## 3.2.1. 서버 [서블릿 스택과 같은](https://docs.spring.io/spring-framework/docs/current/reference/html/web.html#websocket-server) WebSocket 서버를 만들려면, 먼저 `WebSocketHandler`를 만들 수 있다. 다음의 예는 그 방법을 보여준다. Java ``` import org.springframework.web.reactive.socket.WebSocketHandler; import org.springframework.web.reactive.socket.WebSocketSession; public class MyWebSocketHandler implements WebSocketHandler { @Override public Mono<Void> handle(WebSocketSession session) { // ... } } ``` Kotlin ``` import org.springframework.web.reactive.socket.WebSocketHandler import org.springframework.web.reactive.socket.WebSocketSession class MyWebSocketHandler : WebSocketHandler { override fun handle(session: WebSocketSession): Mono<Void> { // ... } } ``` 다음은 그것을 URL로 매핑 할 수 있다. Java ``` @Configuration class WebConfig { @Bean public HandlerMapping handlerMapping() { Map<String, WebSocketHandler> map = new HashMap<>(); map.put("/path", new MyWebSocketHandler()); int order = -1; // before annotated controllers return new SimpleUrlHandlerMapping(map, order); } } ``` Kotlin ``` @Configuration class WebConfig { @Bean fun handlerMapping(): HandlerMapping { val map = mapOf("/path" to MyWebSocketHandler()) val order = -1 // before annotated controllers return SimpleUrlHandlerMapping(map, order) } } ``` [WebFlux 구성](https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html#webflux-config)을 사용하는 경우, 그 이상으로 아무것도 할 필요가 없다. 그렇지 않은 경우는 WebFlux 구성을 사용하지 않는 경우는 다음과 같이 `WebSocketHandlerAdapter` 선언해야 한다. Java ``` @Configuration class WebConfig { // ... @Bean public WebSocketHandlerAdapter handlerAdapter() { return new WebSocketHandlerAdapter(); } } ``` Kotlin ``` @Configuration class WebConfig { // ... @Bean fun handlerAdapter() = WebSocketHandlerAdapter() } ``` ## 3.2.2. `WebSocketHandler` `WebSocketHandler`의 `handle` 메소드는 `WebSocketSession`을 받아 `Mono<Void>`를 반환하고 세션의 응용 프로그램 처리가 완료되었음을 나타낸다. 세션은 받은 메시지용과 보내는 메시지용의 2개의 스트림을 통해 처리된다. 다음 표에서는 스트림을 처리하는 두 가지 방법을 보여준다. | WebSocketSession 멧소드 | 설명 | |--|--| | `Flux<WebSocketMessage> receive()` | 인바운드 메시지 스트림에 대한 액세스를 제공하고 연결이 닫힐 때 완료된다. | | `Mono<Void> send(Publisher<WebSocketMessage>)` | 아웃바운드 메시지의 소스를 검색하고 메시지를 작성하고 소스가 완료되어 쓰기가 완료되면 완료되는 `Mono<Void>` 를 반환한다. | `WebSocketHandler`는 수신 스트림 전송 스트림을 통합 흐름에 구성하고, 그 흐름의 보완을 반영하기 `Mono<Void>`를 반환해야 한다. 응용 프로그램 요구 사항에 따라 통합 흐름은 다음의 경우에 완료된다. - 수신 또는 송신 메시지 스트림이 완료되었을 때. - 전송 스트림은 무한하고, 아웃바운드 스트림이 완료되었을 때(즉, 연결이 닫혔을 때). - `WebSocketSession`의 `close` 메소드를 호출했을 때. 인바운드, 아웃바운드 메시지 스트림이 함께 구성된 경우, Reactive Streams는 종료 활동을 알리기 위해 연결이 열려 있는지 여부를 확인할 필요는 없다. 인바운드 스트림은 완료 또는 오류 신호를 수신하고, 아웃바운드 스트림은 취소 신호를 수신한다. 핸들러의 가장 기본적인 구현은 인바운드 스트림을 처리하는 것이다. 다음의 예는 이러한 구현을 보여준다. Java ``` class ExampleHandler implements WebSocketHandler { @Override public Mono<Void> handle(WebSocketSession session) { return session.receive() // (1) .doOnNext(message -> { // ... // (2) }) .concatMap(message -> { // ... // (3) }) .then(); // (4) } } ``` Kotlin ``` class ExampleHandler : WebSocketHandler { override fun handle(session: WebSocketSession): Mono<Void> { return session.receive() // (1) .doOnNext { // ... // (2) } .concatMap { // ... // (3) } .then() // (4) } } ``` - (1) 인바운드 메시지 스트림에 액세스한다. - (2) 각 메시지에 뭔가를 한다. - (3) 메시지 내용을 사용하는 중첩된 비동기 작업을 수행한다. - (4) 수신이 종료되면 완료되는 `Mono<Void>` 를 반환한다. > 중첩된 비동기 연산의 경우, 풀링된 데이터 버퍼를 사용하는 기본 서버(예를 들어, Netty)에서 `message.retain()` 호출해야 한다. 그렇지 않으면 데이터를 읽기 전에 데이터 버퍼가 해제될 수 있다. 배경에 대해서는 [데이터 버퍼 및 코덱](https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#databuffers)을 참조해라. 다음의 구현은 수신 스트림와 송신 스트림을 결합한다. Java ``` class ExampleHandler implements WebSocketHandler { @Override public Mono<Void> handle(WebSocketSession session) { Flux<WebSocketMessage> output = session.receive() // (1) .doOnNext(message -> { // ... }) .concatMap(message -> { // ... }) .map(value -> session.textMessage("Echo " + value)); // (2) return session.send(output); // (3) } } ``` Kotlin ``` class ExampleHandler : WebSocketHandler { override fun handle(session: WebSocketSession): Mono<Void> { val output = session.receive() // (1) .doOnNext { // ... } .concatMap { // ... } .map { session.textMessage("Echo $it") } // (2) return session.send(output) // (3) } } ``` - (1) 받은 메시지 스트림을 처리한다. - (2) 보내는 메시지를 작성하고, 결합된 흐름을 만든다. - (3) 메세지를 받는 동안은 완료하지 않는 `Mono<Void>`를 반환한다. 다음의 예와 같이, 수신 스트림와 전송 스트림은 독립적으로 처리하고 완료돼었을 때만 결합 할 수 있다. Java ``` class ExampleHandler implements WebSocketHandler { @Override public Mono<Void> handle(WebSocketSession session) { Mono<Void> input = session.receive() // (1) .doOnNext(message -> { // ... }) .concatMap(message -> { // ... }) .then(); Flux<String> source = ... ; Mono<Void> output = session.send(source.map(session::textMessage)); // (2) return Mono.zip(input, output).then(); // (3) } } ``` Kotlin ``` class ExampleHandler : WebSocketHandler { override fun handle(session: WebSocketSession): Mono<Void> { val input = session.receive() // (1) .doOnNext { // ... } .concatMap { // ... } .then() val source: Flux<String> = ... val output = session.send(source.map(session::textMessage)) // (2) return Mono.zip(input, output).then() // (3) } } ``` - (1) 수신 메시지 스트림을 처리한다. - (2) 보내는 메시지를 보냅니다. - (3) 스트림을 합쳐서 어느 하나의 스트림이 종료하면 완료되는 `Mono<Void>` 를 반환한다. ## 3.2.3. DataBuffer `DataBuffer`는 WebFlux 바이트 버퍼 표현이다. Spring 코어 부분은 [데이터 버퍼 및 코덱](https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#databuffers) 섹션에서 자세히 설명하고 있다. 이해해야 할 중요한 점은 Netty와 같은 일부 서버에서는 바이트 버퍼가 풀링 된 참조 계산 메모리 누수를 피하기 위해 소비가 된 이후에는 반드시 릴리즈해야 한다. Netty에서 실행하는 경우 응용 프로그램이 해제되지 않도록 입력 데이터 버퍼를 유지하는 경우는 `DataBufferUtils.retain(dataBuffer)`을 사용하고 버퍼가 소비 될 때에 `DataBufferUtils.release(dataBuffer)`을 사용해야 한다. ## 3.2.4. 핸드셰이크(Handshake) [서블릿 스택과 같은](https://docs.spring.io/spring-framework/docs/current/reference/html/web.html#websocket-server-handshake) `WebSocketHandlerAdapter`는 `WebSocketService`에 위임한다. 기본적으로 이는 `HandshakeWebSocketService`의 인스턴스이며, WebSocket 요청에 대해 기본적인 검사를 수행하고 사용중인 서버에 `RequestUpgradeStrategy`을 사용한다. 현재 Reactor Netty, Tomcat, Jetty, Undertow의 지원이 포함되어 있다. `HandshakeWebSocketService`는 `Predicate<String>`를 설정하고 `WebSession` 에서 속성을 추출하여 `WebSocketSession`의 속성에 삽입할 수 있는 `sessionAttributePredicate` 속성을 제공한다. ## 3.2.5. 서버 구성 [서블릿 스택과 같은](https://docs.spring.io/spring-framework/docs/current/reference/html/web.html#websocket-server-runtime-configuration) 각 서버의 `RequestUpgradeStrategy`는 기본이 되는 WebSocket 서버 엔진에 특정 설정을 제공한다. WebFlux Java 구성을 사용하는 경우는 [WebFlux 구성](https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html#webflux-config-websocket-service)의 해당 섹션에 나와있는 같은 속성을 지정할 수 있다. 그렇지 않은 경우는 WebFlux 구성을 사용하지 않으려면 다음과 같이 사용한다. Java ``` @Configuration class WebConfig { @Bean public WebSocketHandlerAdapter handlerAdapter() { return new WebSocketHandlerAdapter(webSocketService()); } @Bean public WebSocketService webSocketService() { TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy(); strategy.setMaxSessionIdleTimeout(0L); return new HandshakeWebSocketService(strategy); } } ``` Kotlin ``` @Configuration class WebConfig { @Bean fun handlerAdapter() = WebSocketHandlerAdapter(webSocketService()) @Bean fun webSocketService(): WebSocketService { val strategy = TomcatRequestUpgradeStrategy().apply { setMaxSessionIdleTimeout(0L) } return HandshakeWebSocketService(strategy) } } ``` 서버 업그레이드 전략을 확인하여 사용 가능한 옵션을 확인한다. 현재 Tomcat과 Jetty만 이러한 옵션을 공개하고 있다. ## 3.2.6. CORS [서블릿 스택과 같은](https://docs.spring.io/spring-framework/docs/current/reference/html/web.html#websocket-server-allowed-origins) CORS를 설정하고 WebSocket 엔드포인트에 대한 액세스를 제한하는 가장 쉬운 방법은 `WebSocketHandler`에 `CorsConfigurationSource`를 구현하고 허용된 발신원, 헤더, 그외 세부 사항을 포함하는 `CorsConfiguration`을 반환한다. 그럴 수 없는 경우는 `SimpleUrlHandler`의 `corsConfigurations` 속성을 설정하여 URL 패턴에 CORS 설정을 지정할 수도 있다. 모두가 지정되어 있는 경우는 `CorsConfiguration`의 `combine` 메소드를 사용하여 결합된다. ## 3.2.7. 클라이언트 Spring WebFlux은 Reactor Netty, Tomcat, Jetty, Undertow, 표준 Java (즉, JSR-356)의 구현을 제공하는 `WebSocketClient` 추상화를 제공한다. > Tomcat 클라이언트는 사실상, `WebSocketSession` 처리에 몇 가지 추가 기능을 갖춘 표준 Java의 확장이며, Tomcat 고유의 API를 이용하여 역 배압 메시지의 수신을 일시 중지한다. WebSocket 세션을 시작하려면 클라이언트의 인스턴스를 만들어서 `execute` 메소드를 사용할 수 있다. Java ``` WebSocketClient client = new ReactorNettyWebSocketClient(); URI url = new URI("ws://localhost:8080/path"); client.execute(url, session -> session.receive() .doOnNext(System.out::println) .then()); ``` Kotlin ``` val client = ReactorNettyWebSocketClient() val url = URI("ws://localhost:8080/path") client.execute(url) { session -> session.receive() .doOnNext(::println) .then() } ``` Jetty와 같은 일부 클라이언트는 `Lifecycle` 를 구현하고, 사용하기 전에 중지 및 시작해야 한다. 모든 클라이언트에는 기본되는 `WebSocket` 클라이언트 구성에 대한 생성자 옵션이 있다.

이전 글 : 3.1. WebSocket 개요
다음 글 : 4. Testing