API design
logisland is a framework that you can extend through its API: you can use it to build your own Processors, or to build data processing apps on top of it.
Java API
You can extend logisland with the Java low-level API as described below.
The primary material: Records
The basic unit of processing is the Record.
A Record is a collection of Field objects, while a Field has a name, a type and a value.
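To make this data model concrete without depending on the logisland jars, here is a minimal sketch in plain Java. SketchRecord and SketchField are hypothetical names, not logisland classes. The field type is a plain string instead of logisland's FieldType enum. The sketch only illustrates the idea of a record as a set of named, typed values.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration: SketchField is NOT a logisland class.
final class SketchField {
    final String name;
    final String type;  // logisland uses a FieldType enum; a String keeps the sketch small
    final Object value;

    SketchField(String name, String type, Object value) {
        this.name = name;
        this.type = type;
        this.value = value;
    }
}

// Hypothetical illustration: SketchRecord is NOT logisland's Record.
final class SketchRecord {
    // fields keyed by name; insertion order is kept for readable printing
    private final Map<String, SketchField> fields = new LinkedHashMap<>();

    // returning "this" allows chained setters, in the same fluent style as Record
    SketchRecord setField(String name, String type, Object value) {
        fields.put(name, new SketchField(name, type, value));
        return this;
    }

    boolean hasField(String name) {
        return fields.containsKey(name);
    }

    SketchField getField(String name) {
        return fields.get(name);
    }

    int size() {
        return fields.size();
    }
}
```

For example, new SketchRecord().setField("method", "string", "GET") builds a one-field record whose "method" value is "GET".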
You can instantiate a Record as in the following code snippet:
String id = "firewall_record1";
String type = "cisco";
Record record = new StandardRecord(type).setId(id);
assertTrue(record.isEmpty());
assertEquals(record.size(), 0);
A record is defined by its type and a collection of fields. Three fields are special (id, time and type) and have shortcut accessors:
// shortcut for id
assertEquals(record.getId(), id);
assertEquals(record.getField(FieldDictionary.RECORD_ID).asString(), id);
// shortcut for time
assertEquals(record.getTime().getTime(), record.getField(FieldDictionary.RECORD_TIME).asLong().longValue());
// shortcut for type
assertEquals(record.getType(), type);
assertEquals(record.getType(), record.getField(FieldDictionary.RECORD_TYPE).asString());
assertEquals(record.getType(), record.getField(FieldDictionary.RECORD_TYPE).getRawValue());
All the other fields are handled through generic setters, getters and removers:
record.setStringField("url_host", "origin-www.20minutes.fr")
.setField("method", FieldType.STRING, "GET")
.setField("response_size", FieldType.INT, 452)
.setField("is_outside_office_hours", FieldType.BOOLEAN, false)
.setField("tags", FieldType.ARRAY, Arrays.asList("spam", "filter", "mail"));
assertFalse(record.hasField("unknown_field"));
assertTrue(record.hasField("method"));
assertEquals(record.getField("method").asString(), "GET");
assertEquals(452, record.getField("response_size").asInteger().intValue());
// the field was set to false above, so assertTrue would fail here
assertFalse(record.getField("is_outside_office_hours").asBoolean());
record.removeField("is_outside_office_hours");
assertFalse(record.hasField("is_outside_office_hours"));
Fields are strongly typed, so you can validate them:
Record record = new StandardRecord();
record.setField("request_size", FieldType.INT, 1399);
assertTrue(record.isValid());
record.setField("request_size", FieldType.INT, "zer");
assertFalse(record.isValid());
record.setField("request_size", FieldType.INT, 45L);
assertFalse(record.isValid());
record.setField("request_size", FieldType.LONG, 45L);
assertTrue(record.isValid());
record.setField("request_size", FieldType.DOUBLE, 45.5d);
assertTrue(record.isValid());
record.setField("request_size", FieldType.DOUBLE, 45.5);
assertTrue(record.isValid());
record.setField("request_size", FieldType.DOUBLE, 45L);
assertFalse(record.isValid());
record.setField("request_size", FieldType.FLOAT, 45.5f);
assertTrue(record.isValid());
record.setField("request_size", FieldType.STRING, 45L);
assertFalse(record.isValid());
record.setField("request_size", FieldType.FLOAT, 45.5d);
assertFalse(record.isValid());
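The assertions above suggest a strict rule. A value is valid only if it is an instance of the Java type that matches its FieldType, and no numeric widening is allowed: a Long is rejected for INT and DOUBLE, and a Double is rejected for FLOAT. The sketch below encodes that assumed rule in a hypothetical SketchFieldType enum. It reproduces the assertions above, but logisland's own validation code may be implemented differently.

```java
// Hypothetical sketch: SketchFieldType is NOT logisland's FieldType.
// Assumed rule: the value must be an instance of exactly the matching boxed type.
enum SketchFieldType {
    STRING(String.class),
    INT(Integer.class),
    LONG(Long.class),
    FLOAT(Float.class),
    DOUBLE(Double.class),
    BOOLEAN(Boolean.class);

    private final Class<?> javaType;

    SketchFieldType(Class<?> javaType) {
        this.javaType = javaType;
    }

    // No widening: a Long is not accepted as INT or DOUBLE, a Double not as FLOAT.
    // A null value is rejected because isInstance(null) returns false.
    boolean isValidValue(Object value) {
        return javaType.isInstance(value);
    }
}
```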
The tools to handle processing: Processor
logisland is designed as a component-centric framework, so there is a layer of abstraction for building configurable components. Basically, a component can be Configurable and Configured.
The most common component you’ll use is the Processor.
Let’s walk through the code of a basic MockProcessor. It doesn’t do any really useful work, but it is self-explanatory.
We first need to extend the AbstractProcessor class (or implement the Processor interface).
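import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MockProcessor extends AbstractProcessor {
private static final Logger logger = LoggerFactory.getLogger(MockProcessor.class);
private static final String EVENT_TYPE_NAME = "mock";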
Then we have to define the list of supported PropertyDescriptors. Declaring and validating these properties is handled by the Configurable interface.
public static final PropertyDescriptor FAKE_MESSAGE
= new PropertyDescriptor.Builder()
.name("fake.message")
.description("a fake message")
.required(true)
.addValidator(StandardPropertyValidators.NON_EMPTY_VALIDATOR)
.defaultValue("yoyo")
.build();
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(FAKE_MESSAGE);
return Collections.unmodifiableList(descriptors);
}
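A PropertyDescriptor bundles a name, a required flag, a default value and validators. The hypothetical SketchProperty class below gives a rough idea of how such a descriptor could be applied to a configuration map. It falls back to the default value when the property is not configured, then runs the validator. The class and its resolution order are assumptions for illustration, not logisland's Configurable implementation.

```java
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical sketch, NOT logisland's PropertyDescriptor: it only mimics the
// name / required / defaultValue / validator settings used for FAKE_MESSAGE.
final class SketchProperty {
    final String name;
    final boolean required;
    final String defaultValue;
    final Predicate<String> validator;

    SketchProperty(String name, boolean required, String defaultValue, Predicate<String> validator) {
        this.name = name;
        this.required = required;
        this.defaultValue = defaultValue;
        this.validator = validator;
    }

    // Assumed resolution order: configured value, else default value, then validation.
    String resolve(Map<String, String> conf) {
        String value = conf.getOrDefault(name, defaultValue);
        if (value == null) {
            if (required) {
                throw new IllegalArgumentException("missing required property " + name);
            }
            return null;
        }
        if (!validator.test(value)) {
            throw new IllegalArgumentException("invalid value for " + name + ": '" + value + "'");
        }
        return value;
    }
}
```

For example, new SketchProperty("fake.message", true, "yoyo", s -> !s.isEmpty()) resolves to "yoyo" for an empty configuration and rejects an empty string. This mirrors the default value and NON_EMPTY_VALIDATOR of FAKE_MESSAGE.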
Then comes the initialization block of the component, which is given a ComponentContext (more on this later).
@Override
public void init(final ComponentContext context) {
logger.info("init MockProcessor");
}
And now the real business part: the process method, which does all the work on the collection of records.
@Override
public Collection<Record> process(final ComponentContext context,
final Collection<Record> collection) {
// log inputs
collection.forEach(record -> logger.info("mock processing record : {}", record));
// output a useless record
Record mockRecord = new StandardRecord("mock_record");
mockRecord.setField("incomingEventsCount", FieldType.INT, collection.size());
mockRecord.setStringField("message",
context.getProperty(FAKE_MESSAGE).asString());
return Collections.singleton(mockRecord);
}
}
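Without the logisland types, the contract of MockProcessor.process is simple. Whatever the number of input records, it emits one record carrying the input count and the configured message. The sketch below restates that contract with records modelled as Map<String, Object>. This modelling is an assumption made only so the example runs without logisland. The field names are the ones used in MockProcessor.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Plain-Java restatement of MockProcessor.process; records are modelled as maps
// (an illustrative assumption, not logisland's Record type).
final class MockProcessSketch {
    static Collection<Map<String, Object>> process(String fakeMessage,
                                                   Collection<Map<String, Object>> input) {
        Map<String, Object> out = new HashMap<>();
        out.put("record_type", "mock_record");
        // exactly one output record, whatever the number of input records
        out.put("incomingEventsCount", input.size());
        out.put("message", fakeMessage);
        return Collections.singleton(out);
    }
}
```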
The runtime context: Instance
You can use your processor by setting its configuration and asking the ComponentFactory to give you a ProcessorInstance, which is a ConfiguredComponent.
String message = "logisland rocks !";
Map<String, String> conf = new HashMap<>();
conf.put(MockProcessor.FAKE_MESSAGE.getName(), message);
ProcessorConfiguration componentConfiguration = new ProcessorConfiguration();
componentConfiguration.setComponent(MockProcessor.class.getName());
componentConfiguration.setType(ComponentType.PROCESSOR.toString());
componentConfiguration.setConfiguration(conf);
Optional<StandardProcessorInstance> instance =
ComponentFactory.getProcessorInstance(componentConfiguration);
assertTrue(instance.isPresent());
Then you need a ComponentContext to run your processor.
ComponentContext context = new StandardComponentContext(instance.get());
Processor processor = instance.get().getProcessor();
Finally, you can use it to process records:
Record record = new StandardRecord("mock_record");
record.setId("record1");
record.setStringField("name", "tom");
List<Record> records =
new ArrayList<>(processor.process(context, Collections.singleton(record)));
assertEquals(1, records.size());
assertTrue(records.get(0).hasField("message"));
assertEquals(message, records.get(0).getField("message").asString());
Chaining processors in a stream: RecordStream
Warning
@todo
Running the processor’s flow: Engine
Warning
@todo
Packaging and configuration
The end user of logisland is not the developer but the business analyst, who may not understand a single line of code. That’s why all our components can be deployed through YAML configuration files:
- processor: mock_processor
  component: com.hurence.logisland.util.runner.MockProcessor
  type: parser
  documentation: a parser that produces events for nothing
  configuration:
    fake.message: the super message
Testing your processors: TestRunner
Once you have coded your processor, you will certainly want to unit test it.
The framework provides the TestRunner tool for that.
All you need is to instantiate a TestRunner with your Processor and its properties.
final String APACHE_LOG_SCHEMA = "/schemas/apache_log.avsc";
final String APACHE_LOG = "/data/localhost_access.log";
final String APACHE_LOG_FIELDS =
"src_ip,identd,user,record_time,http_method,http_query,http_version,http_status,bytes_out";
final String APACHE_LOG_REGEX =
"(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+\\[([\\w:/]+\\s[+\\-]\\d{4})\\]\\s+\"(\\S+)\\s+(\\S+)\\s+(\\S+)\"\\s+(\\S+)\\s+(\\S+)";
final TestRunner testRunner = TestRunners.newTestRunner(new SplitText());
testRunner.setProperty(SplitText.VALUE_REGEX, APACHE_LOG_REGEX);
testRunner.setProperty(SplitText.VALUE_FIELDS, APACHE_LOG_FIELDS);
// check if config is valid
testRunner.assertValid();
Now enqueue some messages, as if they had been sent to the input Kafka topics:
testRunner.clearQueues();
testRunner.enqueue(SplitTextTest.class.getResourceAsStream(APACHE_LOG));
Now run the process method and check that every Record has been correctly processed.
testRunner.run();
testRunner.assertAllInputRecordsProcessed();
testRunner.assertOutputRecordsCount(200);
testRunner.assertOutputErrorCount(0);
You can also check that all output records validate against an Avro schema:
final RecordValidator avroValidator = new AvroRecordValidator(SplitTextTest.class.getResourceAsStream
testRunner.assertAllRecords(avroValidator);
Finally, check that your output records look as expected:
MockRecord out = testRunner.getOutputRecords().get(0);
out.assertFieldExists("src_ip");
out.assertFieldNotExists("src_ip2");
out.assertFieldEquals("src_ip", "10.3.10.134");
out.assertRecordSizeEquals(9);
out.assertFieldEquals(FieldDictionary.RECORD_TYPE, "apache_log");
out.assertFieldEquals(FieldDictionary.RECORD_TIME, 1469342728000L);
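The VALUE_REGEX and VALUE_FIELDS properties suggest that SplitText maps each capture group of the regex, in order, onto the comma-separated field names. The following plain-Java sketch applies that assumed mapping to a made-up log line, which is not taken from localhost_access.log. It also converts the raw timestamp into epoch milliseconds, the unit of the record_time value asserted above. ApacheLogSplitSketch is a hypothetical class, not SplitText's implementation.

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch, NOT SplitText: capture group i+1 is assumed to fill field i.
final class ApacheLogSplitSketch {
    static final String APACHE_LOG_FIELDS =
            "src_ip,identd,user,record_time,http_method,http_query,http_version,http_status,bytes_out";
    static final String APACHE_LOG_REGEX =
            "(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+\\[([\\w:/]+\\s[+\\-]\\d{4})\\]\\s+\"(\\S+)\\s+(\\S+)\\s+(\\S+)\"\\s+(\\S+)\\s+(\\S+)";
    private static final Pattern PATTERN = Pattern.compile(APACHE_LOG_REGEX);
    // Apache access-log timestamp layout, e.g. 01/Jan/2016:00:00:00 +0000
    private static final DateTimeFormatter APACHE_TIME =
            DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH);

    // Returns field name -> raw value, or an empty map when the line does not match.
    static Map<String, String> split(String line) {
        Map<String, String> fields = new LinkedHashMap<>();
        Matcher m = PATTERN.matcher(line);
        if (!m.matches()) {
            return fields;
        }
        String[] names = APACHE_LOG_FIELDS.split(",");
        for (int i = 0; i < names.length; i++) {
            fields.put(names[i], m.group(i + 1)); // group 0 is the whole match
        }
        return fields;
    }

    // Converts a raw record_time value into epoch milliseconds.
    static long toEpochMillis(String apacheTime) {
        return OffsetDateTime.parse(apacheTime, APACHE_TIME).toInstant().toEpochMilli();
    }
}
```

Splitting 192.168.0.1 - - [01/Jan/2016:00:00:00 +0000] "GET /index.html HTTP/1.1" 200 1234 yields nine fields. Its record_time converts to 1451606400000.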
REST API
You can also extend logisland through its high-level REST API, as described below.
Design Tools
The REST API is designed with Swagger.
You can use the Docker image of swagger-editor to edit the Swagger YAML file and generate source code.
docker pull swaggerapi/swagger-editor
docker run -d -p 80:8080 swaggerapi/swagger-editor
On macOS you can install swagger-codegen with Homebrew, or download the CLI jar directly:
brew install swagger-codegen
# or
wget https://oss.sonatype.org/content/repositories/releases/io/swagger/swagger-codegen-cli/2.2.1/swagger-codegen-cli-2.2.1.jar
You can then generate the source code from the Swagger YAML file:
swagger-codegen generate \
--group-id com.hurence.logisland \
--artifact-id logisland-agent \
--artifact-version 0.10.0-rc1 \
--api-package com.hurence.logisland.agent.rest.api \
--model-package com.hurence.logisland.agent.rest.model \
-o logisland-framework/logisland-agent \
-l jaxrs \
--template-dir logisland-framework/logisland-agent/src/main/swagger/templates \
-i logisland-framework/logisland-agent/src/main/swagger/api-swagger.yaml
Swagger Jetty server
This server was generated by the swagger-codegen project. By using the OpenAPI Spec from a remote server, you can easily generate a server stub. This is an example of building a Swagger-enabled server with the JAX-RS framework.
To run the server, please execute the following:
cd logisland-framework/logisland-agent
mvn clean package jetty:run
You can then view the swagger.json.
Note that if you have configured the host to be something other than localhost, the calls through swagger-ui will be directed to that host and not to localhost.