Netflow/Logisland integration - Handling Netflow traffic

Netflow and Logisland

Netflow is a feature introduced on Cisco routers that provides the ability to collect IP network traffic. We can distinguish 2 components:

  • Flow exporter: aggregates packets into flows and exports flow records (binary format) towards flow collectors
  • Flow collector: responsible for reception, storage and pre-processing of flow data received from a flow exporter

The collected data are therefore available for analysis purposes (intrusion detection, traffic analysis, ...).

Network Flows: A network flow can be defined in many ways. Cisco standard NetFlow version 5 defines a flow as a unidirectional sequence of packets that all share the following 7 values:

  1. Ingress interface (SNMP ifIndex)
  2. Source IP address
  3. Destination IP address
  4. IP protocol
  5. Source port for UDP or TCP, 0 for other protocols
  6. Destination port for UDP or TCP, type and code for ICMP, or 0 for other protocols
  7. IP Type of Service

NetFlow Record

A NetFlow record can contain a wide variety of information about the traffic in a given flow. NetFlow version 5 (one of the most commonly used versions, followed by version 9) contains the following:

  • Input interface index used by SNMP (ifIndex in IF-MIB).
  • Output interface index or zero if the packet is dropped.
  • Timestamps for the flow start and finish time, in milliseconds since the last boot.
  • Number of bytes and packets observed in the flow
  • Layer 3 headers:
    • Source & destination IP addresses
    • ICMP Type and Code.
    • IP protocol
    • Type of Service (ToS) value
  • Source and destination port numbers for TCP, UDP, SCTP
  • For TCP flows, the union of all TCP flags observed over the life of the flow.
  • Layer 3 Routing information:
    • IP address of the immediate next-hop (not the BGP nexthop) along the route to the destination
    • Source & destination IP masks (prefix lengths in the CIDR notation)

Through its out-of-the-box Netflow processor, Logisland integrates with Netflow (V5) and is able to receive and handle Netflow events coming from a netflow collector. By analyzing those events with Logisland, you can then perform analyses such as intrusion detection or traffic analysis.

In this tutorial, we will show you how to generate some Netflow traffic in LogIsland, how to index the resulting events in Elasticsearch and how to visualize them in Kibana. More complex processing could be done by plugging any Logisland processors after the Netflow processor.

Tutorial environment

This tutorial aims to show how to handle Netflow traffic within LogIsland.

For the purpose of this tutorial, we will generate Netflow traffic using nfgen. This tool simulates netflow traffic and sends binary netflow records to port 2055 of sandbox. A nifi instance running on sandbox will listen on that port for incoming traffic and push the binary events to a Kafka broker.

We will launch two streaming processes, one for generating the corresponding Netflow LogIsland records and the second one to index them in ElasticSearch.

Note

It is important to understand that in a real environment, Netflow traffic will be triggered by network devices (routers, switches, ...), so you will have to get the netflow traffic from the defined collectors and send the corresponding records (in the binary format described above) to the Logisland service (Kafka).

Note

You can download the latest release of Logisland and the YAML configuration file for this tutorial, which can also be found under the $LOGISLAND_HOME/conf directory in the LogIsland container.

1. Start LogIsland as a Docker container

LogIsland is packaged as a Docker container that you can build yourself or pull from Docker Hub. The Docker container is built from a Centos 6.4 image with the following tools enabled (among others):

  • Kafka
  • Spark
  • Elasticsearch
  • Kibana
  • LogIsland

Pull the image from Docker Repository (it may take some time)

docker pull hurence/logisland

You should be aware that this Docker container is quite RAM-hungry and will need at least 8GB of memory to run smoothly. Now run the container:

# run container
docker run \
    -it \
    -p 80:80 \
    -p 8080:8080 \
    -p 2055:2055 \
    -p 3000:3000 \
    -p 9200-9300:9200-9300 \
    -p 5601:5601 \
    -p 2181:2181 \
    -p 9092:9092 \
    -p 9000:9000 \
    -p 4050-4060:4050-4060 \
    --name logisland \
    -h sandbox \
    hurence/logisland bash

# get container ip
docker inspect logisland

# or, if you are on Mac OS
docker-machine ip default

You should add an entry for sandbox (with the container IP) in your /etc/hosts file, as it will make it easier to access all the web services running in the logisland container.
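For example, a minimal sketch of that step could look like the following (the IP in the comment is only illustrative; use whatever docker inspect reports on your machine):

# get the container ip and append a sandbox entry to /etc/hosts
SANDBOX_IP=$(docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' logisland)
echo "$SANDBOX_IP  sandbox" | sudo tee -a /etc/hosts    # e.g. "172.17.0.2  sandbox"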

Note

If you have your own Spark and Kafka cluster, you can download the latest release and unzip on an edge node.

2. Configuration steps

First we have to perform some configuration steps on sandbox (to configure and start Elasticsearch and nifi). We will create a dynamic template in Elasticsearch (to better handle the field mapping) using the following command:

docker exec -ti logisland bash

[root@sandbox /]# curl -XPUT localhost:9200/_template/netflow -d '{
  "template" : "netflow.*",
  "settings": {
    "index.refresh_interval": "5s"
  },
  "mappings" : {
    "netflowevent" : {
      "numeric_detection": true,
      "_all" : {"enabled" : false},
      "properties" : {
        "dOctets": {"index": "analyzed", "type": "long" },
        "dPkts": { "index": "analyzed", "type": "long" },
        "dst_as": { "index": "analyzed", "type": "long" },
        "dst_mask": { "index": "analyzed", "type": "long" },
        "dst_ip4": { "index": "analyzed", "type": "ip" },
        "dst_port": { "index": "analyzed", "type": "long" },
        "first":{"index": "analyzed", "type": "long" },
        "input":{"index": "analyzed", "type": "long" },
        "last":{"index": "analyzed", "type": "long" },
        "nexthop":{"index": "analyzed", "type": "ip" },
        "output":{"index": "analyzed", "type": "long" },
        "nprot":{"index": "analyzed", "type": "long" },
        "record_time":{"index": "analyzed", "type": "date","format": "strict_date_optional_time||epoch_millis" },
        "src_as":{"index": "analyzed", "type": "long" },
        "src_mask":{"index": "analyzed", "type": "long" },
        "src_ip4": { "index": "analyzed", "type": "ip" },
        "src_port":{"index": "analyzed", "type": "long" },
        "flags":{"index": "analyzed", "type": "long" },
        "tos":{"index": "analyzed", "type": "long" },
        "unix_nsecs":{"index": "analyzed", "type": "long" },
        "unix_secs":{"index": "analyzed", "type": "date","format": "strict_date_optional_time||epoch_second" }
      }
    }
  }
}'
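You can check that the template has been correctly registered with a simple GET on the Elasticsearch template API (the output should echo the mapping defined above):

[root@sandbox /]# curl -XGET 'localhost:9200/_template/netflow?pretty'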

In order to send Netflow V5 events (binary format) to the netflow Kafka topic, we will use a nifi instance which simply listens for netflow traffic on a UDP port (we keep the default netflow port, 2055, here) and pushes these netflow records to a Kafka broker (sandbox:9092, topic netflow).

  1. Start nifi

    docker exec -ti logisland bash
    cd /usr/local/nifi-1.1.1
    bin/nifi.sh start
    

    browse http://sandbox:8080/nifi/

  2. Import flow template

    Download this nifi template and import it using “Upload Template” in the “Operator” toolbox.

    ../_images/nifi-template-dialog.png
  3. Use this template to create the nifi flow

    Drag the nifi toolbar template icon into the nifi work area, choose the “nifi_netflow” template, then press the “ADD” button.

../_images/nifi-drag-template.png

You finally have the following nifi flow

../_images/nifi-flow.png
  4. Start nifi processors

Select the listenUDP processor of the nifi flow, right click on it and press “Start”. Do the same for the putKafka processor.

Note

The PutFile processor is only for debugging purposes. It dumps netflow records to the /tmp/netflow directory (which must be created beforehand), so you normally don't have to start it for this demo.
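Optionally, you can check from inside the container that nifi is actually listening on UDP port 2055 (assuming netstat is available in the image; any equivalent tool works as well):

[root@sandbox /]# netstat -nlup | grep 2055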

3. Parse Netflow records

For this tutorial we will handle netflow binary events, generate the corresponding logisland records and store them in Elasticsearch.

Connect a shell to your logisland container to launch the following streaming jobs.

docker exec -ti logisland bash
cd $LOGISLAND_HOME
bin/logisland.sh --conf conf/index-netflow-events.yml

Setup Spark/Kafka streaming engine

An Engine is needed to handle the stream processing. This conf/index-netflow-events.yml configuration file defines a stream processing job setup. The first section configures the Spark engine (we will use a KafkaStreamProcessingEngine) as well as an Elasticsearch service that will be used later in the BulkAddElasticsearch processor.

engine:
 component: com.hurence.logisland.engine.spark.KafkaStreamProcessingEngine
 type: engine
 documentation: Index Netflow events with LogIsland
 configuration:
   spark.app.name: IndexNetFlowEventsDemo
   spark.master: local[4]
   spark.driver.memory: 1G
   spark.driver.cores: 1
   spark.executor.memory: 2G
   spark.executor.instances: 4
   spark.executor.cores: 2
   spark.yarn.queue: default
   spark.yarn.maxAppAttempts: 4
   spark.yarn.am.attemptFailuresValidityInterval: 1h
   spark.yarn.max.executor.failures: 20
   spark.yarn.executor.failuresValidityInterval: 1h
   spark.task.maxFailures: 8
   spark.serializer: org.apache.spark.serializer.KryoSerializer
   spark.streaming.batchDuration: 4000
   spark.streaming.backpressure.enabled: false
   spark.streaming.unpersist: false
   spark.streaming.blockInterval: 500
   spark.streaming.kafka.maxRatePerPartition: 3000
   spark.streaming.timeout: -1
   spark.streaming.kafka.maxRetries: 3
   spark.streaming.ui.retainedBatches: 200
   spark.streaming.receiver.writeAheadLog.enable: false
   spark.ui.port: 4050

 controllerServiceConfigurations:

   - controllerService: elasticsearch_service
     component: com.hurence.logisland.service.elasticsearch.Elasticsearch_2_4_0_ClientService
     type: service
     documentation: elasticsearch 2.4.0 service implementation
     configuration:
       hosts: sandbox:9300
       cluster.name: elasticsearch
       batch.size: 20000

 streamConfigurations:

Stream 1: Parse incoming Netflow (binary format) records

Inside this engine you will run a Kafka stream of processing, so we set up input/output topics and Kafka/Zookeeper hosts. Here the stream will read all the records sent to the netflow topic and push the processing output into the logisland_events topic.

We can define some serializers to marshal all records from and to a topic.

# Parsing
- stream: parsing_stream
  component: com.hurence.logisland.stream.spark.KafkaRecordStreamParallelProcessing
  type: stream
  documentation: A processor chain that transforms Netflow events into Logisland records
  configuration:
    kafka.input.topics: netflow
    kafka.output.topics: logisland_events
    kafka.error.topics: logisland_errors
    kafka.input.topics.serializer: none
    kafka.output.topics.serializer: com.hurence.logisland.serializer.KryoSerializer
    kafka.error.topics.serializer: com.hurence.logisland.serializer.JsonSerializer
    kafka.metadata.broker.list: sandbox:9092
    kafka.zookeeper.quorum: sandbox:2181
    kafka.topic.autoCreate: true
    kafka.topic.default.partitions: 2
    kafka.topic.default.replicationFactor: 2
  processorConfigurations:

Within this stream there is a single processor in the processor chain: the Netflow processor. It takes an incoming binary Netflow record, parses it and builds a Logisland Record carrying, as fields, the values that were contained in the binary record.

# Transform Netflow events into Logisland records
- processor: Netflow adaptor
  component: com.hurence.logisland.processor.netflow.ParseNetflowEvent
  type: parser
  documentation: A processor that transforms Netflow events into LogIsland events
  configuration:
    debug: false
    enrich.record: false

This stream will process Netflow records as soon as they are queued into the netflow Kafka topic; each record will be parsed as an event and pushed back to Kafka in the logisland_events topic.
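To verify that the topics exist and that events are flowing through them, you can use the Kafka command line tools available in the container (the KAFKA_HOME path below is an assumption; point it at wherever Kafka is installed in your container):

# list the topics known to the broker; netflow, logisland_events and logisland_errors should appear
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper sandbox:2181 --list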

Stream 2: Index the processed records into Elasticsearch

The second Kafka stream will handle Records pushed into the logisland_events topic to index them into ElasticSearch. So there is no need to define an output topic:

# Indexing
- stream: indexing_stream
  component: com.hurence.logisland.stream.spark.KafkaRecordStreamParallelProcessing
  type: stream
  documentation: A processor chain that pushes netflow events to ES
  configuration:
    kafka.input.topics: logisland_events
    kafka.output.topics: none
    kafka.error.topics: logisland_errors
    kafka.input.topics.serializer: com.hurence.logisland.serializer.KryoSerializer
    kafka.output.topics.serializer: none
    kafka.error.topics.serializer: com.hurence.logisland.serializer.JsonSerializer
    kafka.metadata.broker.list: sandbox:9092
    kafka.zookeeper.quorum: sandbox:2181
    kafka.topic.autoCreate: true
    kafka.topic.default.partitions: 2
    kafka.topic.default.replicationFactor: 1
  processorConfigurations:

The only processor in the processor chain of this stream is the BulkAddElasticsearch processor.

# Bulk add into ElasticSearch
- processor: ES Publisher
  component: com.hurence.logisland.processor.elasticsearch.BulkAddElasticsearch
  type: processor
  documentation: A processor that pushes Netflow events into ES
  configuration:
    elasticsearch.client.service: elasticsearch_service
    default.index: netflow
    default.type: events
    timebased.index: today
    es.index.field: search_index
    es.type.field: record_type

The default.index: netflow configuration parameter tells the processor to index events into an index whose name starts with the netflow prefix. The timebased.index: today configuration parameter tells the processor to append the current date to that prefix. Thus the index name is of the form netflow.2017.03.30.

Finally, the es.type.field: record_type configuration parameter tells the processor to use the record field record_type of the incoming record to determine the ElasticSearch type to use within the index.
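Once some events have been indexed, you can verify that the daily index has been created with the expected name, using the standard Elasticsearch cat API:

[root@sandbox /]# curl -XGET 'sandbox:9200/_cat/indices/netflow.*?v'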

4. Inject Netflow events into the system

Generate Netflow events to port 2055 of localhost

Now that we have our nifi flow listening on port 2055 for Netflow (V5) traffic and pushing it to Kafka, we have to generate netflow traffic. For the purpose of this tutorial, as already mentioned, we will install and use a netflow traffic generator (but you can use any other means of sending Netflow V5 traffic to port 2055).

docker exec -ti logisland bash
cd /tmp
wget https://github.com/pazdera/NetFlow-Exporter-Simulator/archive/master.zip
unzip master.zip
cd NetFlow-Exporter-Simulator-master/
make
./nfgen   #this command will generate netflow V5 traffic and send it to local port 2055.
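After a few Spark batches have been processed, you can check that the parsed events have reached Elasticsearch (the returned count obviously depends on how long nfgen has been running):

[root@sandbox /]# curl -XGET 'sandbox:9200/netflow.*/_count?pretty'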

5. Monitor your spark jobs and Kafka topics

Now go to http://sandbox:4050/streaming/ to see how fast Spark can process your data

../_images/spark-job-monitoring.png
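If you prefer the command line, the same monitoring information is exposed by the Spark REST API on the driver UI port (this endpoint is part of standard Spark and should be reachable as long as the UI itself is):

[root@sandbox /]# curl http://sandbox:4050/api/v1/applications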

6. Use Kibana to inspect events

Inspect Netflow events under Discover tab

Open your browser and go to http://sandbox:5601/

Configure a new index pattern with netflow.* as the pattern name and @timestamp as the time value field.

../_images/kibana-configure-index-netflow.png

Then browse the “Discover” tab; you should be able to explore your Netflow events.

../_images/kibana-logisland-metrics-netflow.png

You now have to save your search by clicking the save icon. Save this search as “netflowsearch”.

../_images/kibana-save-search.png

Display network information in kibana dashboard

First, you need to import the predefined Kibana dashboard (download this file first) under the Settings tab, Objects subtab.

Select Import and load the previously downloaded netflow_dashboard.json dashboard (it also contains the required Kibana visualizations).

../_images/kibana-logisland-import-dashboard.png

Then visit the Dashboard tab and open the dashboard_netflow dashboard by clicking on Load Saved Dashboard. You should be able to visualize information about the generated traffic (choose the right time window, corresponding to the time of your traffic, in the upper right corner of the Kibana page).

../_images/kibana-logisland-dashboard.png