Aktorenmodell und Akka – Teil 3/3

Nachdem es im ersten Artikel dieser dreiteiligen Serie eine Einführung in das Aktorenmodell gab und der zweite Artikel eine genauere Einführung in Akka beinhaltet hat, geht es in diesem dritten Artikel um die Verwendung von Akka in Rechnerclustern.

Akka im Cluster

Hierfür werden die folgenden Dependencies für die Akka-Aktoren und für die Clusterfunktionalität benötigt:

<dependency>
	<groupId>com.typesafe.akka</groupId>
	<artifactId>akka-cluster_2.11</artifactId>
	<version>2.5.3</version>
</dependency>

<dependency>
	<groupId>com.typesafe.akka</groupId>
	<artifactId>akka-contrib_2.11</artifactId>
	<version>2.5.3</version>
	<exclusions>
		<exclusion>
			<groupId>com.typesafe.akka</groupId>
			<artifactId>akka-persistence-experimental_2.11
			  </artifactId>
		</exclusion>
	</exclusions>
</dependency>

<dependency>
	<groupId>com.typesafe.akka</groupId>
	<artifactId>akka-actor_2.11</artifactId>
	<version>2.5.3</version>
</dependency>

Nun soll die Primzahlberechnung aus dem letzten Beispiel auf ein Cluster mit mehreren Nodes auf unterschiedlichen JVMs verteilt werden. Einer der Nodes soll einen PrimeMaster erhalten, während die anderen Nodes jeweils zwei PrimeWorker erhalten. Wenn es einen PrimeMaster im Cluster gibt, können beliebig viele PrimeWorker hinzugefügt werden und sofort an der Primzahlberechnung teilnehmen. Die Aufgabe des PrimeMaster ist dabei, die verteilte Primzahlberechnung zu managen, in regelmäßigen Abständen neue Zahlensegmente (Zahlenintervalle) zu generieren und den PrimeWorkern bereitzustellen. Sobald der PrimeWorker bereit ist, kann er beim PrimeMaster ein neues Zahlenintervall aus der Liste der generierten Zahlensegmente anfordern. Nachdem er dieses erhalten hat, berechnet er alle Primzahlen auf dem Intervall und sendet das Ergebnis an den PrimeMaster zurück. Sofern noch weitere Segmente zur Verfügung stehen, erhält er sofort ein neues Segment. Ansonsten ist der PrimeWorker inaktiv, bis der PrimeMaster neue Segmente generiert hat und die PrimeWorker benachrichtigt.

Der Code ist teilweise angelehnt an diesen Blogpost [1] und diesen Code [2].

Konfiguration

Bevor wir mit der Erstellung dieser Klassen anfangen, hier zunächst die application.conf-Datei:

akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "hostname_node"
      port = 1337
    }
  }
  cluster {
    seed-nodes = [
      "akka.tcp://AkkaExampleSystem@hostname_seed_node:1337"
    ]
  }
  scheduler {
    tick-duration = 33ms
  }
  extensions = [
    "akka.cluster.pubsub.DistributedPubSub"
  ]
  log-dead-letters = 0
}

Es ist darauf zu achten, dass als Actor-Provider in Zeile 3 der ClusterActorRefProvider eingestellt ist. Dieser lädt die Cluster Extensions, wodurch das Cluster automatisch gestartet wird, sobald der ClusterActorRefProvider verwendet wird. Des Weiteren sollte als remote-hostname ein korrekter Hostname oder eine IP eingestellt werden. Als Port wird 1337 angegeben. Wichtig ist außerdem die korrekte Konfiguration der Seed Nodes (Zeile 14). In diesem Beispiel gibt es nur einen Seed Node, der ebenfalls unter Port 1337 erreichbar ist. Der Seed Node fungiert als Entrypoint für neue Nodes, die dem Cluster beitreten wollen. Wenn keiner der Seed-Nodes erreichbar ist, kann niemand dem Cluster ohne weiteres beitreten.

Für das Messaging über einen MessageBus wird außerdem die Extension DistributedPubSub (Zeile 21) verwendet.

Die Hauptklasse PrimeClusterApp

Nachdem nun die Anwendung grundlegend konfiguriert ist, folgt ein Auszug aus der Main-Methode der PrimeClusterApp-Klasse, welche den Haupteinstiegspunkt der Anwendung repräsentiert.

// Create an Akka system
Config config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" +
  port).withFallback(ConfigFactory.load());
actorSystem = ActorSystem.create("AkkaExampleSystem", config);
		
if(createMaster){ 
  //Create Master
  actorSystem.actorOf(ClusterSingletonManager.props(Props.
    create(PrimeClusterMasterActor.class), PoisonPill.getInstance(),
    ClusterSingletonManagerSettings.create(actorSystem)), "master");
}

//Create RandomPool of 2 Workers
actorSystem.actorOf(new RandomPool(2).props(Props.create(
  PrimeClusterWorkerActor.class)), "W_" + UUID.randomUUID());

Zunächst wird der in der application.conf-Datei eingestellte Port (1337) in Zeile 2 überschrieben. Dies ist erforderlich, sobald mehrere Nodes auf der gleichen Maschine gestartet werden sollen. Anschließend wird in Zeile 4 das Aktorensystem gestartet und verbindet sich automatisch über den Seed Node aus der application.conf mit dem Cluster.

Sofern ein Master-Aktor erstellt werden soll, wird in Zeile 8 eine Instanz des PrimeClusterMasterActors mit dem Namen „master“ erstellt. Damit nicht versehentlich mehrere Master-Instanzen erstellt werden, wird der ClusterSingletonManager verwendet.

Anschließend wird in Zeile 14 ein Worker-Pool mit 2 Aktoren vom Typ PrimeClusterWorkerActor in das Aktorensystem eingefügt. Diese Worker erhalten als Namen eine zufällige UUID mit dem Präfix „W_“.

Der clusterfähige PrimeMaster-Aktor

Weiter geht es mit der PrimeClusterMasterActor-Klasse. Diese Klasse soll die Zahlensegmente zur Primzahlberechnung generieren, an die Worker verteilen und die Ergebnisse wieder aggregieren. Hier werden zunächst einige Variablen deklariert:

private final ActorRef mediator =
  DistributedPubSub.get(getContext().system()).mediator();

private List<SegmentMessage> availableSegments = new ArrayList<>();

private final List<Long> primeResults = new ArrayList<>();
private int resultCount = 0, lastIntervalStart = 0;
private static final int INTERVAL_SIZE = 5000000; 
private static final int SEGMENT_COUNT = 500;

Zunächst wird in Zeile 1 ein Mediator-Aktor erstellt. Dieser referenziert den in der application.conf als Extension eingebundenen MessageBus. Anschließend wird eine Liste availableSegments mit den aktuell verfügbaren vorbereiteten Segment-Nachrichten erstellt. Eine Segmentnachricht enthält einen Startwert und einen Endwert und wird vom PrimeMaster an die PrimeWorker geschickt. Der PrimeWorker berechnet daraufhin alle Primzahlen im Intervall zwischen dem Startwert und dem Endwert.

Dazu gibt es noch eine Liste primeResults mit den bereits berechneten Primzahlen.

Schließlich gibt es einen Zähler für die bereits eingetroffenen Resultate (resultCount) und eine Variable zum Sichern des letzten Intervallstarts. Als Konstanten gibt es die Intervallbreite und die Segmentanzahl, die pro Tick generiert werden soll.

Nun zur createReceive()-Methode, welche die PrimeClusterMasterActor-Klasse von der Oberklasse AbstractActor erbt. Die Methode sieht wie folgt aus:

@Override
public Receive createReceive() {
  return receiveBuilder().matchEquals(MSG_PRIMES_GENERATED, msg -> {
      log.info("[Master] Scheduled wake-up!");
      mediator.tell(new DistributedPubSubMediator.Publish(
        TOPIC_WORKERS, MSG_WORK_AVAILABLE), self());
      schedulePrimeCalculation();
    }).matchEquals(MSG_GIVE_WORK, msg -> {
      // give worker a segment if available
      if (!availableSegments.isEmpty()) {
        sender().tell(availableSegments.remove(0), self());
      }
    }).match(PrimeResult.class, msg -> {
      handlePrimeResultReceived(msg);
      // give worker another segment if available
      if (!availableSegments.isEmpty()) {
        sender().tell(availableSegments.remove(0), self());
      }
      getContext().unwatch(sender());
    }).match(Terminated.class, msg -> 
      log.info("Active worker crashed: " + msg.getActor())
    ).build();
}

Zurückgegeben wird ein Objekt vom Typ Receive, welches mit Hilfe des Receive-Builders zusammengebaut wurde. Dabei gibt es drei grundlegende Fälle.

  1. Die empfangene Nachricht ist eine MSG_PRIMES_GENERATED String-Nachricht. Diese Nachricht wird von der Methode schedulePrimeCaluclation() (weiter unten) erzeugt, sobald neue Zahlensegmente generiert wurden. Beim Empfang dieser Nachricht in Zeile 5 wird über den auf Klassenebene erstellten MessageBus die Nachricht MSG_WORK_AVAILABLE versendet. Darauf antworten die PrimeWorker mit einer MSG_GIVE_WORK, welche dann in Zeile 8 verarbeitet wird. Nach Versand der MSG_WORK_AVAILABLE-Nachricht wird noch die Methode schedulePrimeCalculation() aufgerufen, die später in diesem Artikel beschrieben ist.
  2. Die empfangene Nachricht ist eine MSG_GIVE_WORK String-Nachricht (Zeile 8). Diese wird vom PrimeWorker zurück an den PrimeMaster geschickt, sobald der PrimeWorker bereit ist, neue Primzahlen zu berechnen. Sofern noch Zahlensegmente verfügbar sind (Zeile 10), wird das erste dieser Segmente an den anfragenden PrimeWorker zurückgeschickt.
  3. Die empfangene Nachricht enthält ein Berechnungsergebnis in Form der Klasse PrimeResult (Zeile 13) von einem PrimeWorker. Dieses Ergebnis wird zur Weiterverarbeitung an die Methode handlePrimeResultReceived(…) gegeben. Sofern noch ein Zahlensegment übrig ist, wird dieses direkt an den PrimeWorker zur Berechnung zurückgeschickt.
  4. Einer der Worker ist abgestürzt oder einer der Clusterknoten ist abgestürzt und es wurde eine Nachricht der Klasse Terminated empfangen (Zeile 20).
private List<SegmentMessage> createSegments() {
  //…
  return segments;
}

Die Methode createSegments() erstellt basierend auf dem Wert der Variable lastIntervalStart bei jedem Aufruf 500 Segmente mit einer Intervallbreite von 5.000.000. Das heißt, jedes der Segmente hat eine Segmentbreite von 10.000. Eine Liste mit den 500 generierten Segmenten wird zurückgegeben. Die genaue Implementierung ist relativ trivial und wird aus Platzgründen nicht aufgeführt.

Die Methode schedulePrimeCalculation() wird einmal am Anfang und dann alle 2 Sekunden aufgerufen. Sie sieht wie folgt aus:

private void schedulePrimeCalculation() {
  if (availableSegments.size() > 0) {
    System.err.print("Remaining segments: " + availableSegments.size()
      + " - No new segments were generated!\n");
  } else {
    availableSegments.addAll(createSegments());
  }
  context().system().scheduler().scheduleOnce(Duration.create(2000,
    TimeUnit.MILLISECONDS), self(), MSG_PRIMES_GENERATED,
    context().dispatcher(), null);
}

Zunächst wird geprüft, ob noch Segmente in der availableSegment-Liste verfügbar sind (Zeile 2). Sofern die Liste abgearbeitet ist, wird die createSegments()-Methode aufgerufen und die 500 neuen Segment-Nachrichten in die Liste eingefügt (Zeile 6). Anschließend terminiert die Methode den Versand der Nachricht MSG_PRIMES_GENERATED in 2 Sekunden, was einen erneuten Aufruf der schedulePrimeCalculation()-Methode zur Folge haben wird.

Als letzte Methode im PrimeMaster fehlt noch die handlePrimeResultReceived(…)-Methode, welche die empfangenen PrimeResults verarbeitet. Auch diese ist aus Platzgründen hier nicht weiter ausgeführt. Die handlePrimeResultReceived(…)-Methode fügt die Ergebnisse der primeResults-Liste hinzu. Anschließend wird geprüft, ob bereits alle Nachrichten aus dem aktuellen Segment eingetroffen sind. Sofern dies der Fall ist, wird die Primzahlliste sortiert und die momentan höchste Primzahl, sowie die aktuelle Anzahl der Primzahlen ausgegeben.

Der clusterfähige PrimeWorker-Aktor

Die PrimeWorker-Klasse beginnt wie folgt:

private final ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator();

@Override
public void preStart() {
  mediator.tell(new DistributedPubSubMediator.Subscribe(TOPIC_WORKERS, self()), self());
}

@Override
public void postStop() {
  mediator.tell(new DistributedPubSubMediator.Unsubscribe(TOPIC_WORKERS, self()), self());
}

Zunächst wird auch hier eine Referenz auf das MessageBus-System geholt (Zeile 1). Außerdem gibt es die beiden Methoden preStart() und postStop(), welche den PrimeWorker-Aktor beim Start auf das Worker-Topic registrieren und beim Beenden wieder deregistrieren.

Nun zur createReceive()-Methode.

@Override
public Receive createReceive() {
  return receiveBuilder().matchEquals(MSG_WORK_AVAILABLE, msg -> {
    sender().tell(MSG_GIVE_WORK, self());
  }).match(SegmentMessage.class, msg -> {
    // receive partition from PrimeMaster
    List<Long> primes = new ArrayList<>();
    for (long i = ((SegmentMessage) msg).getStart(); i < ((SegmentMessage)
      msg).getEnd(); i++) {
      // add to result, if element is a prime number
      if (isPrime(i)) {
        primes.add(i);
      }
    }
    // send resulting subset of primes to PrimeMaster
    // primes.forEach(p -> System.out.println(p));
    this.getSender().tell(new PrimeResult(primes), this.getSelf());
  }).match(DistributedPubSubMediator.SubscribeAck.class, msg ->
    log.info("Subscribed to 'workers'!")
  ).build();
}

Wie im Code zu sehen ist, kann diese Methode drei Nachrichtentypen empfangen.

  1. MSG_WORK_AVAILABLE (Zeile 3). Diese Nachricht wird vom PrimeMaster über den MessageBus versendet, sobald neue Zahlensegmente verfügbar sind. Der Worker antwortet mit einer MSG_GIVE_WORK-Nachricht.
  2. SegmentMessage (Zeile 5). Diese Nachricht wurde vom PrimeMaster als Antwort auf die MSG_GIVE_WORK erhalten. Für alle Zahlen im empfangenen Intervall wird geprüft, ob sie Primzahlen sind. Die Methode isPrime(…) wird hier aus Platzgründen nicht weiter ausgeführt. Das Ergebnis der Primzahlprüfung des Intervalls wird in ein PrimeResult verpackt und zurück an den PrimeMaster geschickt.
  3. SubscribeAck (Zeile 18). Diese Nachricht wird als Antwort auf die Registrierung in der preStart()-Methode weiter oben empfangen und bestätigt die Registrierung auf das Topic.

Nun haben wir also ein beliebig erweiterbares System, das verteilt Primzahlen berechnen kann. Knoten zur Berechnung der Primzahlen können dynamisch hinzugefügt und entfernt werden. Nur der Knoten, welcher den PrimeMaster-Aktor enthält, kann nicht migriert werden. Die Implementierung der Migration geht allerdings über den Rahmen dieses Artikels hinaus.

Der Ansatz des dezentralen entkoppelten Systems, dessen Aktoren ausschließlich über Nachrichten kommunizieren, scheint zumindest in unseren lokalen Tests zu Performanceengpässen beim Nachrichtenversand zu führen. Dies ist insbesondere dann relevant, wenn viele Daten zwischen den Aktoren hin- und hergeschickt werden sollen. Während die Aktoren innerhalb einer JVM noch sehr schnell und performant kommunizieren konnten, gab es deutliche Engpässe bei der Verwendung mehrerer JVMs und der Kommunikation im Cluster, sogar bereits auf nur einer physischen Maschine.

Des Weiteren ist aufgefallen, dass die Standardgröße der Nachrichten für den Nachrichtenversand bei 128kB liegt. Dieser Fakt hat zu einigen akka.remote.OversizedPayloadExceptions geführt, lässt sich aber mit einer Anpassung der application.conf-Datei unter akka.remote beheben [4].

Bei der Erstellung der Blogartikelserie ist außerdem aufgefallen, dass insbesondere die Verwendung von Akka im Cluster, sowie die zugehörige Konfiguration und Anpassung der Anwendung nicht zu unterschätzen ist. Um ein grundlegendes Verständnis des Aufbaus von Netzwerken und Rechenclustern mit Akka zu erhalten, sollte ausreichend Zeit und Geduld eingeplant werden. Die Dokumentation [3] ist zwar umfangreich, jedoch häufig nicht leicht verständlich. Des Weiteren gibt es mit fast jedem größerem Update Breaking Changes an der API, wodurch ältere Tutorials und bestehender Beispielcode nur begrenzt ist [5].

Quellen

[1] Derek Wyatt. 2009. Balancing Workload Across Nodes with Akka 2. Balancing Workload Across Nodes with Akka 2. Retrieved from http://letitcrash.com/post/29044669086/balancing-workload-across-nodes-with-akka-2
[2] eivindw. 2014. Akka Cluster Example Github. Akka Cluster Example Github. Retrieved from https://github.com/eivindw/akka-cluster-example
[3] Lightbend. 2017. Akka Documentation – Networking. Akka Documentation – Networking. Retrieved from http://doc.akka.io/docs/akka/2.5.2/java/index-network.html
[4] Saeed Zarinfam. 2016. Akka OversizedPayloadException. Akka OversizedPayloadException. Retrieved from https://stackoverflow.com/questions/36685326/max-allowed-size-128000-bytes-actual-size-of-encoded-class-scala-error-in-akk
[5] Terse Systems. 2015. Akka Clustering, Step by Step (Scala). Akka Clustering, Step by Step (Scala). Retrieved from https://tersesystems.com/2014/06/25/akka-clustering/

Short URL for this post: http://wp.me/p4nxik-2Qz
This entry was posted in Java Runtimes - VM, Appserver & Cloud and tagged , , , , , . Bookmark the permalink.

Leave a Reply