Generate Unique Ids
In this tutorial we add a stage to the “index-apache-logs” tutorial. The stage ensures every Record has a unique id before it is injected into Elasticsearch (ES). This prevents document-already-exists exceptions and ensures that two distinct records never overwrite each other.
Note
If you are not yet familiar with LogIsland, read the “index-apache-logs” tutorial before this one.
We assume you have reached the step of “index-apache-logs” just before the Apache logs are injected into ES.
Stream 1: parse incoming Apache log lines
Inside this engine you run a Kafka processing stream, so we set up the input/output topics and the Kafka/Zookeeper hosts.
Here the stream reads all the logs sent to the logisland_raw topic and pushes the processing output into the logisland_events topic.
Note
We specify an Avro output schema to validate our output records (and coerce their types accordingly). This lets other streams rely on a schema when they process records from the topic.
We can also define serializers to marshal records to and from each topic.
    # parsing
    - stream: parsing_stream
      component: com.hurence.logisland.stream.spark.KafkaRecordStreamParallelProcessing
      type: stream
      documentation: a processor that links
      configuration:
        kafka.input.topics: logisland_raw
        kafka.output.topics: logisland_events
        kafka.error.topics: logisland_errors
        kafka.input.topics.serializer: none
        kafka.output.topics.serializer: com.hurence.logisland.serializer.KryoSerializer
        kafka.error.topics.serializer: com.hurence.logisland.serializer.JsonSerializer
        avro.output.schema: >
          { "version":1,
            "type": "record",
            "name": "com.hurence.logisland.record.apache_log",
            "fields": [
              { "name": "record_errors",   "type": [ {"type": "array", "items": "string"},"null"] },
              { "name": "record_raw_key",  "type": ["string","null"] },
              { "name": "record_raw_value", "type": ["string","null"] },
              { "name": "record_id",       "type": ["string"] },
              { "name": "record_time",     "type": ["long"] },
              { "name": "record_type",     "type": ["string"] },
              { "name": "src_ip",          "type": ["string","null"] },
              { "name": "http_method",     "type": ["string","null"] },
              { "name": "bytes_out",       "type": ["long","null"] },
              { "name": "http_query",      "type": ["string","null"] },
              { "name": "http_version",    "type": ["string","null"] },
              { "name": "http_status",     "type": ["string","null"] },
              { "name": "identd",          "type": ["string","null"] },
              { "name": "user",            "type": ["string","null"] } ]}
        kafka.metadata.broker.list: sandbox:9092
        kafka.zookeeper.quorum: sandbox:2181
        kafka.topic.autoCreate: true
        kafka.topic.default.partitions: 4
        kafka.topic.default.replicationFactor: 1
      processorConfigurations:
Within this stream, a SplitText processor takes a log line as a String and computes a Record as a sequence of fields.
        # parse apache logs
        - processor: apache_parser
          component: com.hurence.logisland.processor.SplitText
          type: parser
          documentation: a parser that produces events from an apache log REGEX
          configuration:
            value.regex: (\S+)\s+(\S+)\s+(\S+)\s+\[([\w:\/]+\s[+\-]\d{4})\]\s+"(\S+)\s+(\S+)\s*(\S*)"\s+(\S+)\s+(\S+)
            value.fields: src_ip,identd,user,record_time,http_method,http_query,http_version,http_status,bytes_out
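The SplitText configuration above maps capture groups to fields by position. The sketch below illustrates that mapping with plain `java.util.regex`, under the assumption that each capture group of `value.regex` is assigned to the field at the same position in `value.fields`. It is not LogIsland's implementation. The class name `SplitTextSketch` and the sample log line are hypothetical. The sketch leaves every value as a String and does not show type conversion, such as producing the `long` types declared in the Avro schema.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical illustration of value.regex + value.fields; not LogIsland code. */
class SplitTextSketch {

    // Same pattern as value.regex above, split across lines for readability.
    static final Pattern APACHE_LOG = Pattern.compile(
        "(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+"              // src_ip identd user
        + "\\[([\\w:\\/]+\\s[+\\-]\\d{4})\\]\\s+"     // [record_time]
        + "\"(\\S+)\\s+(\\S+)\\s*(\\S*)\"\\s+"        // "method query version"
        + "(\\S+)\\s+(\\S+)");                        // status bytes_out

    // Same names as value.fields, in capture-group order.
    static final String[] FIELDS = {
        "src_ip", "identd", "user", "record_time", "http_method",
        "http_query", "http_version", "http_status", "bytes_out"
    };

    /** Returns field name -> captured text, or an empty map if the line does not match. */
    static Map<String, String> parse(String line) {
        Map<String, String> record = new LinkedHashMap<>();
        Matcher m = APACHE_LOG.matcher(line);
        if (m.matches()) {
            for (int i = 0; i < FIELDS.length; i++) {
                record.put(FIELDS[i], m.group(i + 1)); // group 0 is the whole line
            }
        }
        return record;
    }

    public static void main(String[] args) {
        String line = "127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "
            + "\"GET /apache_pb.gif HTTP/1.0\" 200 2326";
        parse(line).forEach((field, value) -> System.out.println(field + " = " + value));
    }
}
```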
Within this stream, a ModifyId processor takes the Records output by the SplitText processor and computes a new id for each one. The id is derived from the value of the record's “record_raw_value” field, which should contain the original Apache log line. The processor hashes that value with the Java implementation of the “SHA-256” algorithm, using the “UTF-8” charset.
        # generate record ids
        - processor: apache_parser
          component: com.hurence.logisland.processor.ModifyId
          type: parser
          documentation: a parser that modifies record ids
          configuration:
            id.generation.strategy: hashFields
            hash.charset: UTF-8
            fields.to.hash: record_raw_value
            hash.algorithm: SHA-256
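The hashing configured for ModifyId above can be illustrated with the JDK's `MessageDigest`. This is a minimal sketch, not the processor's source. One assumption is flagged loudly: the sketch renders the digest as lowercase hex, and LogIsland may encode the hash into the record id differently. The class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/**
 * Hypothetical sketch of hashFields over record_raw_value with SHA-256 / UTF-8.
 * ASSUMPTION: lowercase hex encoding of the digest; LogIsland's encoding may differ.
 */
class HashIdSketch {

    static String hashId(String recordRawValue) {
        try {
            // JDK SHA-256 implementation, matching hash.algorithm: SHA-256
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            // Encode the raw log line with hash.charset: UTF-8 before hashing
            byte[] hash = digest.digest(recordRawValue.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(hash.length * 2);
            for (byte b : hash) {
                hex.append(String.format("%02x", b)); // two hex digits per byte
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to support SHA-256, so this should not happen
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String a = "127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] \"GET /apache_pb.gif HTTP/1.0\" 200 2326";
        String b = "127.0.0.1 - frank [10/Oct/2000:13:55:37 -0700] \"GET /apache_pb.gif HTTP/1.0\" 200 2326";
        System.out.println(hashId(a));
        System.out.println(hashId(a).equals(hashId(a))); // same line, same id
        System.out.println(hashId(a).equals(hashId(b))); // different lines, different ids
    }
}
```

Because the id is a deterministic function of the raw line, re-ingesting the same log file yields the same ids. Indexing it again therefore overwrites documents instead of duplicating them. The flip side is that two byte-identical log lines receive the same id and end up as a single document.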
This stream processes log entries as soon as they are queued into the logisland_raw Kafka topic. Each log line is parsed into an event, which is pushed back to Kafka in the logisland_events topic.
You can then proceed with indexing into Elasticsearch as in the “index-apache-logs” tutorial.