Generate Unique Ids

We will add a stage to the “index-apache-logs” tutorial. We will ensure every Record has a unique Id before injecting into Es. This way we are sure to not have documentAlreadyException or to have two records that overwrite themselves.

Note

If you are not familiar with logisland yet. You should really read “index-apache-logs” tutorial before this one.

We assume we are at the stage just before injecting apache logs into ES from “index-apache-logs”

Stream 1 : parse incoming apache log lines

Inside this engine you will run a Kafka stream of processing, so we setup input/output topics and Kafka/Zookeeper hosts. Here the stream will read all the logs sent in logisland_raw topic and push the processing output into logisland_events topic.

Note

We want to specify an Avro output schema to validate our ouput records (and force their types accordingly). It’s really for other streams to rely on a schema when processing records from a topic.

We can define some serializers to marshall all records from and to a topic.

# parsing
- stream: parsing_stream
  component: com.hurence.logisland.stream.spark.KafkaRecordStreamParallelProcessing
  type: stream
  documentation: a processor that links
  configuration:
    kafka.input.topics: logisland_raw
    kafka.output.topics: logisland_events
    kafka.error.topics: logisland_errors
    kafka.input.topics.serializer: none
    kafka.output.topics.serializer: com.hurence.logisland.serializer.KryoSerializer
    kafka.error.topics.serializer: com.hurence.logisland.serializer.JsonSerializer
    avro.output.schema: >
      {  "version":1,
         "type": "record",
         "name": "com.hurence.logisland.record.apache_log",
         "fields": [
           { "name": "record_errors",   "type": [ {"type": "array", "items": "string"},"null"] },
           { "name": "record_raw_key", "type": ["string","null"] },
           { "name": "record_raw_value", "type": ["string","null"] },
           { "name": "record_id",   "type": ["string"] },
           { "name": "record_time", "type": ["long"] },
           { "name": "record_type", "type": ["string"] },
           { "name": "src_ip",      "type": ["string","null"] },
           { "name": "http_method", "type": ["string","null"] },
           { "name": "bytes_out",   "type": ["long","null"] },
           { "name": "http_query",  "type": ["string","null"] },
           { "name": "http_version","type": ["string","null"] },
           { "name": "http_status", "type": ["string","null"] },
           { "name": "identd",      "type": ["string","null"] },
           { "name": "user",        "type": ["string","null"] }    ]}
    kafka.metadata.broker.list: sandbox:9092
    kafka.zookeeper.quorum: sandbox:2181
    kafka.topic.autoCreate: true
    kafka.topic.default.partitions: 4
    kafka.topic.default.replicationFactor: 1
  processorConfigurations:

Within this stream a SplitText processor takes a log line as a String and computes a Record as a sequence of fields.

# parse apache logs
- processor: apache_parser
  component: com.hurence.logisland.processor.SplitText
  type: parser
  documentation: a parser that produce events from an apache log REGEX
  configuration:
    value.regex: (\S+)\s+(\S+)\s+(\S+)\s+\[([\w:\/]+\s[+\-]\d{4})\]\s+"(\S+)\s+(\S+)\s*(\S*)"\s+(\S+)\s+(\S+)
    value.fields: src_ip,identd,user,record_time,http_method,http_query,http_version,http_status,bytes_out

Within this stream a ModifyId processor takes Record ouput from SplitText processor and computes a new Id for them using the value of their field “record_raw_value” that should content the original line string of the apache log. It will hash it using “SHA-256” java implementation algorithm, using the charset “UTF-8”.

# parse apache logs - processor: apache_parser

component: com.hurence.logisland.processor.ModifyId type: parser documentation: a parser that modify record Ids configuration:

id.generation.strategy: hashFields hash.charset: UTF-8 fields.to.hash: record_raw_value hash.algorithm: SHA-256

This stream will process log entries as soon as they will be queued into logisland_raw Kafka topics, each log will be parsed as an event which will be pushed back to Kafka in the logisland_events topic.

Then you can process to your indexation in Elasticsearch as in “index-apache-logs” example.