Spring WebFluxの簡単な使い方

概要

ここでは、Spring WebFluxの簡単な使い方を紹介します。

Spring WebFluxは、Spring 5で追加された、ノンブロッキング(Non-Blocking)ランタイム上でリアクティブプログラミングを行える新しいWebフレームワークです。

Reactive web application stack overview

上の図からわかるように、これまでのSpring MVCコンポーネントもよく表現されています。Spring MVCはサーブレットコンテナ上でServlet APIを基盤とするフレームワークでしたが、Spring WebFluxはServlet APIを使用せず、Reactive Streamsとその実装であるReactorを基盤とする新しいHTTP APIで実装されています。ランタイムとしては、NettyやUndertow(サーブレットコンテナではない方向)などのWASをノンブロッキングで使用できます。また、Servlet 3.1で導入されたノンブロッキングAPIを使用するTomcatやJettyの実装も用意されています。

Spring WebFluxのプログラミングモデルとしては、次の2つのパターンが提供されています。

  • @Controlller
  • Router Functions

@Controlllerは、これまでSpring MVCで使ってきたアノテーションベースのController実装方法そのものです。つまり、ランタイムは変わりますが、Controllerのプログラミング方法は同じです。Router Functionsは、ラムダベースの新しいController実装方法です。Node.jsのExpressのようなものだと考えるとよいでしょう。

本文書では、この2つのパターンについて簡単に説明します。

プロジェクトの作成

まずプロジェクトを作成します。Spring Boot 2.0からSpring 5がサポートされています。

curlコマンドで簡単にプロジェクトを作成できます(Windowsの場合はBashで実行します)。

curl https://start.spring.io/starter.tgz  \
       -d bootVersion=2.4.4 \
       -d dependencies=webflux \
       -d baseDir=spring-webflux \
       -d artifactId=webflux \
       -d packageName=com.devkuma.webflux \
       -d applicationName=HelloWebFluxApplication \
       -d type=gradle-project | tar -xzvf -

実行が完了すると、spring-webfluxディレクトリが1つ作成されていることを確認できます。

以降、Streamと書いた場合は連続するデータを指し、Streamと書いた場合はJava 8のjava.util.stream.Streamを指します。

@Controllerモデル

まず、@ControllerモデルでHello Worldを作成してみましょう。

Hello World

src/main/java/com/devkuma/webflux/HelloController.javaを作成します。

package com.devkuma.webflux;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Flux;

@RestController
public class HelloController {

    @GetMapping("/")
    Flux<String> hello() {
        return Flux.just("Hello", "World");
    }
}

Fluxに馴染みがない人もいるかもしれませんが、一見すると、これまで使ってきたSpring MVCのControllerと同じです。

com.devkuma.webflux.HelloWebFluxApplicationクラスのmainメソッドを実行すると、Spring WebFluxアプリケーションが起動します。

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.4.4)

2021-04-14 07:07:20.558  INFO 20344 --- [           main] c.d.webflux.HelloWebFluxApplication      : Starting HelloWebFluxApplication using Java 11.0.7 on DESKTOP-A67OEI1 with PID 20344 (D:\dev\spring-webflux--tutorial\build\classes\java\main started by kimkc in D:\dev\spring-webflux--tutorial)
2021-04-14 07:07:20.567  INFO 20344 --- [           main] c.d.webflux.HelloWebFluxApplication      : No active profile set, falling back to default profiles: default
2021-04-14 07:07:22.111  INFO 20344 --- [           main] o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port 8080
2021-04-14 07:07:22.122  INFO 20344 --- [           main] c.d.webflux.HelloWebFluxApplication      : Started HelloWebFluxApplication in 2.103 seconds (JVM running for 3.381)

curllocalhost:8080にアクセスしてみましょう。

$ curl -i localhost:8080
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    10    0    10    0     0    909      0 --:--:-- --:--:-- --:--:--  1000HTTP/1.1 200 OK
transfer-encoding: chunked
Content-Type: text/plain;charset=UTF-8

HelloWorld

HelloWorldが表示されることを確認できます。

Fluxは、Reactive StreamsのPublisherを実装した、N個の要素を持つストリームを表すReactorクラスです。デフォルトではtext/plainでレスポンスが返されましたが、次の形式でも返すことができます。

  • Server-Sent Event
  • JSON Stream

Server-Sent Eventとして返すために、Acceptヘッダーにtext/event-streamを指定して再度実行してみましょう。

$ curl -i localhost:8080 -H 'Accept: text/event-stream'
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    24    0    24    0     0   1846      0 --:--:-- --:--:-- --:--:--  2000HTTP/1.1 200 OK
transfer-encoding: chunked
Content-Type: text/event-stream;charset=UTF-8

data:Hello

data:World

JSON Streamとして返すには、Acceptヘッダーにapplication/stream+jsonを指定します。ただし、この場合(文字列ストリーム)は、表示はtext/plainの場合と変わりません。

$ curl -i localhost:8080 -H 'Accept: application/stream+json'
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    10    0    10    0     0    714      0 --:--:-- --:--:-- --:--:--   769HTTP/1.1 200 OK
transfer-encoding: chunked
Content-Type: application/stream+json;charset=UTF-8

HelloWorld

無限Stream

次は、もう少しStreamらしいレスポンスを受け取ってみましょう。
FluxStreamから作成できます。
次に、streamメソッドを作成し、無限Streamを作成して、そのうち10件をFluxに変換して返してみます。

次のようにコードを修正し、com.example.HelloWebFluxApplicationクラスを再度実行します。

package com.devkuma.webflux;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Flux;

import java.util.Collections;
import java.util.Map;
import java.util.stream.Stream;

@RestController
public class HelloController {

    @GetMapping("/")
    Flux<String> hello() {
        return Flux.just("Hello", "World");
    }

    @GetMapping("/stream")
    Flux<Map<String, Integer>> stream() {
        Stream<Integer> stream = Stream.iterate(0, i -> i + 1); // Java 8の無限Stream
        return Flux.fromStream(stream.limit(10))
                .map(i -> Collections.singletonMap("value", i));
    }
}

/streamに対する3種類のレスポンスは、それぞれ次のようになります。

通常のJSON

$ curl -i localhost:8080/stream
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   121    0   121    0     0  11000      0 --:--:-- --:--:-- --:--:-- 12100HTTP/1.1 200 OK
transfer-encoding: chunked
Content-Type: application/json

[{"value":0},{"value":1},{"value":2},{"value":3},{"value":4},{"value":5},{"value":6},{"value":7},{"value":8},{"value":9}]

Server-Sent Event

$ curl -i localhost:8080/stream -H 'Accept: text/event-stream'
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   180    0   180    0     0  12000      0 --:--:-- --:--:-- --:--:-- 12000HTTP/1.1 200 OK
transfer-encoding: chunked
Content-Type: text/event-stream;charset=UTF-8

data:{"value":0}

data:{"value":1}

data:{"value":2}

data:{"value":3}

data:{"value":4}

data:{"value":5}

data:{"value":6}

data:{"value":7}

data:{"value":8}

data:{"value":9}

JSON Stream

$ curl -i localhost:8080/stream -H 'Accept: application/stream+json'
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   120    0   120    0     0   7500      0 --:--:-- --:--:-- --:--:--  7500HTTP/1.1 200 OK
transfer-encoding: chunked
Content-Type: application/stream+json

{"value":0}
{"value":1}
{"value":2}
{"value":3}
{"value":4}
{"value":5}
{"value":6}
{"value":7}
{"value":8}
{"value":9}

これで、通常のJSON(application/json)とJSON Stream(application/stream + json)の違いが見えるはずです。

実は、ここでlimitを付ける必要はなく、そのまま無限Streamを返すこともできます。通常のControllerでは、レスポンスは永遠に返ってこないでしょう。application/jsonの場合、レスポンスは返りません(Integerがオーバーフローすれば返るかもしれませんが)。そのため、まずはあえてlimitを付けました。Server-Sent EventとJSON Streamでは無限Streamを返すことができます。試してみましょう。

次のコードのようにlimitを除外して実行してみます。

package com.devkuma.webflux;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Flux;

import java.util.Collections;
import java.util.Map;
import java.util.stream.Stream;

@RestController
public class HelloController {

    @GetMapping("/")
    Flux<String> hello() {
        return Flux.just("Hello", "World");
    }

    @GetMapping("/stream")
    Flux<Map<String, Integer>> stream() {
        Stream<Integer> stream = Stream.iterate(0, i -> i + 1); // Java 8の無限Stream
        return Flux.fromStream(stream) // Limitを除外
                .map(i -> Collections.singletonMap("value", i));
    }
}

Server-Sent EventもJSON Streamも次のように動作します。Ctrl+Cで止めるまで、非常に速いStream結果を見ることができます。

Server-Sent Event

$ curl -i localhost:8080/stream -H 'Accept: text/event-stream'
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0HTTP/1.1 200 OK
transfer-encoding: chunked
Content-Type: text/event-stream;charset=UTF-8

data:{"value":0}

data:{"value":1}

data:{"value":2}

data:{"value":3}

data:{"value":4}

data:{"value":5}

data:{"value":6}

data:{"value":7}

data:{"value":8}

data:{"value":9}

data:{"value":10}

data:{"value":11}

data:{"value":12}

data:{"value":13}

... 以下省略 ...

JSON Stream

$ curl -i localhost:8080/stream -H 'Accept: application/stream+json'
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0HTTP/1.1 200 OK
transfer-encoding: chunked
Content-Type: application/stream+json

{"value":0}
{"value":1}
{"value":2}
{"value":3}
{"value":4}
{"value":5}
{"value":6}
{"value":7}
{"value":8}
{"value":9}
{"value":10}
{"value":11}
{"value":12}

Streamをゆっくり返したい場合は、Flux.interval(Duration)とzipを指定してStreamの結果を確認してみましょう。

$ curl -i localhost:8080/stream -H 'Accept: application/stream+json'
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
kimkc@DESKTOP-A67OEI1 MINGW64 /d
$ curl -i localhost:8080/stream -H 'Accept: application/stream+json'
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    96    0    96    0     0     10      0 --:--:--  0:00:09 --:--:--    11HTTP/1.1 200 OK
transfer-encoding: chunked
Content-Type: application/stream+json

{"value":0}
{"value":1}
{"value":2}
{"value":3}
{"value":4}
{"value":5}
{"value":6}
{"value":7}

POST

次に、POSTの場合も見てみましょう。

リクエスト本文の文字列を大文字にして返すechoメソッドを作成します。

package com.devkuma.webflux;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.stream.Stream;

@RestController
public class HelloController {

    @GetMapping("/")
    Flux<String> hello() {
        return Flux.just("Hello", "World");
    }

    @GetMapping("/stream")
    Flux<Map<String, Integer>> stream() {
        Stream<Integer> stream = Stream.iterate(0, i -> i + 1);
        return Flux.fromStream(stream).zipWith(Flux.interval(Duration.ofSeconds(1)))
                .map(tuple -> Collections.singletonMap("value", tuple.getT1() /* タプルの最初の要素 = Stream<Integer>の要素 */));
    }

    @PostMapping("/echo")
    Mono<String> echo(@RequestBody Mono<String> body) {
        return body.map(String::toUpperCase);
    }
}

通常のControllerと同じように、@RequestBodyでリクエスト本文を受け取ることができます。Spring WebFluxでは、リクエスト本文(ここでは文字列)をMonoで包んで受け取ることで、非同期に処理をchain/composeできます。なお、Monoで包まずStringとして受け取った場合は、ノンブロッキングではなく同期的に処理されます。この場合、リクエスト本文を大文字に変換するmapの結果であるMonoをそのまま返します。Monoは1個または0個の要素を持つPublisherです。

$  curl -i localhost:8080/echo -H 'Content-Type: application/json' -d devkuma
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    14  100     7  100     7   1166   1166 --:--:-- --:--:-- --:--:--  2800HTTP/1.1 200 OK
Content-Type: text/plain;charset=UTF-8
Content-Length: 7

DEVKUMA

Monoの部分をFluxに置き換えてみましょう。1件だけ処理すればよい場合は、Monoを使うほうが明示的です。逆に、複数件のStreamを処理したい場合はFluxにします。

次の例では、@PostMappingを宣言したstreamメソッドでStreamをFluxとして受け取り、キーがvalueの値を2倍して、その値をキーdoubleに入れたMapへ変換して返します。

$  curl -i localhost:8080/stream -d '{"value":1}{"value":2}{"value":3}' -H 'Content-Type: application/stream+json'  -H 'Accept: text/event-stream'
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    90    0    57  100    33   4071   2357 --:--:-- --:--:-- --:--:--  6923HTTP/1.1 200 OK
transfer-encoding: chunked
Content-Type: text/event-stream;charset=UTF-8

data:{"double":2}

data:{"double":4}

data:{"double":6}

Router Functionsモデル

次に、もう1つのプログラミングモデルであるRouter Functionsについて見てみましょう。

Router Functionsは、POJO@RestController@GetMappingなどのアノテーションを宣言してルーティングを定義する代わりに、パスとハンドラー関数(ラムダ)の組み合わせでルーティングを定義します。

Spring Boot 2.0では、Router Functionsと@Controllerモデルは共存できないため(両方を使用すると@Controllerが無視されます)、先ほど作成したHelloControllerを削除します。

Hello World

@Controllerの場合と同じように、GET("/")Fluxの「Hello World」を返すRoutingを定義してみましょう。Spring Boot 2.0では、RouterFunction<ServerResponse>のBean定義があると、それはRouter Functionsのルーティング定義と見なされます。

まず、簡単にBean宣言でルーティングを定義してみます。

package com.devkuma.webflux;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Flux;

@SpringBootApplication
public class HelloWebFluxApplication {

    public static void main(String[] args) {
        SpringApplication.run(HelloWebFluxApplication.class, args);
    }

    @Bean
    RouterFunction<ServerResponse> routes() {
        return RouterFunctions.route(RequestPredicates.GET("/"), req -> ServerResponse
                .ok().body(Flux.just("Hello", "World!"), String.class));
    }
}

上のように書く方法もありますが、RouterFunctions.*RequestPredicates.*ServerResponse.*をstatic importする方法もあります。IntelliJ IDEAなら、ショートカット(Mac: Option + Enter、Windows: Alt+Enter)で「Add static import for …」を選択します。

static importすると、次のように書くことができます。

package com.devkuma.webflux;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Flux;

import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;

@SpringBootApplication
public class HelloWebFluxApplication {

    public static void main(String[] args) {
        SpringApplication.run(HelloWebFluxApplication.class, args);
    }

    @Bean
    RouterFunction<ServerResponse> routes() {
        return route(GET("/"),
                req -> ok().body(Flux.just("Hello", "World!"), String.class));
    }
}

HelloWebFluxApplicationを再起動し、curlでlocalhost:8080にアクセスしてみましょう。

$ curl -i localhost:8080
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    11    0    11    0     0     59      0 --:--:-- --:--:-- --:--:--    59HTTP/1.1 200 OK
transfer-encoding: chunked
Content-Type: text/plain;charset=UTF-8

HelloWorld!

Map<String, Integer>のようにGenerics形式で返すには、BodyInserters.fromPublisher(P publisher, ParameterizedTypeReference<T> elementTypeRef)を使用します。

少し面倒に見えるかもしれません。

package com.devkuma.webflux;

import java.util.Collections;
import java.util.Map;
import java.util.stream.Stream;

import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.ResolvableType;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import static org.springframework.web.reactive.function.BodyInserters.fromPublisher;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;

@Component
public class HelloHandler {

    public RouterFunction<ServerResponse> routes() {
        return route(GET("/"), this::hello).andRoute(GET("/stream"), this::stream);
    }

    public Mono<ServerResponse> hello(ServerRequest req) {
        return ok().body(Flux.just("Hello", "World!"), String.class);
    }

    public Mono<ServerResponse> stream(ServerRequest req) {
        Stream<Integer> stream = Stream.iterate(0, i -> i + 1);
        Flux<Map<String, Integer>> flux = Flux.fromStream(stream)
                .map(i -> Collections.singletonMap("value", i));
        return ok().contentType(MediaType.APPLICATION_NDJSON)
                .body(fromPublisher(flux, new ParameterizedTypeReference<Map<String, Integer>>(){}));
    }
}

HelloWebFluxApplicationクラスを再起動して/streamにアクセスすると、無限JSON Streamが返されます。

$ curl -i localhost:8080/stream
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0HTTP/1.1 200 OK
transfer-encoding: chunked
Content-Type: application/x-ndjson

{"value":0}
{"value":1}
{"value":2}
{"value":3}
{"value":4}
{"value":5}
{"value":6}
{"value":7}
{"value":8}
{"value":9}
{"value":10}

... 以下省略 ...

POST

POSTの場合は、RequestPredicates.POSTを使用してルーティングを定義し、レスポンス本文にはServerRequest.bodyToMonoまたはServerRequest.bodyToFluxを使用すればよく、これまで説明した内容と特に異なる点はありません。

package com.devkuma.webflux;

import java.util.Collections;
import java.util.Map;
import java.util.stream.Stream;

import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.ResolvableType;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import static org.springframework.web.reactive.function.BodyInserters.fromPublisher;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RequestPredicates.POST;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;

@Component
public class HelloHandler {

    public RouterFunction<ServerResponse> routes() {
        return route(GET("/"), this::hello)
                .andRoute(GET("/stream"), this::stream)
                .andRoute(POST("/echo"), this::echo);
    }

    public Mono<ServerResponse> hello(ServerRequest req) {
        return ok().body(Flux.just("Hello", "World!"), String.class);
    }

    public Mono<ServerResponse> stream(ServerRequest req) {
        Stream<Integer> stream = Stream.iterate(0, i -> i + 1);
        Flux<Map<String, Integer>> flux = Flux.fromStream(stream)
                .map(i -> Collections.singletonMap("value", i));
        return ok().contentType(MediaType.APPLICATION_NDJSON)
                .body(fromPublisher(flux, new ParameterizedTypeReference<Map<String, Integer>>(){}));
    }
    
    public Mono<ServerResponse> echo(ServerRequest req) {
        Mono<String> body = req.bodyToMono(String.class).map(String::toUpperCase);
        return ok().body(body, String.class);
    }
}

POST /streamも同様ですが、Request BodyをGenerics形式のPublisherとして受け取る場合は、ServerRequest.bodyToFluxではなくServerRequest.bodyメソッドに、BodyInsertersと反対の概念であるBodyExtractorsを渡します。

package com.devkuma.webflux;

import java.util.Collections;
import java.util.Map;
import java.util.stream.Stream;

import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.ResolvableType;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import static org.springframework.web.reactive.function.BodyExtractors.toFlux;
import static org.springframework.web.reactive.function.BodyInserters.fromPublisher;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RequestPredicates.POST;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;

@Component
public class HelloHandler {

    public RouterFunction<ServerResponse> routes() {
        return route(GET("/"), this::hello)
                .andRoute(GET("/stream"), this::stream)
                .andRoute(POST("/echo"), this::echo)
                .andRoute(POST("/stream"), this::postStream);
    }

    public Mono<ServerResponse> hello(ServerRequest req) {
        return ok().body(Flux.just("Hello", "World!"), String.class);
    }

    public Mono<ServerResponse> stream(ServerRequest req) {
        Stream<Integer> stream = Stream.iterate(0, i -> i + 1);
        Flux<Map<String, Integer>> flux = Flux.fromStream(stream)
                .map(i -> Collections.singletonMap("value", i));
        return ok().contentType(MediaType.APPLICATION_NDJSON)
                .body(fromPublisher(flux, new ParameterizedTypeReference<Map<String, Integer>>(){}));
    }
    
    public Mono<ServerResponse> echo(ServerRequest req) {
        Mono<String> body = req.bodyToMono(String.class).map(String::toUpperCase);
        return ok().body(body, String.class);
    }

    public Mono<ServerResponse> postStream(ServerRequest req) {
        Flux<Map<String, Integer>> body = req.body(toFlux( // BodyExtractors.toFluxをstatic importする必要がある。
                new ParameterizedTypeReference<Map<String, Integer>>(){}));
        
        return ok().contentType(MediaType.TEXT_EVENT_STREAM)
                .body(fromPublisher(body.map(m -> Collections.singletonMap("double", m.get("value") * 2)),
                        new ParameterizedTypeReference<Map<String, Integer>>(){}));
    }
}

これで、HelloControllerと同じ動作をするHelloHandlerクラスが完成しました。

RequestPredicates.GETRequestPredicates.POST@GetMapping@PostMappingに対応し、HTTPメソッドのない@RequestMappingに対応するのはRequestPredicates.pathです。

そして、これらはFunctional Interfaceなので、次のようにラムダ式でリクエストマッチングルールを選択的に書くことができます。