Integrate Kafka Connect Sources & Sinks

In this getting-started tutorial, we’ll focus on how to integrate Kafka Connect sources and sinks into Logisland.

We call this functionality Logisland Connect.

Note

Be sure you know how to launch a logisland Docker environment by reading the prerequisites section.

1. Logisland job setup

For this tutorial, make sure you have already installed the Elasticsearch and Kafka Connect simulator modules.

If not, you can install them with the components.sh command-line tool:

bin/components.sh -i com.hurence.logisland:logisland-processor-elasticsearch:1.1.2

bin/components.sh -i com.hurence.logisland:logisland-service-elasticsearch_5_4_0-client:1.1.2

bin/components.sh -i com.github.jcustenborder.kafka.connect:kafka-connect-simulator:0.1.118

The logisland job for this tutorial is already packaged in the tar.gz assembly, and you can find it here:

docker exec -i -t logisland vim conf/logisland-kafka-connect.yml

We will start by explaining each part of the config file.

The engine

The first section configures the Spark engine (we will use a KafkaStreamProcessingEngine) to run in local mode.

engine:
  component: com.hurence.logisland.engine.spark.KafkaStreamProcessingEngine
  type: engine
  documentation: Use Kafka connectors with logisland
  configuration:
    spark.app.name: LogislandConnect
    spark.master: local[2]
    spark.driver.memory: 1G
    spark.driver.cores: 1
    spark.executor.memory: 2G
    spark.executor.instances: 4
    spark.executor.cores: 2
    spark.yarn.queue: default
    spark.yarn.maxAppAttempts: 4
    spark.yarn.am.attemptFailuresValidityInterval: 1h
    spark.yarn.max.executor.failures: 20
    spark.yarn.executor.failuresValidityInterval: 1h
    spark.task.maxFailures: 8
    spark.serializer: org.apache.spark.serializer.KryoSerializer
    spark.streaming.batchDuration: 1000
    spark.streaming.backpressure.enabled: false
    spark.streaming.unpersist: false
    spark.streaming.blockInterval: 500
    spark.streaming.kafka.maxRatePerPartition: 3000
    spark.streaming.timeout: -1
    spark.streaming.kafka.maxRetries: 3
    spark.streaming.ui.retainedBatches: 200
    spark.streaming.receiver.writeAheadLog.enable: false
    spark.ui.port: 4050
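
As a rough sanity check on the streaming settings above, the following sketch estimates the largest micro-batch the engine would read from a Kafka topic. It assumes that spark.streaming.batchDuration is expressed in milliseconds and that spark.streaming.kafka.maxRatePerPartition is a per-second, per-partition cap. The partition count of 4 is borrowed from kafka.topic.default.partitions in the sink configuration shown later in this tutorial, and the function name is hypothetical.

```python
def max_records_per_batch(batch_duration_ms: int,
                          max_rate_per_partition: int,
                          partitions: int) -> int:
    """Upper bound on the records read from Kafka in one micro-batch.

    Assumption: the rate limit is "records per second per partition"
    and the batch duration is given in milliseconds.
    """
    batch_seconds = batch_duration_ms / 1000
    return int(batch_seconds * max_rate_per_partition * partitions)


# Values copied from the engine configuration; 4 partitions is an assumption.
limit = max_records_per_batch(batch_duration_ms=1000,
                              max_rate_per_partition=3000,
                              partitions=4)
print(f"at most {limit} records per batch")
```

With a simulator source emitting about 100 messages/s, this cap should be far from being reached, so the limit mostly matters if you replace the source with a faster one.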

The controllerServiceConfigurations part defines all the services that will be shared by processors within the whole job.

The parsing stream

Here we are going to use a special controller service (KafkaConnectStructuredSourceProviderService) to use the Kafka Connect source as input for the structured stream defined below.

For this example, we are going to use the source com.github.jcustenborder.kafka.connect.simulator.SimulatorSourceConnector, which generates records containing fake personal data at a rate of 100 messages/s.

# Our source service
- controllerService: kc_source_service
  component: com.hurence.logisland.stream.spark.provider.KafkaConnectStructuredSourceProviderService
  documentation: A kafka source connector provider reading from its own source and providing structured streaming to the underlying layer
  configuration:
    # We will use the logisland record converter for both key and value
    kc.data.value.converter: com.hurence.logisland.connect.converter.LogIslandRecordConverter
    # Use kryo to serialize the inner data
    kc.data.value.converter.properties: |
      record.serializer=com.hurence.logisland.serializer.KryoSerializer

    kc.data.key.converter: com.hurence.logisland.connect.converter.LogIslandRecordConverter
    # Use kryo to serialize the inner data
    kc.data.key.converter.properties: |
      record.serializer=com.hurence.logisland.serializer.KryoSerializer
    # Only one task to handle source input (unique)
    kc.worker.tasks.max: 1
    # The kafka source connector to wrap (here we're using a simulator source)
    kc.connector.class: com.github.jcustenborder.kafka.connect.simulator.SimulatorSourceConnector
    # The properties for the connector (as per connector documentation)
    kc.connector.properties: |
      key.schema.fields=email
      topic=simulator
      value.schema.fields=email,firstName,middleName,lastName,telephoneNumber,dateOfBirth
    # We are using a standalone source for testing. We can store processed offsets in memory
    kc.connector.offset.backing.store: memory

Note

The kc.connector.properties parameter contains the connector properties exactly as you would define them with vanilla Kafka Connect.

We are also using an in-memory offset backing store. In a distributed scenario, you would probably choose a Kafka-topic-based one instead.
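
To make the shape of kc.connector.properties more concrete, here is a minimal sketch that reads the multi-line block above as key=value pairs. It handles only a small subset of the Java properties format (no line continuations or escapes), the helper name parse_properties is hypothetical, and it is not how Logisland itself loads the value.

```python
def parse_properties(text: str) -> dict[str, str]:
    """Parse simple key=value lines, skipping blanks and comments."""
    props: dict[str, str] = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith(("#", "!")):
            continue  # blank line or properties-style comment
        key, sep, value = line.partition("=")
        if not sep:
            raise ValueError(f"not a key=value line: {line!r}")
        props[key.strip()] = value.strip()
    return props


# Copied from the kc.connector.properties block of the source service.
CONNECTOR_PROPERTIES = """\
key.schema.fields=email
topic=simulator
value.schema.fields=email,firstName,middleName,lastName,telephoneNumber,dateOfBirth
"""

props = parse_properties(CONNECTOR_PROPERTIES)
print(props["topic"])
print(props["value.schema.fields"].split(","))
```

The value schema lists six fields, which are the same six fields that show up in the console output at the end of this tutorial.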

Since each stream can be both read from and written to, we also define a Kafka topic sink (KafkaStructuredStreamProviderService) that will be used as output for the structured stream defined below.

# Kafka sink configuration
- controllerService: kafka_out_service
  component: com.hurence.logisland.stream.spark.structured.provider.KafkaStructuredStreamProviderService
  configuration:
    kafka.output.topics: logisland_raw
    kafka.error.topics: logisland_errors
    kafka.input.topics.serializer: com.hurence.logisland.serializer.KryoSerializer
    kafka.output.topics.serializer: com.hurence.logisland.serializer.KryoSerializer
    kafka.error.topics.serializer: com.hurence.logisland.serializer.JsonSerializer
    kafka.metadata.broker.list: sandbox:9092
    kafka.zookeeper.quorum: sandbox:2181
    kafka.topic.autoCreate: true
    kafka.topic.default.partitions: 4
    kafka.topic.default.replicationFactor: 1

We can now define the parsing stream using this source and sink:

######### parsing stream ##############
- stream: parsing_stream_source
  component: com.hurence.logisland.stream.spark.structured.StructuredStream
  documentation: "Takes records from the kafka source and distributes related partitions over a kafka topic. Records are then handed off to the indexing stream"
  configuration:
    read.topics: /a/in
    read.topics.serializer: com.hurence.logisland.serializer.KryoSerializer
    read.topics.key.serializer: com.hurence.logisland.serializer.KryoSerializer
    read.stream.service.provider: kc_source_service
    write.topics: logisland_raw
    write.topics.serializer: com.hurence.logisland.serializer.KryoSerializer
    write.topics.key.serializer: com.hurence.logisland.serializer.KryoSerializer
    write.stream.service.provider: kafka_out_service
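
The stream refers to its source and sink only by name, so a typo in read.stream.service.provider or write.stream.service.provider is easy to make. The sketch below shows one way to cross-check those names before launching the job. The dictionaries are hand-written copies of the relevant YAML keys, the function undefined_providers is hypothetical, and Logisland may well perform its own validation at startup.

```python
PROVIDER_KEYS = ("read.stream.service.provider",
                 "write.stream.service.provider")


def undefined_providers(services: list[dict], streams: list[dict]) -> list[str]:
    """Return one message per stream provider that no controller service defines."""
    defined = {service["controllerService"] for service in services}
    problems = []
    for stream in streams:
        conf = stream.get("configuration", {})
        for key in PROVIDER_KEYS:
            provider = conf.get(key)
            if provider is not None and provider not in defined:
                problems.append(f"{stream['stream']}: {key} -> {provider}")
    return problems


# Hand-copied from the configuration shown in this tutorial.
services = [
    {"controllerService": "kc_source_service"},
    {"controllerService": "kafka_out_service"},
]
streams = [{
    "stream": "parsing_stream_source",
    "configuration": {
        "read.stream.service.provider": "kc_source_service",
        "write.stream.service.provider": "kafka_out_service",
    },
}]

print(undefined_providers(services, streams))
```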

Within this stream, a FlatMap processor extracts the key and value (required when a StructuredStream is used as the source of records).

processorConfigurations:
 - processor: flatten
   component: com.hurence.logisland.processor.FlatMap
   type: processor
   documentation: "Takes out data from record_value"
   configuration:
     keep.root.record: false
     copy.root.record.fields: true

The indexing stream

Inside this engine, you will run a Kafka processing stream, so we set up the input/output topics and the Kafka/Zookeeper hosts. Here the stream reads all the records sent to the logisland_raw topic and pushes the processing output into the logisland_events topic.

Note

We want to specify an Avro output schema to validate our output records (and force their types accordingly). This lets other streams rely on a schema when processing records from a topic.

We can define some serializers to marshal all records from and to a topic.

- stream: parsing_stream_source
  component: com.hurence.logisland.stream.spark.structured.StructuredStream
  documentation: "Takes records from the kafka source and distributes related partitions over a kafka topic. Records are then handed off to the indexing stream"
  configuration:
    read.topics: /a/in
    read.topics.serializer: com.hurence.logisland.serializer.KryoSerializer
    read.topics.key.serializer: com.hurence.logisland.serializer.KryoSerializer
    read.stream.service.provider: kc_source_service
    write.topics: logisland_raw
    write.topics.serializer: com.hurence.logisland.serializer.KryoSerializer
    write.topics.key.serializer: com.hurence.logisland.serializer.KryoSerializer
    write.stream.service.provider: kafka_out_service

Within this stream, a DebugStream processor simply prints each record it receives, serialized as JSON.

processorConfigurations:
 # We just print the received records (but you may do something more interesting!)
 - processor: stream_debugger
   component: com.hurence.logisland.processor.DebugStream
   type: processor
   documentation: debug records
   configuration:
     event.serializer: json

This stream processes records as soon as they are queued into the logisland_raw Kafka topic; each record is printed to the console and pushed back to Kafka in the logisland_events topic.

2. Launch the script

Connect a shell to your logisland container to launch the following streaming jobs.

docker exec -i -t logisland bin/logisland.sh --conf conf/logisland-kafka-connect.yml

3. Examine your console output

Since we added a DebugStream processor, the messages produced by our source connector are output to the console as JSON.

18/04/06 11:17:06 INFO DebugStream: {
 "id" : "9b17a9ac-97c4-44ef-9168-d298e8c53d42",
 "type" : "kafka_connect",
 "creationDate" : 1523006216376,
 "fields" : {
   "record_id" : "9b17a9ac-97c4-44ef-9168-d298e8c53d42",
   "firstName" : "London",
   "lastName" : "Marks",
   "telephoneNumber" : "005-694-4540",
   "record_key" : {
     "email" : "londonmarks@fake.com"
   },
   "middleName" : "Anna",
   "dateOfBirth" : 836179200000,
   "record_time" : 1523006216376,
   "record_type" : "kafka_connect",
   "email" : "londonmarks@fake.com"
 }
}
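
If you want to post-process this console output, the following sketch shows one way to strip the log prefix and decode the JSON body. The sample is an abbreviated copy of the record above, the helper names are hypothetical, and reading dateOfBirth as epoch milliseconds in UTC is an assumption based on the magnitude of the value.

```python
import json
from datetime import date, datetime, timezone

# Abbreviated copy of the console output shown above.
LOG_OUTPUT = """18/04/06 11:17:06 INFO DebugStream: {
 "id" : "9b17a9ac-97c4-44ef-9168-d298e8c53d42",
 "type" : "kafka_connect",
 "fields" : {
   "firstName" : "London",
   "lastName" : "Marks",
   "dateOfBirth" : 836179200000,
   "email" : "londonmarks@fake.com"
 }
}"""


def parse_debug_record(text: str) -> dict:
    """Drop everything up to the logger name and decode the JSON body."""
    _, _, body = text.partition("DebugStream: ")
    return json.loads(body)


def epoch_millis_to_date(millis: int) -> date:
    """Interpret a value as milliseconds since the Unix epoch, in UTC."""
    return datetime.fromtimestamp(millis / 1000, tz=timezone.utc).date()


record = parse_debug_record(LOG_OUTPUT)
birth = epoch_millis_to_date(record["fields"]["dateOfBirth"])
print(record["fields"]["email"], birth.isoformat())
```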

4. Monitor your spark jobs and Kafka topics

Now go to http://sandbox:4050/streaming/ to see how fast Spark can process your data.

../_images/spark-job-monitoring.png

Another tool can help you tweak and monitor your processing: http://sandbox:9000/

../_images/kafka-mgr.png