Apache Kafka in Java

Kafka ist eine schlanke Message Queue, deren Kernfeatures, im Vergleich zu etwa Apache ActiveMQ, besonders stark auf Skalierbarkeit, Performance und Fehlertoleranz ausgerichtet sind.

Nachdem in den letzten Artikeln zu Apache Kafka (1,2) bereits die Grundlagen und das grundlegende Setup einer Apache Kafka Instanz mit ZooKeeper auf der Konsole erläutert wurden, soll es hier um die Verwendung von Apache Kafka in Java gehen.

Zunächst wird die Kafka Dependency per Maven eingebunden:

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka_2.11</artifactId>
	<version>0.11.0.1</version>
</dependency>

Alternativ mit Gradle:

compile group: 'org.apache.kafka', name: 'kafka_2.11', version: '0.11.0.1'

Textnachrichten verschicken

Beginnen wir mit einem einfachen Beispiel, bei dem Textnachrichten als Strings via Apache Kafka versandt und empfangen werden sollen.

Zunächst der TextProducer:

public void sendHelloWorld(){
  String topicName = "test";
  String message = "Hello World!";

  // create instance for properties to access producer configs
  Properties props = new Properties();

  // kafka server
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.177:9092");

  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

  Producer<String, String> producer = new KafkaProducer<>(props);

  ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
      topicName, "" + message.hashCode(), message);
  producer.send(producerRecord);
  producer.close();
}

Zunächst werden in den Zeilen 2 und 3 das Topic sowie die zu versendende Nachricht festgelegt. Anschließend wird in Zeile 6 ein Properties-Objekt initialisiert, welches in unserem Fall neben der Adresse des Kafkaservers (Zeile 9) nur Verweise auf einen StringSerializer für den Key und einen für den Value beinhaltet (Zeilen 11 und 12).

Im nächsten Schritt wird ein Producer erstellt. Da als Nachrichten immer Key-Value-Tupel gesendet werden, enthält dieser dementsprechend als Key-Typ und als Value-Typ Strings. Über den Key wird im Übrigen die Partition innerhalb des Topics ermittelt, während der Value den Eintrag selbst repräsentiert. Wenn kein Key angegeben wird, wird die Partition über Round Robin ausgewählt.

Anschließend wird ein ProducerRecord erstellt, welches eben dieses Key-Value-Tupel repräsentiert. Dieses Tupel wird mit dem Topic “test”, einem Key, welcher aus dem Hashcode der Nachricht als String besteht und dem Value “Hello World” initialisiert (Zeile 14).

Anschließend wird das Tupel über den oben erstellten Producer versendet (Zeile 16 bis 18) und der Producer wird beendet (Zeile 19).

Wenn wir nun in unsere Konsole mit dem Kafka-Consumer (siehe letzter Artikel) gehen, sehen wir die Textnachricht “Hello World” erscheinen.

Nun zum TextConsumer:

public void setupConsumeBlocking(String topicName) {
  // create instance for properties to access producer configs
  Properties props = new Properties();

  // kafka server
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.177:9092");

  // id of the consumer group
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
		"org.apache.kafka.common.serialization.StringDeserializer");
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
		"org.apache.kafka.common.serialization.StringDeserializer");

  // create consumer
  Consumer<String, String> consumer = new KafkaConsumer<>(props);
  // subscribe to topic
  consumer.subscribe(Arrays.asList(topicName));

  // endless polling, because push-model is not supported in Kafka
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    records.forEach(r -> System.out.println(r.value()));
  }
}

Hier wird der Name des zu konsumierenden Topics als Parameter topicName direkt an die Methode übergeben.

Zunächst wird wieder ein Property-Objekt initialisiert (Zeile 3) und mit den notwendigen Daten ausgestattet (Zeilen 6 ff.). Dabei muss hier eine Consumergroup-Id festgelegt werden, welche diesen Consumer einer Consumergroup zuordnet (siehe Grundlagen Artikel). Des Weiteren muss statt einem Key- und Value-Serializer hier ein Deserializer verwendet werden, da Nachrichten empfangen und nicht gesendet werden sollen.

Anschließend wird ein Consumer auf Basis dieser Properties erstellt (vgl. Zeile 16), welcher dann das als Parameter übergebene Topic abonniert (Zeile 18).

Anschließend wird in einer Endlosschleife gepollt, ob neue Nachrichten empfangen wurden (Zeile 22). Diese werden dann auf der Konsole ausgegeben (Zeile 23). Da Kafka kein Push-Modell unterstützt, kann dieses Polling leider nicht durch ein eventbasiertes System ersetzt werden.

Wenn dieser TextConsumer nun gestartet wird, beginnt er, alle Nachrichten, die ab dem Startzeitpunkt auf das Topic “test” gepublisht werden, auszugeben.

Objekte verschicken

Im nächsten Schritt möchten wir Objekte vom Typ SaleDTO an ein Kafka Topic schicken und von diesem empfangen. Die dazu erstellte Klasse SaleDTO ist ein Domainobjekt und erhält als Attribute eine ProductId als Integer, einen Produktnamen sowie einen Kundennamen als Stringobjekte und einen Preis als Double-Precision-Wert.

Die Sale-Objekte sollen hier als JSON-String an das Topic “sales” geschickt werden. Alternativ könnte man die Objekte auch direkt mit dem Java-Objekt-Serializer serialisieren. Dabei würde allerdings die Interoperabilität mit nicht-Java-Umgebungen verloren gehen, welche den Einsatz von Business-Intelligence-Tools unnötig erschweren würde.

Die Methode sendExampleSale() des SaleProducers sieht wie folgt aus:

public void sendExampleSale() {
 String topicName = "sales";
  
 //init test domain object
  SaleDTO sale = new SaleDTO();
  sale.setCustomerName("Steffen Jacobs");
  sale.setPrice((int) (Math.random() * 100 + .5) + .99);
  sale.setProductId((int) (Math.random() * 1000 + .5));
  sale.setProductName("Some Product Name");

  // create instance for properties to access producer configs
  Properties props = new Properties();

  // kafka server
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.177:9092");

  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer", "de.oio.kafka.sales.SalesSerializer");

  Producer<String, SaleDTO> producer = new KafkaProducer<>(props);

  ProducerRecord<String, SaleDTO> producerRecord = new ProducerRecord<>(topicName, sale);
  producer.send(producerRecord);
  producer.close();
}

Die Nachrichten sollen dieses Mal an das Topic “sales” verschickt werden (Zeile 2). Nachdem ein SaleDTO-Objekt generiert wurde (Zeilen 5-9) wird wieder ein Property-Objekt erstellt. Als ValueSerializer wird hier statt dem bereits bekannten StringSerializer ein eigener Serializer verwendet (Zeile 18). Anschließend wird ein Producer erstellt, der nun statt Strings SaleDTOs zurückgibt (Zeile 20). Danach folgt die Erstellung und der Versand eines ProducerRecords mit dem oben erstellten sale-Objekt (22 ff.). Hier wird auf einen expliziten Key verzichtet, weshalb die Nachrichten dann per Round Robin an die Partitionen verteilt werden.

Nun zum SalesSerializer. Diese Klasse implementiert das Serializer-Interface von Kafka mit dem Typen SaleDTO:

@Override
public byte[] serialize(String topic, SaleDTO data) {
  try {
    if (data == null) {
      return null;
    }
    else {
      Gson gson = new Gson();
      return gson.toJson(data).getBytes(encoding);
    }
  } catch (UnsupportedEncodingException e) {
	            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
  }
}

Die Klasse ist ähnlich des String-Serializers (org.apache.kafka.common.serialization.StringSerializer) aufgebaut, nur die serialize(…)-Methode wurde abgeändert und ist oben zu sehen. Hier wird die SaleDTO via Gson in ein JSON-Objekt im Stringformat konvertiert, welches dann wiederum als Byte-Array zurückgegeben wird.

Wenn der Producer nun ausgeführt wird, sehen wir im Konsolenfenster mit dem Kafka-Consumer die JSON-Objekte ankommen, etwa

{"productId":298,"customerName":"Steffen Jacobs","productName":"Some Product Name","Price":65.99}
{"productId":636,"customerName":"Steffen Jacobs","productName":"Some Product Name","Price":90.99}
{"productId":807,"customerName":"Steffen Jacobs","productName":"Some Product Name","Price":26.99}
{"productId":383,"customerName":"Steffen Jacobs","productName":"Some Product Name","Price":78.99}

Natürlich lassen sich diese JSON-Nachrichten auch in Java empfangen und wieder in ein SaleDTO zurückkonvertieren. Hier also der Sales-Consumer:

public void setupConsumeBlocking(String topicName) {
  // create instance for properties to access producer configs
  Properties props = new Properties();

  // kafka server
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.177:9092");

  // id of the consumer group
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "sales-group");

  props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
  		"org.apache.kafka.common.serialization.StringDeserializer");
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
		  "de.oio.kafka.sales.SalesDeserializer");

  // create consumer
  Consumer<String, SaleDTO> consumer = new KafkaConsumer<>(props);

  // subscribe to topic
  consumer.subscribe(Arrays.asList(topicName));

  // endless polling, because push-model is not supported in Kafka
  while (true) {
  	ConsumerRecords<String, SaleDTO> records = consumer.poll(100);
  	records.forEach(r -> System.out.println("Received Sale: " + r.value().toString()));
  }
}

Dieser ist ähnlich dem TextConsumer von oben aufgebaut. Nur wird statt dem StringDeserializer für den value hier der eigene SalesDeserializer verwendet, welcher aus dem Json-String mittels Gson wieder ein Objekt vom Typ SaleDTO herausparst. Dieses Objekt wird dann auf der Konsole ausgegeben.

Fazit

Es fällt auf, dass sich mit vergleichsweise wenig Boilerplatecode die Kernfunktionalität von Kafka bereits nutzen lässt. Das Framework ist leicht zu erlernen und bietet mit begrenztem Zeitaufwand bereits respektable Resultate. Auch die Modularisierung, beispielsweise über die selbst erstellbaren De-/Serialisierungsklassen reduziert die Komplexität. Gleichzeitig bietet Kafka aber eine umfangreiche Konfigurierbarkeit, wodurch auch komplexere Prozesse abgebildet werden können.

Short URL for this post: https://wp.me/p4nxik-2UC
This entry was posted in Java Runtimes - VM, Appserver & Cloud and tagged , , , , . Bookmark the permalink.

Leave a Reply