Publish-Subscribe mit der Flow-API in Java 9

Die Concurrency-APIs im JDK sind seit Java 1.5 ein wichtiger Bestandteil und haben immer wieder kleinere oder größere Aktualisierungen bekommen. So auch in Java 9 (JEP 266). In diesem Fall ist ein kleines Publish-Subscribe Framework (im Folgenden auch Flow-API genannt) hinzugekommen.

Dieser Artikel befasst sich damit, wie dieses Publish-Subscribe-Framework funktioniert und wie es zu verwenden ist.

Publish-Subscribe: Push vs. Back Pressure

Eine allgemeine Einführung in die Funktionsweise von Publish-Subscribe-Systemen ist dem Blogartikel zu Apache ActiveMQ auf diesem Blog zu entnehmen.

Die neue Flow-API basiert auf der Reactive-Streams Initiative, Publisher und Subscriber werden über Subscription-Objekte verwaltet. Ein Hauptpunkt, den Reactive Streams adressieren wollte, ist ein Problem im Zusammenhang mit Push-basierten Publish-Subscribe-Systemen. Das pushbasierte Public-Subscribe ist in der folgenden Abbildung visualisiert:

pushbasiertes Publish Subscribe

Pushbasiertes Publish-Subscribe

Bei einem pushbasierten Publish-Subscribe-System wird jede Nachricht, welche beim Publisher erzeugt wird, direkt an die Subscriber weitergegeben. Wenn also sehr schnell sehr viele Nachrichten erzeugt werden und/oder die Subscriber die Nachrichten langsamer verarbeiten, als diese von den Publishern erzeugt werden, müssen die Nachrichten gepuffert werden. Dieser Puffer kann unter Umständen beliebig groß werden, und es wird ein Puffer in jedem Subscriber benötigt.

Um diesem Problem vorzubeugen, wird von der Reactive Streams Initiative ein sogenanntes back-pressure-basiertes Publish-Subscribe-System vorgeschlagen, welches nun auch in Java 9 Verwendung findet. Dieses ist in dieser Abbildung visualisiert:

Publish Subscribe mit Back Pressure

Publish-Subscribe mit Back Pressure

Das Besondere hierbei ist, dass es einen Back-Channel gibt. Darüber steuert der Subscriber, wann er eine neue Nachricht empfängt. Dadurch ist das Puffern der Nachrichten im Subscriber selbst nicht mehr nötig. Sobald der Subscriber seine aktuelle Nachricht fertig verarbeitet hat, kann er das nächste anfordern. Somit müssen die Nachrichten nur einmal an einer zentralen Stelle gepuffert werden, anstatt in jedem Subscriber. Im optimalen Fall würde der Publisher selbst auch erst auf den Back-Pressure reagieren und eine neue Nachricht erst dann erzeugen, wenn ein Subscriber sie anfordert. Dies ist jedoch nicht in jedem Anwendungsfall möglich. Aber immerhin wird die Menge des benötigten Pufferspeichers im Vergleich zu klassischen pushbasierten Systemen drastisch reduziert.

Publish-Subscribe in Java 9

In Java 9 ist die Klasse java.util.concurrent.Flow hinzugekommen. Diese enthält mehrere für ein Publish-Subscribe-System relevante Interfaces. Dazu gehören ein Publisher, ein Subscriber, ein Processor (Publisher und Subscriber in einem) und eine Subscription. Des Weiteren gibt es die Klasse SubmissionPublisher, welche bereits eine Standardimplementierung zum Versand von Objekten enthält.

Nun ein kleines Beispielprogramm. Es sollen einfache Textnachrichten über ein Publish-Subscribe-System ausgeliefert werden. Dabei soll es einen Publisher und zwei Subscriber geben. Beginnen wir mit der main-Methode:

private static final int SUBSCRIBER_COUNT = 2;
private static CountDownLatch countdown;

public static void main(String[] args) throws InterruptedException {
  SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
  countdown = new CountDownLatch(SUBSCRIBER_COUNT);
		
  for (int i = 0; i < SUBSCRIBER_COUNT; i++) {
    TextSubscriber subscriber = new TextSubscriber();
    publisher.subscribe(subscriber);
  }
  Arrays.asList("I", "just", "came", "to", "say", "hello").
    forEach(str -> publisher.submit(str));
  publisher.close();

  countdown.await();
}

Nachdem zunächst ein SubmissionPublisher vom Typ String für den Versand der Textnachrichten erstellt wurde, wird der CountDownLatch mit der Anzahl der Subscriber initialisiert. Dieser CountDownLatch findet erst im letzten Schritt in Zeile 16 erneut Verwendung. Er soll sicherstellen, dass alle Subscriber alle Nachrichten verarbeiten konnten, bevor das Programm terminiert.

Nach der Initialisierung des countDownLatch werden zwei Objekte der eigenen Klasse TextSubscriber (siehe unten) erstellt, welche via der subscribe-Methode auf den Publisher subscriben (Zeile 8 ff).

Im nächsten Schritt (Zeile 12 ff) wird nun eine Liste von Nachrichten generiert, die über einen Aufruf der submit-Methode an den Publisher übergeben werden.

Nun zur Klasse TextSubscriber:

static class TextSubscriber implements Subscriber<String> {

  private Subscription subscription;

  public void onSubscribe(Subscription subscription) {
    this.subscription = subscription;
    System.out.printf("%d: On Subscribe %d\n", this.hashCode());
    subscription.request(1);
  }

  public void onNext(String message) {
    System.out.printf("%d: Received '%s'\n", this.hashCode(), message);
    this.subscription.request(1);
  }

  public void onError(Throwable throwable) {
    System.out.printf("%d: Error %s\n", this.hashCode(), throwable.getMessage());
  }

  public void onComplete() {
    System.out.printf("%d: Complete\n", this.hashCode());
    countdown.countDown();
  }
}

Die onSubscribe-Methode wird direkt aufgerufen, nachdem der Subscriber in Zeile 10 des oberen Code-Snippets auf den Publisher subscribt. Hierbei wird ein Subscription-Objekt erzeugt, welches dann an die onSubscribe-Methode des TextSubscriber weitergegeben wird. Dieses Subscription-Objekt wird benötigt, um neue Nachrichten vom Publisher anzufordern. Dies geschieht auch gleich in der letzten Zeile (Zeile 8) der onSubscribe-Methode.

Sobald eine neue Nachricht vom TextSubscriber empfangen wurde, wird diese an die onNext-Methode übergeben. Diese erwartet in unserem Fall ein Objekt vom Typ String, da unser TextSubscriber das Subscriber-Interface mit dem Typ String parametrisiert hat. In der onNext-Methode wird die Nachricht dann auf der Konsole ausgegeben (Zeile 12) und die nächste Nachricht angefordert (Zeile 13).

Wenn es während der Evaluation der onNext-Methode zu einer Exception kommt, wird die onError-Methode mit der Exception als Parameter aufgerufen.

Sobald alle Nachrichten verarbeitet wurden und es keine neuen mehr gibt (sprich, der Publisher wurde mit dem Aufruf der close-Methode beendet, siehe Zeile 14 des oberen Snippets), wird die onComplete-Methode aufgerufen. In unserem Fall dekrementiert diese den CountDownLatch. Nachdem beide TextSubscriber die Verarbeitung aller Nachrichten abgeschlossen haben, sollte der Countdown den Wert 0 erreichen. Damit blockiert der Aufruf countdown.await() in Zeile 16 des oberen Snippets nicht länger den Haupt-Thread. Das Programm terminiert.

Die Ausgabe des Programms kann mit jedem Aufruf variieren und sieht in etwa so aus:

1: On Subscribe
2: On Subscribe
2: Received 'I'
1: Received 'I'
2: Received 'just'
1: Received 'just'
2: Received 'came'
1: Received 'came'
2: Received 'to'
1: Received 'to'
1: Received 'say'
1: Received 'hello'
1: Complete
2: Received 'say'
2: Received 'hello'
2: Complete

Um die Lesbarkeit zu verbessern, wurden hier die Hashcodes der beiden TextSubscriber mit den Zahlen 1 und 2 ersetzt.

Wir sehen, dass zunächst die onSubscribe-Methoden beider TextSubscriber aufgerufen wurde. Danach wurden die Strings ‘I’, ‘just’, ‘came’ und ‘to’ verarbeitet. TextSubscriber 2 war dabei jedes Mal schneller. Anschließend hat TextSubscriber 1 die restlichen Wörter ‘say’ und ‘hello’ verarbeitet und anschließend die onComplete-Methode aufgerufen. TextSubscriber 2 macht kurz darauf das Gleiche.

Fazit

Mit Java 9 wurde nun also ein minimales Publish-Subscribe-Framework innerhalb des JDK eingeführt, an welchem sich die Entwickler eigener Pub-Sub-Frameworks nun orientieren können. Das Framework ist relativ trivial zu bedienen, lässt sich aber beliebig zu komplexeren Pub-Sub-Systemen erweitern. Positiv ist außerdem zu betrachten, dass die JDK-Entwickler gleich auf ein modernes back-pressure-basiertes System setzen, um unnötiges Puffern von vornherein zu verhindern.

Weiterführende Links

[JEP 266]
[Reactive Streams]
[Java World]
[Countdown Latch]

Short URL for this post: https://wp.me/p4nxik-321
This entry was posted in Java Basics and tagged , , , , . Bookmark the permalink.

Leave a Reply