Hello, I am backend Web developer in Tokyo

Anatomy of CompletableFuture

Asynchronous Computation with CompletableFuture

Without utility methods

public Future<String> boilerPlateFuture() throws InterruptedException {
    CompletableFuture<String> completableFuture
            = new CompletableFuture<>();

    Executors.newCachedThreadPool().submit(() -> {
        Thread.sleep(500);
        completableFuture.complete("Hello, I am " + Thread.currentThread().getName());
        return null;
    });

    return completableFuture;
}

Supply, Apply and Accept

Future<Void> future = CompletableFuture.supplyAsync(() -> {
    try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
    log.info("do supplyAsync");
    return "Hello";
}).thenApply(s -> {
    try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
    log.info("do thenApply");
    return s + " World";
}).thenAccept(s -> {
    try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
    log.info("do thenAccept");
    log.info("future result : {}", s);
});

log.info("do main");
10:33:41.1 [main] - do main
10:33:42.1 [ForkJoinPool.commonPool-worker-1] - do supplyAsync
10:33:43.1 [ForkJoinPool.commonPool-worker-1] - do thenApply
10:33:44.1 [ForkJoinPool.commonPool-worker-1] - do thenAccept
10:33:44.1 [ForkJoinPool.commonPool-worker-1] - future result : Hello World

Weaving Futures

Supply and Compose

Future<Void> future = CompletableFuture.supplyAsync(() -> {
    try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
    log.info("do supplyAsync");
    return "Hello";
}).thenCompose(s -> {
    try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
    log.info("do thenCompose");
    return CompletableFuture.supplyAsync(() -> {
        try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
        log.info("do supplyAsync");
        return s + " World";
    }).thenAccept(t -> {
        try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
        log.info("do thenAccept-chained");
    });
}).thenAccept(s -> {
    try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
    log.info("do thenAccept");
    log.info("future result : {}", s);
});

log.info("do main");
10:55:06.6 [main] - do main
10:55:07.6 [ForkJoinPool.commonPool-worker-1] - do supplyAsync
10:55:08.6 [ForkJoinPool.commonPool-worker-1] - do thenCompose
10:55:09.6 [ForkJoinPool.commonPool-worker-2] - do supplyAsync
10:55:10.6 [ForkJoinPool.commonPool-worker-2] - do thenAccept-chained
10:55:11.6 [ForkJoinPool.commonPool-worker-2] - do thenAccept
10:55:11.6 [ForkJoinPool.commonPool-worker-2] - future result : null

The difference between thenApply() and thenCompose() :
- thenApply : Nested future (map) - thenCompose : Chaining independent future (flatMap)

thenCompose() is useful when assembling modularized code :

CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 1);
CompletableFuture<Double> future2 = CompletableFuture.supplyAsync(() -> 1.0);

CompletableFuture<CompletableFuture<Double>> nested = future1.thenApply(i -> future2);
CompletableFuture<Double> chained = future1.thenCompose(i -> future2);

Future of thenCompose is executed in sequential as we can check from the result above that the same worker thread has been used (it’s just chaining).
Future of thenCombine works in parallel.

Supply and Combine

Future<Void> future = CompletableFuture.supplyAsync(() -> {
    try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
    log.info("do supplyAsync");
    return "Hello";
}).thenCombine(
        CompletableFuture.supplyAsync(() -> {
            try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
            log.info("do supplyAsync");
            return " World";
        }),
        (s1, s2) -> {
            try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
            log.info("do combining");
            return s1 + s2;
        }
).thenAccept(s -> {
    try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
    log.info("do thenAccept");
    log.info("future result : {}", s);
});

log.info("do main");
10:51:32.5 [main] - do main
10:51:33.5 [ForkJoinPool.commonPool-worker-1] - do supplyAsync
10:51:33.5 [ForkJoinPool.commonPool-worker-2] - do supplyAsync
10:51:34.5 [ForkJoinPool.commonPool-worker-2] - do combining
10:51:35.5 [ForkJoinPool.commonPool-worker-2] - do thenAccept
10:51:35.5 [ForkJoinPool.commonPool-worker-2] - future result : Hello World

Accept Both

thenAcceptBoth() is similar to thenCombine().thenAccept()

Future<Void> future = CompletableFuture.supplyAsync(() -> {
    try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
    log.info("do supplyAsync");
    return "Hello";
}).thenAcceptBoth(
        CompletableFuture.supplyAsync(() -> {
            try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
            log.info("do supplyAsync");
            return " World";
        }),
        (s1, s2) -> {
            try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
            log.info("do accepting");
            log.info("future result : {}", s1 + s2);
        }
);

log.info("do main");
11:01:16.8 [main] - do main
11:01:17.8 [ForkJoinPool.commonPool-worker-2] - do supplyAsync
11:01:17.8 [ForkJoinPool.commonPool-worker-1] - do supplyAsync
11:01:18.8 [ForkJoinPool.commonPool-worker-2] - do accepting
11:01:18.8 [ForkJoinPool.commonPool-worker-2] - future result : Hello World

Multiple Futures

CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
    log.info("future1");
    return 1;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
    log.info("future2");
    return 2;
});
CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> {
    log.info("future3");
    return 3;
});
List<CompletableFuture> listOfFuture = new ArrayList<>();
listOfFuture.add(future1);
listOfFuture.add(future2);
listOfFuture.add(future3);

CompletableFuture.allOf(listOfFuture.toArray(new CompletableFuture[listOfFuture.size()]))
        .thenAccept(s -> {
            log.info("allOf");
            listOfFuture.stream()
                    .map(CompletableFuture::join)
                    .forEach(r -> log.info("result : {}", r));
        }).join();
12:45:08.7 [ForkJoinPool.commonPool-worker-1] - future1
12:45:08.7 [ForkJoinPool.commonPool-worker-3] - future3
12:45:08.7 [ForkJoinPool.commonPool-worker-2] - future2
12:45:08.7 [ForkJoinPool.commonPool-worker-2] - allOf
12:45:08.7 [ForkJoinPool.commonPool-worker-2] - result : 1
12:45:08.7 [ForkJoinPool.commonPool-worker-2] - result : 2
12:45:08.7 [ForkJoinPool.commonPool-worker-2] - result : 3

get() throws checked exception when future completed exceptionally, however join() throws unchecked exception.
Due to this, join() is good to use with stream.

>String combined = Stream.of(future1, future2, future3)
>  .map(CompletableFuture::join)
>  .collect(Collectors.joining(" "));
>```

# Exception Handling
## handle
`handler()` is `thenApply()` + exception handler.  
```java
String name = null;
Future<Void> future = CompletableFuture.supplyAsync(() -> {
    try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
    log.info("do supplyAsync");
    if (name == null) throw new RuntimeException("Computation error!");
    return "Hello, " + name;
}).handle((s, ex) -> {
    try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
    log.info("do handle");
    return ex != null ? "Hello, Unknown!" : s;
}).thenAccept(s -> {
    try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
    log.info("do accept");
    log.info("future result : {}", s);
});

log.info("do main");

exceptionally

exceptionally() provides fallback value when the Future has completed exceptionally.

Future<Integer> future = CompletableFuture.supplyAsync(() -> {
    int val = 1;
    log.info("do supplyAsync");
    return val;
}).thenApply(s -> {
    log.info("do thenApply 1");
    if (1 == 1) throw new RuntimeException();
    return s + 1;
}).thenApply(s -> {
    log.info("do thenApply 2");
    return s + 1;
}).exceptionally(ex -> 100);

log.info("result : {}", future.get());
12:28:07.8 [ForkJoinPool.commonPool-worker-1] - do supplyAsync
12:28:07.8 [ForkJoinPool.commonPool-worker-1] - do thenApply 1
12:28:07.8 [main] - result : 100
Future<Integer> future = CompletableFuture.supplyAsync(() -> {
    int val = 1;
    log.info("do supplyAsync");
    return val;
}).thenApply(s -> {
    log.info("do thenApply 1");
    return s + 1;
}).thenApply(s -> {
    log.info("do thenApply 2");
    if (1 == 1) throw new RuntimeException();
    return s + 1;
}).exceptionally(ex -> 100);

log.info("result : {}", future.get());
12:29:00.5 [ForkJoinPool.commonPool-worker-1] - do supplyAsync
12:29:00.5 [ForkJoinPool.commonPool-worker-1] - do thenApply 1
12:29:00.5 [ForkJoinPool.commonPool-worker-1] - do thenApply 2
12:29:00.5 [main] - result : 100

However it does not handle exception fot those which occured after the chaining.

Future<Integer> future = CompletableFuture.supplyAsync(() -> {
    int val = 1;
    log.info("do supplyAsync");
    return val;
}).thenApply(s -> {
    log.info("do thenApply 1");
    return s + 1;
}).exceptionally(ex -> 100)
.thenApply(s -> {
    log.info("do thenApply 2");
    if (1 == 1) throw new RuntimeException();
    return s + 1;
});

log.info("result : {}", future.get());
12:30:01.183 [ForkJoinPool.commonPool-worker-1] INFO rakuten.travel.reservation.steps.StepsApplication - do supplyAsync
12:30:01.187 [ForkJoinPool.commonPool-worker-1] INFO rakuten.travel.reservation.steps.StepsApplication - do thenApply 1
12:30:01.187 [ForkJoinPool.commonPool-worker-1] INFO rakuten.travel.reservation.steps.StepsApplication - do thenApply 2
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException
    at ...
Caused by: java.lang.RuntimeException
    at ...

completeExceptionally

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
    log.info("do supplyAsync");
    return "Hello";
});
future.completeExceptionally(new RuntimeException("Failed"));
future.get(); // This line will cause ExecutionException