Aktorenmodell und Akka – Teil 2/3

Dies ist der zweite Teil einer dreiteiligen Artikelserie zum Aktorenmodell und Akka. Während im ersten Teil eine grundlegende Einführung in das Aktorenmodell gegeben wurde, behandelt dieser zweite Artikel die Einführung in Akka allgemein, sowie einige Codebeispiele mit Akka in Java. Im dritten Artikel wird es um das Clustering mit Akka gehen.

Akka

Akka ist ein Opensource Toolkit für die JVM und stellt eine Implementierung des Aktorenmodells dar. Es ist komplett in Scala geschrieben und ein Versuch, Features aus der Programmiersprache Erlang in die JVM-Umgebung zu bringen. Seit Scala 2.10 ist Akka auch in der Scala-Standardbibliothek enthalten. Außerdem ist Akka Teil der reaktiven Lightbend Plattform. Natürlich kann man mit Akka eine verteilte Anwendung nach dem Aktorenmodell mit begrenztem Aufwand auf mehrere Maschinen skalieren.

Ein interessanter Fakt ist, dass das kürzlich auf diesem Blog vorgestellte Apache Spark auf Akka basiert.

Akka in der Praxis

Wer verwendet Akka in der Praxis? Hier zwei Beispiele.

Intels GearPump Streaming-Engine basiert auf Akka. Die Entwickler beschreiben GearPump als eine von Akka inspirierte, sehr leichtgewichtige Big-Data-Streaming Engine. Inzwischen ist das Projekt bei Apache gehostest und hier zu finden.

UniCredit verwendet Akka zusammen mit Apache Spark, um die Streams sämtlicher entstehenden Unternehmensdaten direkt zu verarbeiten. In ihrem Fall ermöglichen die Akka-Aktoren eine bessere Verteilung, als mit einer Microservice-Architektur laut ihren Angaben möglich wäre. Mittelfristig soll jedoch vollständig auf Apache Spark umgestiegen werden.

Primzahlen mit Akka

Zunächst eine App, welche die Grundzüge von Akka veranschaulichen wird. Im ersten Schritt soll diese lokal auf unserer Maschine laufen. Die App repräsentiert ein verteiltes System zur Berechnung von Primzahlen.

Die grundlegende Architektur unserer Primzahlenanwendung mithilfe von Akka ist in dieser Abbildung (“Akka Primzahlenberechnung – Architektur”) erkennbar.

Akka Primzahlberechnung - Architektur

In der Hauptanwendung wird das Intervall festgelegt, auf dem die Primzahlen berechnet werden. Dieses Intervall wird als SegmentMessage verpackt und an den PrimeMaster geschickt. Die SegmentMessage ist eine serialisierbare Klasse, welche als Attribute die beiden 64-Bit-Integer Werte “start” und “stop” enthält. Der PrimeMaster teilt nun das Intervall in kleinere Subintervalle auf. Diese Subintervalle werden ebenfalls in Objekte vom Type SegmentMessage verpackt und an mehrere Instanzen vom Typ PrimeWorker verschickt. Die PrimeWorker berechnen nach dem Erhalt des ihnen zugeteilten Segments alle Primzahlen in diesem Segment und schicken die Antwort in Form einer Liste mit den Primzahlen als PrimeResult an den PrimeMaster zurück. Nachdem der PrimeMaster alle Resultate erhalten hat, sortiert er die gesamte Liste und persistiert sie in eine Datei.

Der Unterschied zur üblichen objektorientierten Programmierung ist, dass wir anstatt der Methodenaufrufe zwischen den Objekten (z.B. “PrimeWorker.getPrimes(long start, long end)”) eine Nachricht mit den Parametern an das entsprechende Objekt schicken. Das Ergebnis wird nicht als Rückgabewert einer Methode zurückgegeben, sondern ebenfalls als Nachricht versandt. Dadurch wird eine noch höhere Kapselung erreicht. Außerdem sind Methodenaufrufe normalerweise synchron, während das Messaging in Akka komplett asynchron abläuft.

Nun zur Implementierung. Zunächst beginnen wir mit der Einbindung des SDKs mit Maven:

<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-actor_2.11</artifactId>
    <version>2.4.14</version>
</dependency>

Gradle:

compile group: 'com.typesafe.akka', name: 'akka-actor_2.11', version: '2.4.14'

Beim Quellcode beginnen wir mit der PrimeApp-Klasse, welche das Akka System lokal initialisiert und den Haupteinstiegspunkt für unsere Java Anwendung enthält. Da diese Klasse ein Aktor ist, erbt sie von der abstrakten Klasse “UntypedActor”. Da unsere Klasse keine Nachrichten empfangen soll, bleibt die onReceive(…)-Methode leer. Außerdem enthält die Klasse noch eine main-Methode mit folgendem Inhalt:

//create the actor system and startup the PrimeMaster actor
ActorSystem actorSystem = ActorSystem.create("system");
ActorRef primeMaster = actorSystem.actorOf(
   Props.create(PrimeMaster.class), "master");
		
//send the PrimeMaster actor the segment of numbers
primeMaster.tell(new SegmentMessage(0, 100), actorSystem.actorOf(
   Props.create(Props.create(PrimeApp.class),"app"));

Als erstes wird das Aktorensystem mit “ActorSystem.create(…)” initialisiert. Der übergebene String-Parameter identifiziert dabei das zu erstellende Aktorensystem eindeutig. Anschließend wird ein neuer Aktor von der Klasse PrimeMaster erstellt. Zuletzt wird mittels “primeMaster.tell(…)” das zu untersuchende Intervall in Form einer SegmentMessage mit dem Startwert 0 und dem Endwert 100 übermittelt.

Als nächstes erstellen wir die Klasse “PrimeMaster”, welche die verteilte Primzahlberechnung koordiniert und später das Ergebnis einsammelt.

public class PrimeMaster extends UntypedActor {
	private static final int WORKER_COUNT = 12;
	private final List<Long> primeResults = new ArrayList<>();
	private int resultCount = 0;

	/*…*/
}

Diese Klasse ist ein Aktor, folglich erbt auch sie von “UntypedActor”. Außerdem wird die Anzahl der Worker hier auf 12 festgelegt und es wird eine Liste, sowie eine Zählvariable zum späteren Einsammeln der Resultate initialisiert.

UntypedActor gibt dabei allerdings die Methode “onReceive(Object message)” vor. Diese Methode wird beim Eintreffen einer Nachricht synchron aufgerufen. Es wird also nicht vorkommen, dass mehrere Nachrichten gleichzeitig abgearbeitet werden und der Code in der Methode mehrmals simultan läuft. Außerdem kann eine Exception geworfen werden, welche dann an den Sender weitergeleitet wird.

Die Klasse PrimeMaster soll später zwei verschiedene Nachrichtentypen empfangen:

  1. Eine SegmentMessage von der PrimeApp mit dem Gesamtberechnungsintervall.
  2. Jeweils eine PrimeResult Nachricht von jedem PrimeWorker mit dem Resultat der Primzahlberechnung aus dem jeweiligen Subintervall.

Hier der erste Teil der Methode:

@Override
public void onReceive(Object msg) throws Exception {

  //message from PrimeApp received
  if (msg instanceof SegmentMessage) {

    // partition input domain
    SegmentMessage sm = (SegmentMessage) msg;
    int segLength = (int) ((sm.getEnd() - sm.getStart()) / WORKER_COUNT);

    for (int cnt = 0; cnt < WORKER_COUNT; cnt++) {
      long rightBound = sm.getEnd();

      if (cnt != WORKER_COUNT - 1) {
        rightBound = sm.getStart() + (cnt + 1) * segLength;
      }

      // send partition to a new worker
      getContext().actorOf(Props.create(PrimeWorker.class))
       .tell(new SegmentMessage(sm.getStart() + cnt * segLength,
         rightBound), getSelf());
    }
  }
}

Wenn die Nachricht also vom Typ SegmentMessage ist, wird die Inputdomain (welche als SegmentMessage empfangen wurde) in gleich große Intervalle partitioniert. Für jedes Intervall wird eine SegmentMessage gepackt. Anschließend wird ein PrimeWorker erstellt und die SegmentMessage wird an diesen weiterleitet.

Damit auch die Nachrichten vom Typ PrimeResult verarbeitet werden, muss dieser Codeblock noch nach dem “if” oben in die on-Receive-Methode der PrimeMaster-Klasse eingefügt werden:

// Message from PrimeWorker received
else if (msg instanceof PrimeResult) {
	primeResults.addAll(((PrimeResult) msg).getResults());

	// check, if all subsets were received
	if (++resultCount >= WORKER_COUNT) {
		Collections.sort(primeResults);

		final PrintWriter p = new PrintWriter("./output.txt");
		p.println("Size: " + primeResults.size());
		primeResults.forEach(prime -> p.print(prime + ", "));
		p.close();

		System.out.println("Done.");
		this.getContext().system().terminate();
	}
}

Wenn es sich bei der empfangenen Nachricht also um ein PrimeResult handelt, werden die berechneten Primzahlen in die bereits bestehende Liste hinzugefügt und der Ergebniszähler (“resultCounter”) erhöht. Wenn genauso viele Ergebnisse eingetroffen sind, wie PrimeWorker gestartet wurden, wird die Liste mit allen errechneten Primzahlen sortiert und zusammen mit der Listengröße in die Datei “output.txt” geschrieben. Anschließend wird das Aktoren System mit “this.getContext().system().terminate()” beendet. Da in unserem Fall die Kommunikation über TCP läuft, können keine Nachrichten auf dem Weg verloren gehen. Der Fall, dass einer der PrimeWorker während der Berechnung abstürzt und sein Ergebnis nicht absendet, wird hier ausgeklammert.

Schließlich noch die PrimeWorker-Klasse, welche ebenfalls einen Aktor repräsentiert:

public class PrimeWorker extends UntypedActor {
  @Override
  public void onReceive(Object msg) throws Exception {
    // receive partition from PrimeMaster
    if (msg instanceof SegmentMessage) {
      List<Long> primes = new ArrayList<>();
      for (long i = ((SegmentMessage) msg).getStart(); i < 
            ((SegmentMessage) msg).getEnd(); i++) {

        // add to result, if element is a prime number
        if (isPrime(i)) {
          primes.add(i);
        }
      }
      // send resulting subset of primes to PrimeMaster
      this.getSender().tell(new PrimeResult(primes), 
        this.getSelf());
    }
  }
  /** @return whether n is a prime number*/
  private boolean isPrime(long n) {
     /**…*/
  }
}

Sobald der PrimeWorker eine SegmentMessage erhält, beginnt er, alle Zahlen im Intervall darauf zu prüfen, ob sie eine Primzahl sind und fügt die Primzahlen der Liste “primes” hinzu. Die Primzahlprüfung selbst wurde in diesem Codebeispiel aus Platzgründen ausgelassen. Wenn alle Zahlen im Intervall geprüft sind, wird die Liste mit den Primzahlen als PrimeResult an den Sender (PrimeMaster über PrimeRouter) geschickt. Der Sender wird mit “this.getSender()” erhalten. Außerdem wird als Absender der PrimeResult-Nachricht noch die aktuelle Instanz des PrimeWorkers eingestellt. Hier kann nicht etwa “this” übergeben werden, sondern “this.getSelf()”, welche die assoziierte Aktorenreferenz vom Typ “ActorRef” enthält.

Als Resultat erhalten wir alle 25 Primzahlen zwischen 0 und 100 in unserer Ausgabedatei. Diese lauten:

2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97

Nein, 1 ist keine Primzahl.

Damit haben wir also unsere Primzahlberechnung auf 12 Worker verteilt. Momentan laufen diese alle auf der gleichen Maschine, und dieses Resultat hätten wir auch mit einfachen Threadpools ohne Akka realisieren können. Daher werden wir im nächsten Blogartikel zeigen, wie diese 12 Worker auf mehrere Maschinen verteilt werden können.

Short URL for this post: https://wp.me/p4nxik-2O4
This entry was posted in Java Runtimes - VM, Appserver & Cloud and tagged , , , , . Bookmark the permalink.

Leave a Reply