Store Apache logs to Redis K/V store

In the following getting started tutorial, we'll guide you through the process of Apache log mining with the LogIsland platform.

Note

Be sure you know how to launch a logisland Docker environment by reading the prerequisites section.

Note that it is possible to store data in different datastores. This tutorial covers the Redis case; if you need more in-depth explanations, you can read the previous tutorial on indexing Apache logs into Elasticsearch or Solr: `index-apache-logs.html`_ .

1. Logisland job setup

The logisland job for this tutorial is already packaged in the tar.gz assembly and you can find it here:

docker exec -i -t logisland vim conf/store-to-redis.yml

We will start by explaining each part of the config file.

The controllerServiceConfigurations part defines all the services that will be shared by processors within the whole job; here, a Redis K/V cache service that will be used later by the BulkPut processor.

- controllerService: datastore_service
  component: com.hurence.logisland.redis.service.RedisKeyValueCacheService
  type: service
  documentation: redis datastore service
  configuration:
    connection.string: localhost:6379
    redis.mode: standalone
    database.index: 0
    communication.timeout: 10 seconds
    pool.max.total: 8
    pool.max.idle: 8
    pool.min.idle: 0
    pool.block.when.exhausted: true
    pool.max.wait.time: 10 seconds
    pool.min.evictable.idle.time: 60 seconds
    pool.time.between.eviction.runs: 30 seconds
    pool.num.tests.per.eviction.run: -1
    pool.test.on.create: false
    pool.test.on.borrow: false
    pool.test.on.return: false
    pool.test.while.idle: true
    record.recordSerializer: com.hurence.logisland.serializer.JsonSerializer

Here the stream will read all the logs sent into the logisland_raw topic and push the processing output into the logisland_events topic.

Note

We want to specify an Avro output schema to validate our output records (and force their types accordingly). This makes it possible for other streams to rely on a schema when processing records from a topic.

We can define some serializers to marshal all records from and to a topic.

- stream: parsing_stream
  component: com.hurence.logisland.stream.spark.KafkaRecordStreamParallelProcessing
  type: stream
  documentation: a processor that converts raw apache logs into structured log records
  configuration:
    kafka.input.topics: logisland_raw
    kafka.output.topics: logisland_events
    kafka.error.topics: logisland_errors
    kafka.input.topics.serializer: none
    kafka.output.topics.serializer: com.hurence.logisland.serializer.KryoSerializer
    kafka.error.topics.serializer: com.hurence.logisland.serializer.JsonSerializer
    kafka.metadata.broker.list: sandbox:9092
    kafka.zookeeper.quorum: sandbox:2181
    kafka.topic.autoCreate: true
    kafka.topic.default.partitions: 4
    kafka.topic.default.replicationFactor: 1

Within this stream, a SplitText processor takes a log line as a String and computes a Record as a sequence of fields (see the sketch after the configuration below).

# parse apache logs
- processor: apache_parser
  component: com.hurence.logisland.processor.SplitText
  type: parser
  documentation: a parser that produce events from an apache log REGEX
  configuration:
    value.regex: (\S+)\s+(\S+)\s+(\S+)\s+\[([\w:\/]+\s[+\-]\d{4})\]\s+"(\S+)\s+(\S+)\s*(\S*)"\s+(\S+)\s+(\S+)
    value.fields: src_ip,identd,user,record_time,http_method,http_query,http_version,http_status,bytes_out
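
To make the field mapping concrete, here is a minimal Python sketch (not part of LogIsland, just an illustration of what the configuration above does) that applies the same value.regex to a sample NASA access log line and zips the captured groups with value.fields:

import re

# same regex and field list as in the SplitText configuration above
value_regex = r'(\S+)\s+(\S+)\s+(\S+)\s+\[([\w:\/]+\s[+\-]\d{4})\]\s+"(\S+)\s+(\S+)\s*(\S*)"\s+(\S+)\s+(\S+)'
value_fields = 'src_ip,identd,user,record_time,http_method,http_query,http_version,http_status,bytes_out'.split(',')

line = '204.191.209.4 - - [01/Jul/1995:01:00:29 -0400] "GET /images/WORLD-logosmall.gif HTTP/1.0" 200 669'
match = re.match(value_regex, line)

# build a dict similar to the fields of the Record produced by SplitText
fields = dict(zip(value_fields, match.groups()))
print(fields['http_query'])   # /images/WORLD-logosmall.gif
print(fields['bytes_out'])    # 669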

This stream will process log entries as soon as they are queued into the logisland_raw Kafka topic; each log will be parsed as an event which will be pushed back to Kafka in the logisland_events topic.

The second processor will handle the Records produced by SplitText and index them into the datastore previously defined (Redis).

# all the parsed records are added to datastore by bulk
- processor: datastore_publisher
  component: com.hurence.logisland.processor.datastore.BulkPut
  type: processor
  documentation: "indexes processed events in datastore"
  configuration:
    datastore.client.service: datastore_service
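
Because the controller service is configured with record.recordSerializer: com.hurence.logisland.serializer.JsonSerializer, each record ends up in Redis as a plain key/value pair: the record id as key, the JSON form of the record as value. Conceptually, what BulkPut does for each record is roughly equivalent to the following simplified Python sketch (an illustration, not the actual implementation):

import json
import redis

r = redis.StrictRedis(host='localhost', port=6379, db=0)

# hypothetical record, shaped like the ones produced by the SplitText processor
record = {'id': 'some-record-id', 'type': 'apache_log', 'fields': {'src_ip': '204.191.209.4'}}

# store the JSON-serialized record under its record id
r.set(record['id'], json.dumps(record))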

2. Launch the script

For this tutorial we will handle some Apache logs with a SplitText parser and send them to Redis. Connect a shell to your logisland container to launch the following streaming job.

Run the job:

docker exec -i -t logisland bin/logisland.sh --conf conf/store-to-redis.yml

3. Inject some Apache logs into the system

Now we're going to send some logs to the logisland_raw Kafka topic.

We could set up a Logstash or Flume agent to load some Apache logs into a Kafka topic, but there's a super useful tool in the Kafka ecosystem: kafkacat, a generic command-line, non-JVM Apache Kafka producer and consumer which can be easily installed.

If you don't have your own httpd logs available, you can use some freely available log files from the NASA-HTTP web site access dataset.

Let's send the first 500,000 lines of the NASA HTTP access log for July 1995 to the logisland_raw Kafka topic with kafkacat:

cd /tmp
wget ftp://ita.ee.lbl.gov/traces/NASA_access_log_Jul95.gz
gunzip NASA_access_log_Jul95.gz
head -500000 NASA_access_log_Jul95 | kafkacat -b sandbox:9092 -t logisland_raw

4. Inspect the logs

For this part of the tutorial we will use redis-py, a Python client for Redis. You can install it by following the instructions given on the redis-py project page.

To install redis-py, simply:

$ sudo pip install redis

To get started, check that you can connect to Redis (set() should return True, and get() should give back the value you stored):

>>> import redis
>>> r = redis.StrictRedis(host='localhost', port=6379, db=0)
>>> r.set('foo', 'bar')
>>> r.get('foo')
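
Once the streaming job has processed the injected logs (steps 2 and 3), a quick sanity check, assuming the records are stored in database 0 as configured above, is to count the keys in the database:

>>> r.dbsize()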

Then we want to grab some logs that have been collected into Redis. We first find some keys matching a pattern and get the JSON content of one:

>>> r.keys('1234*')

['123493eb-93df-4e57-a1c1-4a8e844fa92c', '123457d5-8ccc-4f0f-b4ba-d70967aa48eb', '12345e06-6d72-4ce8-8254-a7cc4bab5e31']

>>> r.get('123493eb-93df-4e57-a1c1-4a8e844fa92c')

'{
  "id" : "123493eb-93df-4e57-a1c1-4a8e844fa92c",
  "type" : "apache_log",
  "creationDate" : 804574829000,
  "fields" : {
    "src_ip" : "204.191.209.4",
    "record_id" : "123493eb-93df-4e57-a1c1-4a8e844fa92c",
    "http_method" : "GET",
    "http_query" : "/images/WORLD-logosmall.gif",
    "bytes_out" : "669",
    "identd" : "-",
    "http_version" : "HTTP/1.0",
    "record_raw_value" : "204.191.209.4 - - [01/Jul/1995:01:00:29 -0400] \"GET /images/WORLD-logosmall.gif HTTP/1.0\" 200 669",
    "http_status" : "200",
    "record_time" : 804574829000,
    "user" : "-",
    "record_type" : "apache_log"
  }
}'

>>> import json
>>> record = json.loads(r.get('123493eb-93df-4e57-a1c1-4a8e844fa92c'))
>>> record['fields']['bytes_out']
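
The last lookup returns the bytes_out value as a string ('669' for this record). To go a bit further, here is a small sketch (assuming the keys matched above all hold JSON records of the same shape) that loops over those keys and sums their bytes_out field, skipping records where the value is not numeric:

>>> total = 0
>>> for key in r.keys('1234*'):
...     rec = json.loads(r.get(key))
...     if rec['fields']['bytes_out'].isdigit():
...         total += int(rec['fields']['bytes_out'])
...
>>> total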