Was ist der Unterschied zwischen RDD, DataFrame und Dataset in Apache Spark?

Apache Spark wurde entwickelt, um eine höhere Leistung im Hadoop-Ökosystem erreichen zu können. Im Vergleich zu Hadoop soll Spark dank In-Memory-Berechnungen und weiterer Optimierungen bei der Datenverarbeitung wesentlich schneller sein.

In diesem Blog will ich erklären, welche Datenstrukturen Spark mitbringt und wie sich diese voneinander unterscheiden. In Apache Spark verwendet man drei Arten von Datenstrukturen: RDD, DataFrames und Dataset.

Resilient Distributed Datasets

Die erste Version von Spark startete mit der RDD Datenstruktur. RDD steht für Resilient Distributed Dataset. RDD ist eine fehlertolerante und partitionierte Collection von Daten, deren Elemente parallel verarbeitet werden können.  So können verlorene oder kaputte Partitionen auf beschädigten Spark-Knoten wiederhergestellt werden. In den meisten Fällen können RDDs durch das Laden von Daten aus verteilten Datenspeichern (z. B. HDFS, Cassandra, HBase usw.) oder durch das Auslesen von Dateien, die auf dem lokalen System gespeichert sind erstellt werden. Außerdem ist es möglich, RDDs aus einer Collection (z. B. List, Array, etc. in Java) durch die parallelize() Methode zu erzeugen.

RDDs bieten vielfältige Funktionen bzw. Methoden (detaillierte Informationen dazu findet man in einem früheren Blog-Post).

map() iteriert über jede Zeile in einem RDD als Input und gibt ein neues RDD als Output durch Anwendung einer gewünschten Modifikation mit gleicher Anzahl der Zeilen wie im Input zurück. count() gibt die Anzahl der Elemente in einem RDD zurück. union() schließt zwei RDDs zusammen und gibt sie als ein neues RDD zurück.

Außerdem unterstützt RDD zwei Arten von Operationen:

  • Transformation – Funktionen wie map(), die ein oder mehrere neue RDDs aus dem existierenden RDD erzeugen.
  • Action – Funktionen wie count(), die an existierenden RDDs für bestimmte Eigenschaften angewendet werden.

Ein Beispiel dazu in Java:

List<String> words = Arrays.asList("RDD", "DataFrame", "Dataset");
JavaRDD<String> inputWords = sc.parallelize(words);
inputWords.foreach(word -> System.out.println("word: " + word));

Ausgabe:

word: RDD    
word: DataFrame
word: Dataset

Außerdem ist hier noch anzumerken, dass bei Transformationen eine lazy evaluation angewendet wird. Lazy evaluation in Spark bedeutet, dass die Transformationen an einem RDD erst dann ausgeführt werden, wenn eine Action an diesem RDD ausgelöst wurde. D.h. eine Transformation wird nicht sofort an der Stelle ausgeführt, sobald sie definiert wurde, sondern sie wird ausgeführt, wenn das Ergebnis dieser Transformation in einer Action aufgerufen wird. Das führt zu effektiven Berechnungen von Transformationen.

RDD wird für folgende Fällen benutzt:

  • Low-level Kontrolle von Daten: RDDs kann man als eine Collection von Java-Objekten berücksichtigen. So ist es möglich, mit oben genannten Methoden jedes Objekt bzw. jede Zeile zu manipulieren. 
  • Unstrukturierte Daten: Daten, die schemafrei sind, wie z.B. Textdateien, können durch die RDD API verarbeitet werden.
  • Legacy Code: Da RDD eine alte Datenstruktur in Spark ist, können alte Anwendungen weiterhin mit der RDD-Datenstruktur gepflegt werden.
  • Spark GraphX: GraphX API ist eine Erweiterung der RDD API, so dient RDD hier als Grundbaustein.

Nachteile von RDDs:

  • Keine fortschrittlichen Optimierer von Spark mit strukturierten Daten: Query-Optimierung und Catalyst-Optimierung (definiert unten in DataFrame).
  • Keine Ableitung des Schemas für geladene strukturierte Daten im Vergleich zu DataFrames und Dataset in Spark.

Spark DataFrames

Ein DataFrame in Apache Spark ist eine verteilte Collection von Daten. Spark DataFrame enthält zwei APIs, nämlich die Spark DataFrame API und die SparkSQL API. Das ermöglicht den Entwicklern, relationale SQL-Abfragen auch direkt auszuführen. So können die strukturierten und semi-strukturierten Daten mit Spark DataFrame verarbeitet werden. Ein DataFrame in Spark kann man sich wie eine Tabelle in einer relationalen Datenbank, jedoch mit reichhaltigeren Optimierungen vorstellen. Spark DataFrame besteht aus Spalten, deren Namen den leichten Zugriff ermöglichen. Daten aus einer Vielzahl von Datenspeichern (HDFS, JSON, Hive, Parquet…) können als DataFrame geladen bzw.  gespeichert werden.

Folgendes Beispiel zeigt, wie man DataFrames in Spark benutzt:

Dataset<Row> df = sparkSession
                  .read()
	          .format("json")
	          .option("inferSchema", "true")
	          .option("header", "true")
	          .json("src/main/resources/person.json");
	
df.printSchema();

Ausgabe:

df.show();

Ausgabe:

df.select("name").show();

Ausgabe:

Beim Ausführen von DataFrames wird automatisch ein Query-Optimierer von Spark durchgeführt. Der Query-Optimierer verbessert die Performance von Spark durch das Umschreiben von relationalen SQL-Abfragen, sodass Query-Abfragen mit einem optimierten Execution-Plan ausgeführt werden.

Außerdem werden DataFrames von dem Catalyst-Optimierer unterstützt. Der Catalyst-Optimierer ist ein Prozess, der automatisch den effizientesten Plan zur Ausführung der von Entwicklern definierten Datenoperationen ermittelt.

Ein Nachteil von DataFrame in Spark ist, dass die Daten keine Typisierung beim Kompilieren aufweisen. So tauchen Fehlermeldungen bezüglich der Typisierung erst zur Laufzeit auf. Außerdem ist die direkte Verarbeitung von unstrukturierten Daten nicht möglich.

Spark Dataset

2015 wurde dieses oben genannte Problem der Typsicherheit durch die Implementierung von Spark Dataset beseitigt. Spark Dataset ist eine Erweiterung der Spark DataFrame API, die Typsicherheit und eine Schnittstelle für objektorientierte Programme unterstützt. Somit bietet Spark Dataset Typsicherung direkt beim Kompilieren an. Spark Dataset benutzt einen speziellen Encoder für die Serialisierung von Objekten, die über das Netzwerk verarbeitet oder übertragen werden.

Folgendes Beispiel zeigt eine Anwendung von Spark Dataset, um die gleichen Ergebnisse des oben gezeigten Beispiels mit Spark DataFrame zu erhalten. Eine Klasse Person mit Feldern wie in person.json als Encoder wurde erstellt.

public class Person implements Serializable {

	private String name;
	private String language;
	private int age;

	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public String getLanguage() {
		return language;
	}
	public void setLanguage(String language) {
		this.language = language;
	}
	public int getAge() {
		return age;
	}
	public void setAge(int age) {
		this.age = age;
	}
   }

Spark Dataset soll hier typisiert werden, sonst wird ein Analyse-Fehler verursacht. Ein Encoder ist nötig.

Dataset<Person> ds = sparkSession.read()
	                         .format("json")
	                         .option("inferSchema", "true")
	                         .option("header", "true")
	                         .json("src/main/resources/person.json")
	                         .as(Encoders.bean(Person.class));
	 
ds.printSchema();

Ausgabe:

ds.show();

Ausgabe:

ds.select("name").show();

Ausgabe:

DataFrame und Dataset in Spark werden benutzt für:

  • strukturierte Daten
  • mehr Optimierung und bessere Leistung
  • Verfügbarkeit praktischer Interfaces

Tabelle 1 fasst noch einmal die oben genannten Eigenschaften von Datenstrukturen in Spark zusammen.

Tabelle 1: Eigenschaften der Datenstrukturen in Spark

Fazit

Es wurden die drei Datenstrukturen in Spark kurz beschrieben und verglichen. Dabei wurde versucht, den Unterschied zwischen diesen Datenstrukturen mit Beispielcode zu erklären. Je nach Anwendungstyp werden heutzutage meistens DataFrame und Dataset in Spark dank der Eigenschaft Query-Optimization auf High-Level-Ebene benutzt, während RDD für Low-Level-Zwecke und Spark GraphX-Anwendungen angewendet wird. Auf der Homepage von Spark kann man sich noch weiter ausführlich informieren.

Short URL for this post: https://wp.me/p4nxik-3Bm
This entry was posted in Java Runtimes - VM, Appserver & Cloud and tagged , , , . Bookmark the permalink.

Leave a Reply