Apache Spark (Teil 2/2)

Dies ist der zweite Artikel einer zweiteiligen Serie zu Apache Spark. Während der erste Artikel die allgemeine Funktionsweise von Apache Spark erklärt, legt dieser Artikel den Fokus auf die praktische Anwendung anhand einiger Code-Beispiele. Die Beispiele beziehen sich ausschließlich auf den Spark-Core und nicht auf Erweiterungen, wie MLib, GraphX oder Sonstige.

Setup

Bevor wir beginnen können, benötigen wir zunächst einmal noch Spark an sich. Das können wir von der offiziellen Website herunterladen und installieren. Eine detaillierte Anleitung ist hier zu finden.

Zusätzlich benötigen wir natürlich noch das SDK für den Spark-Core. Dieses lässt sich beispielsweise mit Maven einbinden:

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.0.1</version>
</dependency>

Alternativ via Gradle:

compile group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.0.1'

Natürlich kann man das Jar-File auch manuell zum Classpath hinzufügen. Es ist hier zu finden.

Es kann losgehen

Nun können wir anfangen. Im ersten Beispiel haben wir eine Textdatei (test.txt), welche einen englischen Artikel aus Wikipedia zu Apache Spark enthält. Wir möchten die Wörter im Artikel zählen lassen. Dies funktioniert wie folgt:

private long countWords(String filename) {
   // Create a configuration to interact with Spark
   SparkConf conf = new SparkConf().setMaster("local")
     .setAppName("WordCounter");

   // Create a Spark Context from the configuration
   JavaSparkContext sc = new JavaSparkContext(conf);

   // Load the Wikipedia article
   JavaRDD<String> input = sc.textFile(filename);

   // trim and split the input string into words
   JavaRDD<String> words = input.flatMap(s -> Arrays.asList(
      s.trim().split(" ")).iterator());

   // count the entries in the RDD containing the words
   long cnt = words.count();

   // close the spark context
   sc.close();

   return cnt;
}

Wir haben also unsere Methode countWords, welche die Anzahl der Wörter als long zurückgibt. Als einzigen Parameter erhält sie den Dateinamen und Pfad filename.

Zunächst wird ein Objekt vom Typ SparkConf erstellt (Zeile 3). Dieses Objekt repräsentiert die Konfiguration für die Sparkanwendung. Da wir unsere Sparkanwendung auf einem lokalen Thread laufen lassen möchten, übergeben wir für setMaster den String “local”. Alternativ könnten wir auch den String “local[n]” übergeben, wobei n als Integer für die Anzahl der Threads steht. Außerdem kann eine URL, wie beispielsweise “spark://master:7077” übergeben werde. Damit wird Spark auf einem standalone cluster ausführt. Schließlich setzen wir den Namen der Anwendung mittels setAppName auf “WordCounter” (Zeile 4).

Im nächsten Schritt erstellen wir aus dem SparkConf-Objekt ein JavaSparkContext-Objekt (Zeile 7). Dieser JavaSparkContext ist die Version vom org.apache.spark.SparkContext-Objekt speziell für Java. Es gibt Objekte vom Typ JavaRDD zurück und funktioniert mit Java Collections. Diese JavaRDDs sind eine zu Java kompatible Form der Resilient Distributed Datasets (RDD) in Spark. Da es immer nur einen JavaSparkContext pro JVM geben kann, sollte der JavaSparkContext nach Abschluss der Operationen mit der stop()-Methode oder der close()-Methode beendet werden (Zeile 20).

Nun lesen wir die Datei aus filename in ein JavaRDD ein (Zeile 10). Das JavaRDD ist die für Java angepasste Version des Apache Spark RDDs. Das Einlesen funktioniert mit einem einfachen Methodenaufruf der Methode textFile(filename) auf dem JavaSparkContext-Objekt sc. Im resultierenden JavaRDD input ist nun für jede Textzeile in der Inputdatei ein String-Objekt enthalten, welches die jeweilige Zeile des Wikipedia-Artikels enthält.

Im 4. Schritt teilen wir die einzelnen Zeilen in die einzelnen Wörter auf und speichern diese in der neuen JavaRDD words (Zeile 13 und 14). Dazu lassen wir die Daten in ein neues RDD in Form einer Flatmap transformieren, wofür wir wiederum input.flatMap(…) aufrufen. Diese Methode erstellt das RDD für uns und übergibt dabei jeden einzelnen Werte aus dem Input-RDD (das mit den Zeilen) an eine FlatMapFunction. Der Rückgabewert dieser Funktion muss vom Typ Iterator sein. Diese FlatMapFunction haben wir hier durch einen entsprechenden Lambda-Ausdruck (Zeile 13 und 14) ersetzt. s beschreibt dabei den Inputstring für die Funktion. Wir rufen also die trim()-Operation auf den String aus und splitten ihn danach bei den Leerzeichen. Aus dem resultierenden Stringarray erstellen wir eine Liste, welche einen Iterator besitzt. Dieser Iterator wird zurückgegeben und die enthaltenen Werte werden in das neue RDD namens words vom Typ String geschrieben. RDDs sind immutable, daher können wir das alte input-RDD nicht verändern.

Im letzten Schritt zählen wir die Anzahl der Einträge im RDD words mittels der count()-Methode (Zeile 17). Erst an dieser Stelle wird der gesamte Verarbeitungsprozess gestartet. Eingelesen wurde die Datei schon. Nun wird also jede Textzeile bei den Leerzeichen gesplittet und in die Map eingefügt. Mehr zum detaillierten Ablauf in Beispiel 3. Schließlich wird die Map in ein JavaRDD verpackt. Die Anzahl der Einträge in diesem RDD werden gezählt.

Zum Schluss schließen wir den SparkContext (Zeile 20) und geben die Anzahl zurück (Zeile 22). Das Resultat ist heute (25.10.2016) 1650 Wörter.

Im nächsten Beispiel möchten wir die Zeichen im gleichen Artikel zählen lassen. Hier möchten wir allerdings nicht die Gesamtzahl der Zeichen wissen. Die Anzahl der Zeichen soll stattdessen aufgeschlüsselt nach Zeichen sein. Anschließend sollen die 10 Zeichen ausgeben werden, welche am häufigsten verwendet wurden. Zusätzlich soll eine Liste mit allen Zeichen-Anzahl-Tupeln gespeichert werden. Hier der dazugehörige Quellcode:

private void charCount(String filename) {
   // Create a configuration to interact with Spark
   SparkConf conf = new SparkConf().setMaster("local")
      .setAppName("Charcounter");

   // Create a Spark Context from the configuration
   JavaSparkContext sc = new JavaSparkContext(conf);

   // Load the Wikipedia article
   JavaRDD input = sc.textFile(filename);

   // split the input string into chars
   JavaRDD chars = input.flatMap(s -> Arrays.asList(
      s.split("")).iterator());

   // transform the collection of chars into pairs
   // (char, 1) and then count them
   JavaPairRDD counts = chars.mapToPair(t -> new 
      Tuple2(t, 1)).reduceByKey(
         (x, y) -> (int) x + (int) y);

   counts = counts.mapToPair(item -> item.swap()).sortByKey(
      false, 1).mapToPair(item -> item.swap());

   //Take the top 10 elements from RDD
   List<Tuple2> list = counts.take(10);
		
   //Output the top 10 elements
   for (Tuple2 t : list) {
	System.out.printf("'%s' - %d\n", t._1(), t._2());
   }

   // Save the char counts back out to a file.
   counts.saveAsTextFile("output");

   // close the spark context
   sc.close();
}

Die ersten 14 Zeilen sind weitestgehend identisch zu denen aus dem oberen Beispiel. Der JavaSparkContext wird erstellt, die Inputdatei wird eingelesen und gesplittet. Hier allerdings nicht bei den Leerzeichen, sondern mit einem Leerstring. Das führt dazu, dass der Dateiinhalt zeilenweise bei jedem Zeichen gesplittet wird. Das Resultat wird wieder in eine Liste konvertiert und dieses Mal in das RDD chars gespeichert (Zeile 13 und 14).

Nun transformieren wir unser String-RDD chars in ein JavaPairRDD. Das JavaPairRDD repräsentiert, wie der Name schon sagt, ein RDD, welches Zweiertupel enthält. In diesem Fall einen String als Schlüssel und einen Integer als Wert. Das PairRDD wird erstellt, indem wir auf unser chars-RDD die Methode mapToPair() aufrufen und dann für jeden enthaltenen char ein neues Tupel erzeugen. Dieses neue Tupel hat jeweils den enthaltenen char (konvertiert als String) als Schlüssel und 1 als Wert (Zeile 18 und 19). Warum wir für den Wert 1 wählen, wird später ersichtlich. Anschließen wird die Methode reduceByKey(…) auf das resultierende RDD aufgerufen. Diese Methode gruppiert alle enthaltenen Tupel nach ihren Keys und führt die übergeben Funktion auf alle Dubletten aus. Diese ihr übergebene Funktion erhält als Parameter die beiden Werte von den beiden Tupeln mit gleichem Key. Da unsere Values vom Typ Integer sind, erwartet sie als Rückgabetyp in unserem Fall ebenfalls einen Integer. Auch diese Funktion ersetzen wir durch einen Lambda-Ausdruck. Innerhalb dieser Funktion werden die beiden übergebenen Values x und y aufsummiert (Zeile 20). Damit haben wir auch den Grund, warum wir im vorherigen Schritt jeweils eine 1 als Value in die Tupel geschrieben haben. Das resultierende RDD wird in der Variablen counts gespeichert.

Für unsere Top-10-Liste müssen wir das RDD jetzt noch nach Values sortieren. Da wir nur nach Schlüssel sortieren können, vertauschen wir nun Schlüssel und Wert mit Hilfe der swap()-Methode auf dem jeweiligen Tupel (Zeile 22). Danach sortieren wir das entstandene RDD nach Schlüssel via der sortByKey()-Methode. Als Parameter übergeben wir, ob auf- oder absteigend sortiert werden soll (false -> absteigend), und wie viele Partitionen es geben soll (eine). Schließlich vertauschen wir Schlüssel und Wert erneut und schreiben das Ergebnis zurück in die counts-Variable (Zeile 22 und 23). Danach nehmen wir uns die ersten 10 Tupel aus dem RDD (Zeile 26). Da wir das RDD soeben sortiert haben, wissen wir, dass es sich dabei um die 10 Tupel mit dem größten Wert handeln muss. Der Rückgabewert der dafür verwendeten take()-Methode ist in unserem Fall ein Objekt vom Typ List<Tuple2>. Diese Tupelliste iterieren wir nun und können mit t._1() und t._2() jeweils den Key und den Value für jedes Tupel erhalten und ausgeben (Zeile 29 – 31).

Schließlich speichern wir unsere Tupel in unserem RDD noch (Zeile 34) und schließen den SparkContext (Zeile 37). In diesem Beispiel speichern wir das Ergebnis der Einfachheit halber nun in eine einfache Textdatei auf unser Dateisystem. Natürlich könnten wir es auch in eine Textdatei auf einem HDFS oder als Hadoop-File ablegen.

Als Resultat erhalten wir in unserem Dateisystem einen Ordner namens “output”, in dem sich eine _SUCCESS-Datei und eine part-000000-Datei befindet. In der part-Datei sind nun unsere Tupel sortiert nach dem Value zu finden. Hier einen Auszug:

( ,1638)
(e,837)
(a,807)
(t,654)
(r,546)
(i,543)
(o,513)
(n,499)

Wir stellen fest, dass das Leerzeichen mit 1638 Verwendungen am häufigsten verwendet wurde. Darauf folgen e mit 837 und a mit 807 Verwendungen.

Im letzten Beispiel analysieren wir ein beispielhaftes Apache Access-Log. Dabei wird insbesondere erklärt, wann welche Operationen ausgeführt werden, da dies nicht ganz so trivial ist, wie es zunächst scheint. Zum Parsing verwenden wir diesen Parser von Databricks mit einigen Anpassungen für Hostnames und spezielle Statuscodes (z.B. 304 und 408). Wir möchten aus dem Log alle http-Responsecodes zählen, gruppieren, ausgeben und anschließend speichern. Hier der Quellcode:

private void httpResponseCodeCounter(String filename) {
   // Create a configuration to interact with Spark
   SparkConf conf = new SparkConf().setMaster("local").
      setAppName("HTTPCodeCounter");

   // Create a Spark Context from the configuration
   JavaSparkContext sc = new JavaSparkContext(conf);

   // Load the input file
    JavaRDD inputLines = sc.textFile(filename);

   // parse each line to a log-object &amp; remove null-values
   JavaRDD log = inputLines.map(s ->
      ApacheAccessLog.parseFromLogLine(s)).filter(t -> t != null);

   // transform the collection of log objects to tuples with 
   // (HTTP-Code, 1) and count them
   JavaPairRDD counts = log.mapToPair(t -> 
      new Tuple2(t.getResponseCode(), 1))
         .reduceByKey((x, y) -> x + y);

   counts = counts.mapToPair(item -> item.swap()).sortByKey(
      false, 1).mapToPair(item -> item.swap());

   // print all
   counts.foreach(t -> System.out.printf("(%s,%d)\n", t._1(), t._2()));

   // Save the char counts back out to a text file, causing evaluation.
   counts.saveAsTextFile("output");

   // close the spark context
   sc.close();
}

Wir erstellen wieder unseren SparkContext (Zeilen 3-7) und lesen das Logfile ein (Zeile 10). Anschließend wird jede Zeile im Log in ein Objekt vom Typ ApacheAccessLog konvertiert (Zeilen 13 und 14). Wenn es zu einem Konvertierungsfehler kommt, ist das Objekt null und wird herausgefiltert (Zeile 14). Wenn nicht, wird das Objekt an den nächsten Transformationsschritt weitergegeben, in welchem die Objekte mit gleichem Responsecode gruppiert und gezählt werden (Zeile 18-20).

Wenn man anstelle der Lambdas Methoden mit Ausgabestatements verwendet, fällt auf, dass nicht für das gesamte Input-RDD auf einmal eine Transaktion nach der anderen ausgeführt wird. Hier die Ausgabe:

(…)
Parsing line to log object for 414157042
transform to pair (x,1) for 414157042
Parsing line to log object for 41861785
transform to pair (x,1) for 41861785
Parsing line to log object for 344769431
transform to pair (x,1) for 344769431
(…)
Swapping 404 5
Swapping 401 123
Swapping 408 1
Swapping 200 1274
Swapping 302 6
Swapping 304 137
(…)
Sorting…
(…)
Swapping 1274 200
Print: (200,1274)
Swapping 137 304
Print: (304,137)
Swapping 123 401
Print: (401,123)
Swapping 6 302
Print: (302,6)
Swapping 5 404
Print: (404,5)
Swapping 1 408
Print: (408,1)
(…)
Saving…
(…)
Done.

Vielmehr wird der erste Eintrag aus dem RDD genommen, in ein ApacheAccessLog-Objekt konvertiert, auf null geprüft, mit dem http-Responsecode gemapt und gruppiert (1. Block). Danach werden Key und Value vertauscht (2. Block). Anschließend werden die Tupel sortiert. Danach wird jedes Tupel zurück vertauscht und danach ausgegeben (Block 3). Schließlich werden die Tupel gespeichert und der Prozess ist abgeschlossen. Somit hält sich der Speicherverbrauch in Grenzen, da es nicht tausende von Log-Objekten gleichzeitig gibt. Außerdem lässt sich der gesamte Vorgang so auch relativ einfach durch Aufteilung des Input-RDD parallelisieren.

Die Ausgabe, sowie der Inhalt der Textdatei im Output-Ordner sieht wie folgt aus:

(200,1274)
(304,137)
(401,123)
(302,6)
(404,5)
(408,1)

Fazit

Wir haben viel darüber gelernt, wie man in Spark große Datenmengen transformieren kann. Außerdem wissen wir nun, wie Spark die Daten verarbeitet. Anstatt dass einfach Zeile für Zeile im Programmcode abgearbeitet wird, wird jedes Objekt in einer RDD intelligent durch so viele Transformationsebenen gestreamt, bis es an einer Ebene angelangt, für die die Resultate der Transformationen aller Objekte im RDD benötigt werden (beispielsweise eine Aggregatfunktion, wie count() im ersten Beispiel). Durch dieses Verhalten wird der Speicherverbrauch reduziert und die Parallelisierung stark vereinfacht.

Short URL for this post: http://wp.me/p4nxik-2KK
This entry was posted in Open Source BI and tagged , , , , . Bookmark the permalink.

Leave a Reply