Mittwoch, Oktober 31, 2018

CompletableFuture II

Das CompletableFuture wurde mit Java 8 eingeführt und ist ein Teil des Packages java.util.concurrent. Ziel: Verkettung von asynchronen Aufgaben. Als Einstiegspunkt dienen die Methoden #get(...) und #complete(...). #get(...) stellt den aktiven Thread wartend und setzt die Ausführung erst dann fort, wenn ein paralleler Thread die Methode #complete(...) ausführt. Folgendes Beispiel zeigt, wie man es nicht umsetzen sollte. Das CompletableFuture wird mit #get() befragt. Da es keinen parallelen Thread gibt, der #complete() aufrufen könnte, wird der Thread ewig warten, wenn wir #get() keinen Timeout Wert mitgeben.

@Test
public void dontDoThat() {
    CompletableFuture<String> cf = new CompletableFuture<>();
    try {
        // cf.get() would wait forever! Don´t do that!
        cf.get(10, TimeUnit.MILLISECONDS);
    } catch (InterruptedException | ExecutionException
            | TimeoutException ex) {

        assertThat(ex).isNotNull();
    }
}

Erfolgreiche asynchrone Verarbeitung

Das erste Beispiel zeigt eine typische (sehr einfache) asynchrone Verarbeitung. Die asynchrone Verarbeitung wird in der Methode #calculateAsync() (siehe [1]) gestartet. CompletableFuture#get() (siehe [2]) wartet auf die Fertigstellung der Berechnung und liefert das berechnete Ergebnis.

In [3] wird ein CompletableFuture direkt mit dem berechneten Ergebnis erzeugt.

/**
 * Startet einen Thread und übergibt als Synchronisationspunkt ein
 * CompletableFuture. Der Thread meldet über CompletableFuture#complete(...)
 * die fertige Berechnung.
 */
private CompletableFuture<String> calculateAsync()
        throws InterruptedException {

    CompletableFuture<String> completableFuture = new CompletableFuture<>();

    Executors.newCachedThreadPool().submit(() -> {
        Thread.sleep(500);
        completableFuture.complete("Hello");
        return null;
    });

    return completableFuture;
}

private CompletableFuture<String> calculateAsync2() {
    return CompletableFuture.completedFuture("Hello2");
}

@Test
public void completableFuture() throws Exception {
    /* (1) */
    CompletableFuture<String> future = calculateAsync();

    /* (2) */
    String result = future.get();
    assertThat(result).isEqualTo("Hello");
    assertThat(future.isCompletedExceptionally()).isFalse();

    /* (3) */
    assertThat(calculateAsync2().get()).isEqualTo("Hello2");
}

Abgebrochene asynchrone Verarbeitung

In dem nächsten Beispiel wird die Verarbeitung über CompletableFuture#cancel(boolean) abgebrochen. Am Synchronisationspunkt #get() erhält der Aufrufer eine CancellationException. Der Parameter, den man #cancel(boolean) übergibt, hat keine Auswirkungen. Der Interrupt-Mechanismus wird für die Ablaufsteuerung von CompletabeleFutures nicht verwendet.

private CompletableFuture<String> calculateAsyncWithCancellation()
        throws InterruptedException {

    CompletableFuture<String> completableFuture = new CompletableFuture<>();

    Executors.newCachedThreadPool().submit(() -> {
        // Die Verarbeitung wird abgebrochen. Der Aufrufer erhaelt eine
        // CancellationException.
        completableFuture.cancel(false);
        return null;
    });

    return completableFuture;
}

@Test
public void cancelCompletableFuture()
        throws InterruptedException, ExecutionException {

    CompletableFuture<String> future = calculateAsyncWithCancellation();

    try {
        future.get(); // CancellationException
        fail("Expected CancellationException");
    } catch (CancellationException ex) {
        assertThat(future.isCancelled()).isTrue();
        assertThat(future.isCompletedExceptionally()).isTrue();
    }
}

Utility Methoden

Neben den oben genannten Möglichkeiten bietet CompletableFuture einen ganzen Blumenstrauß an weiteren hilfreichen Methoden. Z.B. #completeAsync(Supplier) nimmt eine Funktion entgegen, die asynchron mit Hilfe des ForkJoinPool#commonPool() verarbeitet wird.

@Test
public void completeAsyncOfCompletableFuture() throws Exception {
    CompletableFuture<String> cf = new CompletableFuture<>();
    cf.completeAsync(() -> { return "Hallo"; });
    assertThat(cf.get()).isEqualTo("Hallo");
}

Mit #runAsync(Runnable) kann eine parameterlose Funktion übergeben werden. Ein Rückgabeparameter entfällt ebenfalls. In dem Beispiel verwende ich eine Variable, die außerhalb der Funktion liegt. Im Sinne der funktionalen Programmierung ist das nicht optimal. In diesem Fall wöre #completeAsync() die bessere Wahl.

@Test
public void runAsyncOfCompletableFuture() throws Exception {
    final StringBuilder sb = new StringBuilder();
    CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
        sb.append("Start calculation ...");
    });

    cf.get();
    assertThat(sb.toString()).isEqualTo("Start calculation ...");
}

Mehrfach Synchronisierung

Um verschiedene nebenläufige Tasks zu einem gemeinsamen Synchronisationspunkt zu bringen, hat CompletableFuture eine Lösung zur Verfügung. In dem Beispiel werden drei CompletableFutures instanziert und per #allOff() zu einem neuen CompletableFuture zusammen gefasst. Der Aufruf von #get() wartete so lange, bis alle drei CompletableFutures ein Ergebnis liefern.

@Test
public void completableFutureWithMultipleParallelTasks() throws Exception {
    CompletableFuture<String> future1 = CompletableFuture
            .supplyAsync(() -> "Hello");
    CompletableFuture<String> future2 = CompletableFuture
            .supplyAsync(() -> "Beautiful");
    CompletableFuture<String> future3 = CompletableFuture
            .supplyAsync(() -> "World");

    CompletableFuture<Void> combinedFuture = CompletableFuture
            .allOf(future1, future2, future3);

    combinedFuture.get();

    assertThat(future1.isDone()).isTrue();
    assertThat(future2.isDone()).isTrue();
    assertThat(future3.isDone()).isTrue();
}

Referenzen

Freitag, Oktober 12, 2018

Links der Woche

Dieses mal gibt es nur drei Links: Adam 55th Airhacks, Joshua Bloch über sein Buch 'Effective Java' und Javascript Pro Tips. UPDATE: Also das ist mittlerweile ein MUSS geworden: Installieren der ZSH Shell unter cygwin.

AssertJ und java.util.List

AssertJ hat eine praktische Möglichkeit, Listen in JUnit Tests abzuprüfen. Insbesondere, wenn in der Liste komplexe Objekte abgelegt sind, s...