Time series sampling & Outlier detection¶
In the following tutorial we’ll handle time series data coming from a sensor. We’ll see how to sample the data points in a visually non-destructive way and how to detect outliers among them.
We assume that you are already familiar with the logisland platform and that you have successfully completed the previous tutorials.
Note
You can download the latest release of logisland and the YAML configuration file for this tutorial, which can also be found under the $LOGISLAND_HOME/conf directory.
1. Setup the time series collection Stream¶
The first Stream uses a KafkaRecordStreamParallelProcessing and a SplitText Processor. The Processor parses the incoming CSV lines into structured records; please note the Avro output schema, which defines the shape of the records it emits.
# parsing time series
- stream: parsing_stream
  component: com.hurence.logisland.stream.spark.KafkaRecordStreamParallelProcessing
  type: stream
  documentation: a stream that parses raw sensor CSV lines into structured records
  configuration:
    kafka.input.topics: logisland_sensor_raw
    kafka.output.topics: logisland_sensor_events
    kafka.error.topics: logisland_errors
    kafka.input.topics.serializer: none
    kafka.output.topics.serializer: com.hurence.logisland.serializer.KryoSerializer
    kafka.error.topics.serializer: com.hurence.logisland.serializer.JsonSerializer
    avro.output.schema: >
      { "version": 1,
        "type": "record",
        "name": "com.hurence.logisland.record.cpu_usage",
        "fields": [
          { "name": "record_errors",    "type": [ {"type": "array", "items": "string"}, "null"] },
          { "name": "record_raw_key",   "type": ["string", "null"] },
          { "name": "record_raw_value", "type": ["string", "null"] },
          { "name": "record_id",        "type": ["string"] },
          { "name": "record_time",      "type": ["long"] },
          { "name": "record_type",      "type": ["string"] },
          { "name": "record_value",     "type": ["string", "null"] } ]}
    kafka.metadata.broker.list: sandbox:9092
    kafka.zookeeper.quorum: sandbox:2181
    kafka.topic.autoCreate: true
    kafka.topic.default.partitions: 4
    kafka.topic.default.replicationFactor: 1
  processorConfigurations:
    - processor: apache_parser
      component: com.hurence.logisland.processor.SplitText
      type: parser
      documentation: a parser that produces records from CSV lines using a regex
      configuration:
        record.type: apache_log
        value.regex: (\S+),(\S+)
        value.fields: record_time,record_value
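To make the parsing concrete, here is what this processor would extract from a hypothetical raw line (the timestamp and value below are made-up illustrative numbers, not taken from the dataset):

# hypothetical raw CSV line read from the input topic
1389132000,0.475

# fields captured by value.regex and mapped through value.fields
record_time:  1389132000
record_value: "0.475"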
2. Setup the Outlier detection Stream¶
The second Stream uses a KafkaRecordStreamParallelProcessing and a DetectOutliers Processor.
Note
It’s important to note that we perform the outlier detection in parallel, without any grouping of the records. If we wanted to perform the detection for a particular grouping of records, we would use a KafkaRecordStreamSQLAggregator with a GROUP BY clause instead.
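For illustration only, the SQL statement carried by such an aggregation stream could look like the sketch below; the table naming convention and the exact properties of KafkaRecordStreamSQLAggregator should be checked against its documentation:

-- hypothetical aggregation over the sensor events, grouped by record type
SELECT record_type, AVG(record_value) AS avg_value
FROM logisland_sensor_events
GROUP BY record_type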
# detect outliers
- stream: detect_outliers
  component: com.hurence.logisland.stream.spark.KafkaRecordStreamParallelProcessing
  type: stream
  documentation: a stream that detects outliers in parallel
  configuration:
    kafka.input.topics: logisland_sensor_events
    kafka.output.topics: logisland_sensor_outliers_events
    kafka.error.topics: logisland_errors
    kafka.input.topics.serializer: com.hurence.logisland.serializer.KryoSerializer
    kafka.output.topics.serializer: com.hurence.logisland.serializer.KryoSerializer
    kafka.error.topics.serializer: com.hurence.logisland.serializer.JsonSerializer
    kafka.metadata.broker.list: sandbox:9092
    kafka.zookeeper.quorum: sandbox:2181
    kafka.topic.autoCreate: true
    kafka.topic.default.partitions: 2
    kafka.topic.default.replicationFactor: 1
  processorConfigurations:
    - processor: detect_outliers
      component: com.hurence.logisland.processor.DetectOutliers
      type: processor
      documentation: a processor that detects anomalous values in a continuous time series
      configuration:
        rotation.policy.type: by_amount
        rotation.policy.amount: 100
        rotation.policy.unit: points
        chunking.policy.type: by_amount
        chunking.policy.amount: 10
        chunking.policy.unit: points
        global.statistics.min: -100000
        min.amount.to.predict: 100
        zscore.cutoffs.normal: 3.5
        zscore.cutoffs.moderate: 5
        record.value.field: record_value
        record.time.field: record_time
        output.record.type: sensor_outlier
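As a rough sketch of how the two Z-score cutoffs above are commonly interpreted (an assumption to verify against the DetectOutliers documentation), each incoming point is scored against the statistics of its current chunk and classified by severity:

# z = |value - mean| / stddev, computed over the current chunk
z <= 3.5       -> normal point
3.5 < z <= 5   -> moderate outlier
z > 5          -> severe outlier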
3. Setup the time series Sampling Stream¶
The third Stream uses a KafkaRecordStreamParallelProcessing and a SampleRecords Processor.
# sample time series
- stream: sampling_stream
  component: com.hurence.logisland.stream.spark.KafkaRecordStreamParallelProcessing
  type: stream
  documentation: a stream that samples time series values in parallel
  configuration:
    kafka.input.topics: logisland_sensor_events
    kafka.output.topics: logisland_sensor_sampled_events
    kafka.error.topics: logisland_errors
    kafka.input.topics.serializer: com.hurence.logisland.serializer.KryoSerializer
    kafka.output.topics.serializer: com.hurence.logisland.serializer.KryoSerializer
    kafka.error.topics.serializer: com.hurence.logisland.serializer.JsonSerializer
    kafka.metadata.broker.list: sandbox:9092
    kafka.zookeeper.quorum: sandbox:2181
    kafka.topic.autoCreate: true
    kafka.topic.default.partitions: 2
    kafka.topic.default.replicationFactor: 1
  processorConfigurations:
    - processor: sampler
      component: com.hurence.logisland.processor.SampleRecords
      type: processor
      documentation: a processor that reduces the number of time series values
      configuration:
        record.value.field: record_value
        record.time.field: record_time
        sampling.algorithm: average
        sampling.parameter: 10
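Assuming sampling.parameter is the bucket size of the average algorithm (a plausible reading of this configuration, to be confirmed against the SampleRecords documentation), every run of 10 consecutive points would be collapsed into a single averaged point, a tenfold reduction:

# 10 consecutive raw values
0.42 0.44 0.43 0.45 0.41 0.44 0.46 0.43 0.42 0.45
# emitted as one sampled point (their average)
0.435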
4. Setup the indexing Stream¶
The last Stream uses a KafkaRecordStreamParallelProcessing and a BulkAddElasticsearch Processor to index the whole set of records (raw events, outliers and sampled points) into the search engine.
# index records
- stream: indexing_stream
  component: com.hurence.logisland.stream.spark.KafkaRecordStreamParallelProcessing
  type: stream
  documentation: a stream that indexes all records into Elasticsearch
  configuration:
    kafka.input.topics: logisland_sensor_events,logisland_sensor_outliers_events,logisland_sensor_sampled_events
    kafka.output.topics: none
    kafka.error.topics: logisland_errors
    kafka.input.topics.serializer: com.hurence.logisland.serializer.KryoSerializer
    kafka.output.topics.serializer: none
    kafka.error.topics.serializer: com.hurence.logisland.serializer.JsonSerializer
    kafka.metadata.broker.list: sandbox:9092
    kafka.zookeeper.quorum: sandbox:2181
    kafka.topic.autoCreate: true
    kafka.topic.default.partitions: 4
    kafka.topic.default.replicationFactor: 1
  processorConfigurations:
    - processor: es_publisher
      component: com.hurence.logisland.processor.elasticsearch.BulkAddElasticsearch
      type: processor
      documentation: a processor that bulk-indexes the processed records into Elasticsearch
      configuration:
        elasticsearch.client.service: elasticsearch_service
        default.index: logisland
        default.type: event
        timebased.index: yesterday
        es.index.field: search_index
        es.type.field: record_type
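Once the job from the next section is running, you can check that records actually reach Elasticsearch with a direct query against the sandbox (host and port are assumed from the previous tutorials):

# peek at a few indexed records
curl -XGET 'sandbox:9200/logisland*/_search?size=3&pretty'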
5. Start logisland application¶
Connect a shell to your logisland container to launch the stream processing job defined above.
docker exec -ti logisland bash
# launch logisland streams
cd $LOGISLAND_HOME
bin/logisland.sh --conf conf/outlier-detection.yml
# send sensor data to kafka
cat cpu_utilization_asg_misconfiguration.csv | kafkacat -b sandbox:9092 -P -t logisland_sensor_raw
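If nothing shows up downstream, remember that the error topic is serialized as JSON and is therefore human-readable; consuming a few records from it with kafkacat is a quick way to spot parsing problems:

# read a few records from the error topic
kafkacat -b sandbox:9092 -C -t logisland_errors -o beginning -c 5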