Java CompletableFutureの使い方

Javaで複雑なスレッド処理を行えるCompletableFutureの使い方について説明します。

CompletableFuture

Java 8ではCompletableFutureが導入され、より複雑なThread処理を行えるようになりました。

CompletableFutureを使用すると、結果を取得した後にその結果を処理できます。また、複数のCompletableFutureの完了を待って処理を実行したり、いずれかのCompletableFutureが完了するまで待ちながら処理したりできます。

処理の結果として値を返し、それを使って別の処理を行う

CompletableFuture

値を返すのはSupplierで、値を受け取って処理するのはConsumerです。これらを組み合わせてみます。

package com.devkuma.basic.completablefuture;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class SupplyAndConsume {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Supplier<Integer> initValueSupplier = () -> 100;
        Consumer<Integer> valueConsumer = value -> System.out.println(value);

        CompletableFuture<Void> future = CompletableFuture.supplyAsync(initValueSupplier)
                                                          .thenAcceptAsync(valueConsumer);

        future.get();  // 結果を取得
    }
}

実行結果:

100
  • CompletableFuture.supplyAsync(Supplier)
    • 非同期でSupplierを処理し、CompletableFutureのインスタンスを返します。
  • CompletableFuture.thenAcceptAsync(Consumer)
    • CompletableFutureインスタンスの処理が終了すると、戻り値を渡してConsumerの処理を実行します。

処理の結果として値を返し、それを変換して返し、その戻り値を処理する

CompletableFuture

値を返すのはSupplierで、変換はFunction、その戻り値を受け取って処理するのはConsumerです。

package com.devkuma.basic.completablefuture;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class SupplyAndExecuteAndConsume {
    public static void main(String[] args) throws InterruptedException, ExecutionException {

        Supplier<Integer> initValueSupplier = () -> 100;
        Function<Integer, Integer> multiply = value -> value * 2;
        Consumer<Integer> valueConsumer = value -> System.out.println(value);

        CompletableFuture<Void> future = CompletableFuture.supplyAsync(initValueSupplier)
                                                          .thenApplyAsync(multiply)
                                                          .thenAcceptAsync(valueConsumer);
        future.get();
    }
}

実行結果:

200
  • 基本的には前のパターンと同じです。
  • CompletableFuture.thenApplyAsync(Function)
    • CompletableFutureの結果値を渡しながらFunctionの処理を実行します。

1つの処理を複数のスレッドで実行し、最初の結果を使って別の処理を行う

CompletableFuture

Supplierで処理した結果を返し、Consumerで受け取った値を使って処理します。

package com.devkuma.basic.completablefuture;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class RaceAndConsume {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Supplier<Integer> initValueSupplier = () -> 100;
        Supplier<Integer> anotherValueSupplier = () -> 200;
        Consumer<Integer> valueConsumer = value -> System.out.println(value);

        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(initValueSupplier);
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(anotherValueSupplier);

        future1.acceptEitherAsync(future2, valueConsumer).get();
    }
}

実行結果:

100 or 200
  • CompletableFuture.acceptEitherAsync(CompletableFuture, Consumer)
    • 先に結果が出たものを使ってConsumer処理を行います。

1つの処理を複数のスレッドで実行し、最初の結果を使って別の処理を行い、その結果を使ってさらに別の処理を行う

CompletableFuture

package com.devkuma.basic.completablefuture;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class RaceAndConsume2 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Supplier<Integer> initValueSupplier = () -> 100;
        Supplier<Integer> anotherValueSupplier = () -> 200;
        Function<Integer, Integer> multiply = value -> value * 2;
        Consumer<Integer> valueConsumer = value -> System.out.println(value);

        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(initValueSupplier);
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(anotherValueSupplier);

        future1.applyToEitherAsync(future2, multiply)
               .thenAcceptAsync(valueConsumer)
               .get();
    }
}

実行結果:

200 or 400