Capturing Network packets in Logisland

1. Network Packets

A network packet is a formatted unit of data carried by a network from one computer (or device) to another. For example, a web page or an email are carried as a series of packets of a certain size in bytes. Each packet carries the information that will help it get to its destination : the sender’s IP address, the intended receiver’s IP address, something that tells the network how many packets the message has been broken into, …

Packet Headers

1. Protocol headers (IP, TCP, …)

This information is stored in different layers called “headers”, encapsulating the packet payload. For example, a TCP/IP packet is wrapped in a TCP header, which is in turn encapsulated in an IP header.

The individual packets for a given file or message may travel different routes through the Internet. When they have all arrived, they are reassembled by the TCP layer at the receiving end.

2. PCAP format specific headers

Packets can be either analysed in real-time (stream mode) or stored in files for upcoming analysis (batch mode). In the latter case, the packets are stored in the pcap format, adding some specific headers :

In this tutorial we are going to capture packets in live stream mode

Why capturing network packets ?

Packet sniffing, or packet analysis, is the process of capturing any data transmitted over the local network and searching for any information that may be useful for :

  • Troubleshooting network problems
  • Detecting network intrusion attempts
  • Detecting network misuse by internal and external users
  • Monitoring network bandwidth utilization
  • Monitoring network and endpoint security status
  • Gathering and report network statistics

Packets information collected by Logisland

LogIsland parses all the fields of IP protocol headers, namely :

1. IP Header fields

  • IP version : ip_version
  • Internet Header Length : ip_internet_header_length
  • Type of Service : ip_type_of_service
  • Datagram Total Length : ip_datagram_total_length
  • Identification : ip_identification
  • Flags : ip_flags
  • Fragment offset : ip_fragment_offset
  • Time To Live : ip_time_to_live
  • Protocol : protocol
  • Header Checksum : ip_checksum
  • Source IP address : src_ip
  • Destination IP address : dst_ip
  • Options : ip_options (variable size)
  • Padding : ip_padding (variable size)

2. TCP Header fields

  • Source port number : src_port
  • Destination port number : dest_port
  • Sequence Number : tcp_sequence_number
  • Acknowledgment Number : tcp_acknowledgment_number
  • Data offset : tcp_data_offset
  • Flags : tcp_flags
  • Window size : tcp_window_size
  • Checksum : tcp_checksum
  • Urgent Pointer : tcp_urgent_pointer
  • Options : tcp_options (variable size)
  • Padding : tcp_padding (variable size)

3. UDP Header fields

  • Source port number : src_port
  • Destination port number : dest_port
  • Segment total length : udp_segment_total_length
  • Checksum : udp_checksum

2. Tutorial environment

This tutorial aims to show how to capture live Network Packets and process then in LogIsland. Through its out-of-the-box ParseNetworkPacket processor, LogIsland is able to receive and handle network packets captured by a packet sniffer tool. Using LogIsland, you will be able to inspect those packets for network security, optimization or monitoring reasons.

In this tutorial, we will show you how to capture network packets, process those packets in LogIsland, index them in ElasticSearch and then display them in Kibana.

We will launch two streaming processors, one for parsing Network Packets into LogIsland packet records, and one to index those packet records in ElasticSearch.

Packet Capture Tool

For the purpose of this tutorial, we are going to capture network packets (off-the-wire) into a kafka topic using pycapa Apache probe, a tool based on Pcapy, a Python extension module that interfaces with the libpcap packet capture library.

For information, it is also possible to use the fastcapa Apache probe, based on DPDK, intended for high-volume packet capture.

Note

You can download the latest release of LogIsland and the YAML configuration file for this tutorial which can be also found under $LOGISLAND_HOME/conf directory in the LogIsland container.

3. Start LogIsland as a Docker container

LogIsland is packaged as a Docker container that you can build yourself or pull from Docker Hub. The docker container is built from a Centos 6.4 image with the following tools enabled (among others)

  • Kafka
  • Spark
  • Elasticsearch
  • Kibana
  • LogIsland

Pull the image from Docker Repository (it may take some time)

docker pull hurence/logisland

You should be aware that this Docker container is quite eager in RAM and will need at least 8G of memory to run smoothly. Now run the container

# run container
docker run \
    -it \
    -p 80:80 \
    -p 8080:8080 \
    -p 3000:3000 \
    -p 9200-9300:9200-9300 \
    -p 5601:5601 \
    -p 2181:2181 \
    -p 9092:9092 \
    -p 9000:9000 \
    -p 4050-4060:4050-4060 \
    --name logisland \
    -h sandbox \
    hurence/logisland bash

# get container ip
docker inspect logisland

# or if your are on mac os
docker-machine ip default

you should add an entry for sandbox (with the container ip) in your /etc/hosts as it will be easier to access to all web services in logisland running container.

Note

If you have your own Spark and Kafka cluster, you can download the latest release and unzip on an edge node.

4. Parse Network Packets

In this tutorial we will capture network packets, process those packets in LogIsland and index them in ElasticSearch.

Connect a shell to your logisland container to launch LogIsland streaming jobs :

docker exec -ti logisland bash
cd $LOGISLAND_HOME
bin/logisland.sh --conf conf/index-network-packets.yml

Setup Spark/Kafka streaming engine

An Engine is needed to handle the stream processing. This conf/index-network-packets.yml configuration file defines a stream processing job setup. The first section configures the Spark engine, we will use a KafkaStreamProcessingEngine :

engine:
 component: com.hurence.logisland.engine.spark.KafkaStreamProcessingEngine
 type: engine
 documentation: Parse network packets with LogIsland
 configuration:
   spark.app.name: ParseNetworkPacketDemo
   spark.master: local[4]
   spark.driver.memory: 1G
   spark.driver.cores: 1
   spark.executor.memory: 2G
   spark.executor.instances: 4
   spark.executor.cores: 2
   spark.yarn.queue: default
   spark.yarn.maxAppAttempts: 4
   spark.yarn.am.attemptFailuresValidityInterval: 1h
   spark.yarn.max.executor.failures: 20
   spark.yarn.executor.failuresValidityInterval: 1h
   spark.task.maxFailures: 8
   spark.serializer: org.apache.spark.serializer.KryoSerializer
   spark.streaming.batchDuration: 4000
   spark.streaming.backpressure.enabled: false
   spark.streaming.unpersist: false
   spark.streaming.blockInterval: 500
   spark.streaming.kafka.maxRatePerPartition: 3000
   spark.streaming.timeout: -1
   spark.streaming.unpersist: false
   spark.streaming.kafka.maxRetries: 3
   spark.streaming.ui.retainedBatches: 200
   spark.streaming.receiver.writeAheadLog.enable: false
   spark.ui.port: 4050

 controllerServiceConfigurations:

   - controllerService: elasticsearch_service
     component: com.hurence.logisland.service.elasticsearch.Elasticsearch_2_4_0_ClientService
     type: service
     documentation: elasticsearch 2.4.0 service implementation
     configuration:
       hosts: sandbox:9300
       cluster.name: elasticsearch
       batch.size: 4000

 streamConfigurations:

Stream 1 : parse incoming Network Packets

Inside this engine you will run a Kafka stream of processing, so we setup input/output topics and Kafka/Zookeeper hosts. Here the stream will read all the logs sent in logisland_input_packets_topic topic and push the processed packet records into logisland_parsed_packets_topic topic.

We can define some serializers to marshall all records from and to a topic.

# Parsing
- stream: parsing_stream
  component: com.hurence.logisland.stream.spark.KafkaRecordStreamParallelProcessing
  type: stream
  documentation: A processor chain that parses network packets into Logisland records
  configuration:
    kafka.input.topics: logisland_input_packets_topic
    kafka.output.topics: logisland_parsed_packets_topic
    kafka.error.topics: logisland_error_packets_topic
    kafka.input.topics.serializer: com.hurence.logisland.serializer.BytesArraySerializer
    kafka.output.topics.serializer: com.hurence.logisland.serializer.KryoSerializer
    kafka.error.topics.serializer: com.hurence.logisland.serializer.JsonSerializer
    kafka.metadata.broker.list: sandbox:9092
    kafka.zookeeper.quorum: sandbox:2181
    kafka.topic.autoCreate: true
    kafka.topic.default.partitions: 2
    kafka.topic.default.replicationFactor: 1
  processorConfigurations:

Within this stream there is a single processor in the processor chain: the ParseNetworkPacket processor. It takes an incoming network packet, parses it and computes a LogIsland record as a sequence of fields corresponding to packet headers fields.

# Transform network packets into LogIsland packet records
- processor: ParseNetworkPacket processor
  component: com.hurence.logisland.processor.networkpacket.ParseNetworkPacket
  type: parser
  documentation: A processor that parses network packets into LogIsland records
  configuration:
    debug: true
    flow.mode: stream

This stream will process network packets as soon as they will be queued into logisland_input_packets_topic Kafka topic, each packet will be parsed as a record which will be pushed back to Kafka in the logisland_parsed_packets_topic topic.

Stream 2: Index the processed records into Elasticsearch

The second Kafka stream will handle Records pushed into the logisland_parsed_packets_topic topic to index them into ElasticSearch. So there is no need to define an output topic:

# Indexing
- stream: indexing_stream
  component: com.hurence.logisland.stream.spark.KafkaRecordStreamParallelProcessing
  type: processor
  documentation: a processor that pushes events to ES
  configuration:
    kafka.input.topics: logisland_parsed_packets_topic
    kafka.output.topics: none
    kafka.error.topics: logisland_error_packets_topic
    kafka.input.topics.serializer: com.hurence.logisland.serializer.KryoSerializer
    kafka.output.topics.serializer: none
    kafka.error.topics.serializer: com.hurence.logisland.serializer.JsonSerializer
    kafka.metadata.broker.list: sandbox:9092
    kafka.zookeeper.quorum: sandbox:2181
    kafka.topic.autoCreate: true
    kafka.topic.default.partitions: 2
    kafka.topic.default.replicationFactor: 1
  processorConfigurations:

The only processor in the processor chain of this stream is the BulkAddElasticsearch processor.

# Bulk add into ElasticSearch
- processor: ES Publisher
  component: com.hurence.logisland.processor.elasticsearch.BulkAddElasticsearch
  type: processor
  documentation: A processor that pushes network packet records into ES
  configuration:
    elasticsearch.client.service: elasticsearch_service
    default.index: packets_index
    default.type: events
    timebased.index: today
    es.index.field: search_index
    es.type.field: record_type

The default.index: packets_index configuration parameter tells the elasticsearch processor to index records into an index starting with the packets_index string. The timebased.index: today configuration parameter tells the processor to use the current date after the index prefix. Thus the index name is of the form /packets_index.2017.03.30.

Finally, the es.type.field: record_type configuration parameter tells the processor to use the record field record_type of the incoming record to determine the ElasticSearch type to use within the index.

5. Stream network packets into the system

Let’s install and use the Apache pycapa probe to capture and send packets to kafka topics in real time.

Install pycapa probe

All required steps to install pycapa probe are explained in this site, but here are the main installation steps :

  1. Install libpcap, pip (python-pip) and python-devel :
yum install libpcap
yum install python-pip
yum install python-devel
  1. Build pycapa probe from Metron repo
cd /tmp
git clone https://github.com/apache/incubator-metron.git
cd incubator-metron/metron-sensors/pycapa
pip install -r requirements.txt
python setup.py install

Capture network packets

To start capturing network packets into the topic logisland_input_packets_topic using pycapa probe, use the following command :

pycapa --producer --kafka sandbox:9092 --topic logisland_input_packets_topic -i eth0

6. Monitor your spark jobs and Kafka topics

Now go to http://sandbox:4050/streaming/ to see how fast Spark can process your data

../_images/spark-streaming-packet-capture-job.png

7. Use Kibana to inspect records

Inspect Network Packets under Discover tab

Open your browser and go to http://sandbox:5601/

Configure a new index pattern with packets.* as the pattern name and @timestamp as the time value field.

../_images/kibana-configure-index-packet.png

Then browse “Discover” tab, you should be able to explore your network packet records :

../_images/kibana-logisland-metrics-packet-stream-pycapa.png