
Rittman Mead Consulting

Delivering Oracle Business Intelligence

Forays into Kafka – Enabling Flexible Data Pipelines

Tue, 2015-10-27 17:46

One of the defining features of “Big Data” from a technologist’s point of view is the sheer number of tools and permutations at one’s disposal. Do you go Flume or Logstash? Avro or Thrift? Pig or Spark? Foo or Bar? (I made that last one up). This wealth of choice is wonderful because it means we can choose the right tool for the right job each time.

Of course, we need to establish that we have indeed chosen the right tool for the right job. But here’s the paradox: how do we easily work out whether a tool is going to do what we want of it, and is going to be a good fit, without disturbing what we already have in place? Particularly if it’s something that’s going to be part of an existing Productionised data pipeline, inserting a new tool partway through risks disrupting what’s there already. We potentially end up with a series of cloned environments, all diverging from each other and not necessarily comparable (not to mention the overhead of the resources to host it all).

The same issue arises when we want to change the code or configuration of an existing pipeline. Bugs creep in, ideas to enhance the processing that you’ve currently got present themselves. Wouldn’t it be great if we could test these changes reliably and with no risk to the existing system?

This is where Kafka comes in. Kafka is very useful for two reasons:

  1. You can use it as a buffer for data that can be consumed and re-consumed on demand
  2. Multiple consumers can all pull the data, independently and at their own rate.

So you take your existing pipeline, plumb in Kafka, and then as and when you want to try out additional tools (or configurations of existing ones) you simply take another ‘tap’ off the existing store. This is an idea that Gwen Shapira put forward in May 2015 and really resonated with me.
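The two properties above can be illustrated with a toy in-memory analogue (my own sketch, purely for illustration — this is not Kafka's API): a topic behaves like an append-only log, and each consumer group keeps its own read offset, so consumers progress independently and can be rewound to re-consume the same data.

```python
# Toy model of a Kafka topic: an append-only log plus per-group offsets.
# Illustration only -- real Kafka has partitions, brokers and a richer API.

class ToyTopic:
    def __init__(self):
        self.log = []          # append-only record of messages
        self.offsets = {}      # consumer group -> next offset to read

    def produce(self, message):
        self.log.append(message)

    def consume(self, group):
        """Return the next unread message for this group, or None."""
        pos = self.offsets.get(group, 0)
        if pos >= len(self.log):
            return None
        self.offsets[group] = pos + 1
        return self.log[pos]

    def rewind(self, group, offset=0):
        """Re-point a group at an earlier offset to re-consume data."""
        self.offsets[group] = offset

topic = ToyTopic()
for line in ["GET /", "GET /blog/", "GET /about"]:
    topic.produce(line)

# Two independent consumers: one "production" (HDFS), one "discovery" (ES).
hdfs_reads = [topic.consume("hdfs") for _ in range(3)]
es_reads = [topic.consume("elasticsearch") for _ in range(2)]

print(hdfs_reads)  # all three messages
print(es_reads)    # only the first two -- this group reads at its own rate

# Re-processing: rewind one group without touching the other.
topic.rewind("hdfs")
print(topic.consume("hdfs"))  # "GET /" again
```

The ‘tap’ idea is exactly this: adding another consumer group is cheap, and has no effect on existing consumers or on the source system.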

I see Kafka sitting right on that Execution/Innovation demarcation line of the Information Management and Big Data Reference Architecture that Oracle and Rittman Mead produced last year:

Kafka enables us to build a pipeline for our analytics that breaks down into two phases:

  1. Data ingest from source into Kafka: simple and reliable, with as few moving parts as possible.
  2. Post-processing, batch or realtime, using Kafka as the source. Re-runnable, with multiple parallel consumers:
    • Productionised processing into the Event Engine, Data Reservoir and beyond
    • Adhoc/loosely controlled Data Discovery processing and re-processing

These two steps align with the idea of “Obtain” and “Scrub” that Rittman Mead’s Jordan Meyer talked about in his BI Forum 2015 Masterclass on Data Discovery:

So that’s the theory – let’s now look at an example of how Kafka can enable us to build a more flexible and productive data pipeline and environment.

Flume or Logstash? HDFS or Elasticsearch? … All of them!

Mark Rittman wrote back in April 2014 about using Apache Flume to stream logs from the Rittman Mead web server over to HDFS, from where they could be analysed in Hive and Impala. The basic setup looked like this:

Another route for analysing data is through the ELK stack. It does a similar thing – streams logs (with Logstash) in to a data store (Elasticsearch) from where they can be analysed, just with a different set of tools with a different emphasis on purpose. The input is the same – the web server log files. Let’s say I want to evaluate which is the better mechanism for analysing my log files, and compare the two side-by-side. Ultimately I might only want to go forward with one, but for now, I want to try both.

I could run them literally in parallel:

The disadvantage with this is that I have twice the ‘footprint’ on my data source, a Production server. A principle throughout all of this is that we want to remain light-touch on the sources of data. Whether a Production web server, a Production database, or whatever – upsetting the system owners of the data we want is never going to win friends.

An alternative to running in parallel would be to use one of the streaming tools to load data in place of the other, i.e.


The issue with this is I want to validate the end-to-end pipeline. Using a single source is better in terms of load/risk to the source system, but less so for validating my design. If I’m going to go with Elasticsearch as my target, Logstash would be the better fit source. Ditto HDFS/Flume. Both support connectors to the other, but using native capabilities always feels to me a safer option (particularly in the open-source world). And what if the particular modification I’m testing doesn’t support this kind of connectivity pattern?

Can you see where this is going? How about this:

The key points here are:

  1. One hit on the source system. In this case it’s Flume, but it could be Logstash, or another tool. This streams each line of the log file into Kafka in the exact order that it’s read.
  2. Kafka holds a copy of the log data, for a configurable time period. This could be days, or months – up to you and depending on purpose (and disk space!)
  3. Kafka is designed to be distributed and fault-tolerant. As with most of the boxes on this logical diagram it would be physically spread over multiple machines for capacity, performance, and resilience.
  4. The eventual targets, HDFS and Elasticsearch, are loaded by their respective tools pulling the web server entries exactly as they were on disk. In terms of validating end-to-end design we’re still doing that – we’re just pulling from a different source.

Another massively important benefit of Kafka is this:

Sooner or later (and if you’re new to the tool and code/configuration required, probably sooner) you’re going to get errors in your data pipeline. These may be fatal and cause it to fall in a heap, or they may be more subtle and you only realise after analysis that some of your data’s missing or not fully enriched. What to do? Obviously you need to re-run your ingest process. But how easy is that? Where is the source data? Maybe you’ll have a folder full of “.processed” source log files, or an HDFS folder of raw source data that you can reprocess. The issue here is the re-processing – you need to point your code at the alternative source, and work out the range of data to reprocess.

This is all eminently do-able of course – but wouldn’t it be easier just to rerun your existing ingest pipeline and just rewind the point at which it’s going to pull data from? Minimising the amount of ‘replumbing’ and reconfiguration to run a re-process job vs. new ingest makes it faster to do, and more reliable. Each additional configuration change is an opportunity to mis-configure. Each ‘shadow’ script clone for re-running vs normal processing is increasing the risk of code diverging and stale copies being run.

The final pipeline in this simple example looks like this:

  • The source server logs are streamed into Kafka, with a permanent copy up onto Amazon’s S3 for those real “uh oh” moments. Kafka, in a sandbox environment with a ham-fisted sysadmin, won’t be bullet-proof. Better to recover a copy from S3 than have to bother the Production server again. This is something I’ve put in for this specific use case, and wouldn’t be applicable in others.
  • From Kafka the web server logs are available to stream, as if natively from the web server disk itself, through Flume and Logstash.

There’s a variation on a theme of this, that looks like this:

Instead of Flume -> Kafka, and then a second Flume -> HDFS, we shortcut this and have the same Flume agent that is pulling from the source also write to HDFS. Why have I not put this as the final pipeline? Because of this:

Let’s say that I want to do some kind of light-touch enrichment on the files, such as extracting the log timestamp in order to partition my web server logs in HDFS by the date of the log entry (not the time of processing, because I’m working with historical files too). I’m using a regex_extractor interceptor in Flume to determine the timestamp from the event data (log entry) being processed. That’s great, and it works well – when it works. If I get my regex wrong, or the log file changes date format, the house of cards comes tumbling down. Now I have a mess, because my nice clean ingest pipeline from the source system now needs fixing and re-running. As before, of course it is possible to write this cleanly so that it doesn’t break, etc etc, but from the point of view of decoupling operations for manageability and flexibility it makes sense to keep them separate (remember the Obtain vs Scrub point above?).
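The failure mode is easy to demonstrate outside Flume. A minimal Python sketch (my own regex, written in the same spirit as the interceptor’s, not copied from the Flume config): extraction works on the expected Apache log format, and silently yields nothing the moment the date format changes.

```python
import re

# Pattern for an Apache-style timestamp, e.g. 06/Apr/2014:03:38:07 +0000
# (written for this sketch; the Flume interceptor uses its own regex).
TS_PATTERN = re.compile(r"(\d{2}/[a-zA-Z]{3}/\d{4}:\d{2}:\d{2}:\d{2}\s\+\d{4})")

def extract_timestamp(log_line):
    """Return the timestamp portion of the log line, or None if absent."""
    match = TS_PATTERN.search(log_line)
    return match.group(1) if match else None

good = '1.2.3.4 - - [06/Apr/2014:03:38:07 +0000] "GET / HTTP/1.1" 200 38281'
bad  = '1.2.3.4 - - [2014-04-06 03:38:07] "GET / HTTP/1.1" 200 38281'

print(extract_timestamp(good))  # 06/Apr/2014:03:38:07 +0000
print(extract_timestamp(bad))   # None -- a changed date format breaks enrichment
```

In the Flume pipeline a `None` here means an event with no timestamp header, and the partitioned HDFS write falls over — which is exactly why the enrichment belongs downstream of Kafka rather than in the ingest leg.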

The final note on this is to point out that technically we can implement the pipeline using a Kafka Flume channel, which is a slightly neater way of doing things. The data still ends up in the S3 sink, and available in Kafka for streaming to all the consumers.

Kafka in Action

Let’s take a look at the configuration to put the above theory into practice. I’m running all of this on Oracle’s BigDataLite 4.2.1 VM which includes, amongst many other goodies, CDH 5.4.0. Alongside this I’ve installed into /opt :

  • apache-flume-1.6.0
  • elasticsearch-1.7.3
  • kafka_2.10-
  • kibana-4.1.2-linux-x64
  • logstash-1.5.4

The Starting Point – Flume -> HDFS

First, we’ve got the initial Logs -> Flume -> HDFS configuration, similar to what Mark wrote about originally:

source_agent.sources = apache_server  
source_agent.sources.apache_server.type = exec  
source_agent.sources.apache_server.command = tail -f /home/oracle/website_logs/access_log  
source_agent.sources.apache_server.batchSize = 1  
source_agent.sources.apache_server.channels = memoryChannel

source_agent.channels = memoryChannel  
source_agent.channels.memoryChannel.type = memory  
source_agent.channels.memoryChannel.capacity = 100

## Write to HDFS  
source_agent.sinks = hdfs_sink  
source_agent.sinks.hdfs_sink.type = hdfs  
source_agent.sinks.hdfs_ = memoryChannel  
source_agent.sinks.hdfs_sink.hdfs.path = /user/oracle/incoming/rm_logs/apache_log  
source_agent.sinks.hdfs_sink.hdfs.fileType = DataStream  
source_agent.sinks.hdfs_sink.hdfs.writeFormat = Text  
source_agent.sinks.hdfs_sink.hdfs.rollSize = 0  
source_agent.sinks.hdfs_sink.hdfs.rollCount = 10000  
source_agent.sinks.hdfs_sink.hdfs.rollInterval = 600

After running this

$ /opt/apache-flume-1.6.0-bin/bin/flume-ng agent --name source_agent \
--conf-file flume_website_logs_02_tail_source_hdfs_sink.conf

we get the logs appearing in HDFS and can see them easily in Hue:

Adding Kafka to the Pipeline

Let’s now add Kafka to the mix. I’ve already set up and started Kafka (see here for how), and Zookeeper’s already running as part of the default BigDataLite build.

First we need to define a Kafka topic that is going to hold the log files. In this case it’s called apache_logs:

$ /opt/kafka_2.10-*/bin/ --zookeeper bigdatalite:2181 \
--create --topic apache_logs --replication-factor 1 --partitions 1

Just to prove it’s there and that we can send and receive messages on it, I’m going to use the Kafka console producer and consumer to test it. Run these in two separate windows:

$ /opt/kafka_2.10-*/bin/ \
--broker-list bigdatalite:9092 --topic apache_logs

$ /opt/kafka_2.10-*/bin/ \
--zookeeper bigdatalite:2181 --topic apache_logs

With the consumer running, enter some text, any text, in the producer session and you should see it appear almost immediately in the consumer window.

Now that we’ve validated the Kafka topic, let’s plumb it in. We’ll switch the existing Flume config to use a Kafka sink, and then add a second Flume agent to do the Kafka -> HDFS bit, giving us this:

The original flume agent configuration now looks like this:

source_agent.sources = apache_log_tail  
source_agent.channels = memoryChannel  
source_agent.sinks = kafka_sink

source_agent.sources.apache_log_tail.type = exec  
source_agent.sources.apache_log_tail.command = tail -f /home/oracle/website_logs/access_log  
source_agent.sources.apache_log_tail.batchSize = 1  
source_agent.sources.apache_log_tail.channels = memoryChannel

source_agent.channels.memoryChannel.type = memory  
source_agent.channels.memoryChannel.capacity = 100

## Write to Kafka  
source_agent.sinks.kafka_ = memoryChannel  
source_agent.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink  
source_agent.sinks.kafka_sink.batchSize = 5  
source_agent.sinks.kafka_sink.brokerList = bigdatalite:9092  
source_agent.sinks.kafka_sink.topic = apache_logs

Restart the console consumer from above so that you can see what’s going into Kafka, and then run the Flume agent. You should see the log entries appearing soon after. Remember that the console consumer is just one consumer of the logs – when we plug in the Flume consumer to write the logs to HDFS we can opt to pick up all of the entries in Kafka, completely independently of what we have or haven’t consumed in the console session.

$ /opt/apache-flume-1.6.0-bin/bin/flume-ng agent --name source_agent \ 
--conf-file flume_website_logs_03_tail_source_kafka_sink.conf

[oracle@bigdatalite ~]$ /opt/kafka_2.10-*/bin/ \
--zookeeper bigdatalite:2181 --topic apache_logs

- - [06/Sep/2015:08:08:30 +0000] "GET / HTTP/1.0" 301 235 "-" "Mozilla/5.0 (compatible; - free monitoring service;"
- - [06/Sep/2015:08:08:35 +0000] "HEAD /blog HTTP/1.1" 301 - "" "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0)"
- - [06/Sep/2015:08:08:35 +0000] "GET /blog/ HTTP/1.0" 200 145999 "-" "Mozilla/5.0 (compatible; monitis - premium monitoring service;"
- - [06/Sep/2015:08:08:36 +0000] "HEAD /blog/ HTTP/1.1" 200 - "" "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0)"
- - [06/Sep/2015:08:08:44 +0000] "GET / HTTP/1.0" 301 235 "-" "Mozilla/5.0 (compatible; - free monitoring service;"
- - [06/Sep/2015:08:08:58 +0000] "GET / HTTP/1.0" 301 235 "-" "Mozilla/5.0 (compatible; monitis - premium monitoring service;"
- - [06/Sep/2015:08:08:58 +0000] "GET / HTTP/1.1" 200 36946 "-" "Echoping/6.0.2"

Set up the second Flume agent to use Kafka as a source, and HDFS as the target just as it was before we added Kafka into the pipeline:

target_agent.sources = kafkaSource  
target_agent.channels = memoryChannel  
target_agent.sinks = hdfsSink 

target_agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource  
target_agent.sources.kafkaSource.zookeeperConnect = bigdatalite:2181  
target_agent.sources.kafkaSource.topic = apache_logs  
target_agent.sources.kafkaSource.batchSize = 5  
target_agent.sources.kafkaSource.batchDurationMillis = 200  
target_agent.sources.kafkaSource.channels = memoryChannel

target_agent.channels.memoryChannel.type = memory  
target_agent.channels.memoryChannel.capacity = 100

## Write to HDFS  
target_agent.sinks.hdfsSink.type = hdfs  
target_agent.sinks. = memoryChannel  
target_agent.sinks.hdfsSink.hdfs.path = /user/oracle/incoming/rm_logs/apache_log  
target_agent.sinks.hdfsSink.hdfs.fileType = DataStream  
target_agent.sinks.hdfsSink.hdfs.writeFormat = Text  
target_agent.sinks.hdfsSink.hdfs.rollSize = 0  
target_agent.sinks.hdfsSink.hdfs.rollCount = 10000  
target_agent.sinks.hdfsSink.hdfs.rollInterval = 600

Fire up the agent:

$ /opt/apache-flume-1.6.0-bin/bin/flume-ng agent -n target_agent \
-f flume_website_logs_04_kafka_source_hdfs_sink.conf

and as the website log data streams in to Kafka (from the first Flume agent) you should see the second Flume agent sending it to HDFS and evidence of this in the console output from Flume:

15/10/27 13:53:53 INFO hdfs.BucketWriter: Creating /user/oracle/incoming/rm_logs/apache_log/FlumeData.1445954032932.tmp

and in HDFS itself:

Play it again, Sam?

All we’ve done to this point is add Kafka into the pipeline, ready for subsequent use. We’ve not changed the net output of the data pipeline. But we can now benefit from having Kafka there, by re-running some of our HDFS load without having to go back to the source files. Let’s say we want to partition the logs as we store them, but we don’t want to disrupt the existing processing. How? Easy! Just create another Flume agent with the additional configuration in it to do the partitioning.

target_agent.sources = kafkaSource  
target_agent.channels = memoryChannel  
target_agent.sinks = hdfsSink

target_agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource  
target_agent.sources.kafkaSource.zookeeperConnect = bigdatalite:2181  
target_agent.sources.kafkaSource.topic = apache_logs  
target_agent.sources.kafkaSource.batchSize = 5  
target_agent.sources.kafkaSource.batchDurationMillis = 200  
target_agent.sources.kafkaSource.channels = memoryChannel  
target_agent.sources.kafkaSource.groupId = new  
target_agent.sources.kafkaSource.kafka.auto.offset.reset = smallest  
target_agent.sources.kafkaSource.interceptors = i1

target_agent.channels.memoryChannel.type = memory  
target_agent.channels.memoryChannel.capacity = 1000

# Regex Interceptor to set timestamp so that HDFS can be written to partitioned  
target_agent.sources.kafkaSource.interceptors.i1.type = regex_extractor  
target_agent.sources.kafkaSource.interceptors.i1.serializers = s1  
target_agent.sources.kafkaSource.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer  
target_agent.sources.kafkaSource.interceptors. = timestamp  
# Match this format logfile to get timestamp from it:  
# - - [06/Apr/2014:03:38:07 +0000] "GET / HTTP/1.1" 200 38281 "-" "Pingdom.com_bot_version_1.4_("  
target_agent.sources.kafkaSource.interceptors.i1.regex = (\\d{2}\\/[a-zA-Z]{3}\\/\\d{4}:\\d{2}:\\d{2}:\\d{2}\\s\\+\\d{4})  
target_agent.sources.kafkaSource.interceptors.i1.serializers.s1.pattern = dd/MMM/yyyy:HH:mm:ss Z  

## Write to HDFS  
target_agent.sinks.hdfsSink.type = hdfs = memoryChannel  
target_agent.sinks.hdfsSink.hdfs.path = /user/oracle/incoming/rm_logs/apache/%Y/%m/%d/access_log  
target_agent.sinks.hdfsSink.hdfs.fileType = DataStream  
target_agent.sinks.hdfsSink.hdfs.writeFormat = Text  
target_agent.sinks.hdfsSink.hdfs.rollSize = 0  
target_agent.sinks.hdfsSink.hdfs.rollCount = 0  
target_agent.sinks.hdfsSink.hdfs.rollInterval = 600

The important lines to note here are:

  • the regex_extractor interceptor which determines the timestamp of the log event, then used in the hdfs.path partitioning structure
  • the groupId and configuration items for the kafkaSource.
    • The groupId ensures that this flume agent’s offset in the consumption of the data in the Kafka topic is maintained separately from that of the original agent that we had. By default it is flume, and here I’m overriding it to new. It’s a good idea to specify this explicitly in all Kafka flume consumer configurations to avoid complications.
    • kafka.auto.offset.reset tells the consumer that if no existing offset is found (which it won’t be, if the groupId is a new one) it should start from the beginning of the data rather than the end (which is what it will do by default).
    • Thus if you want to get Flume to replay the contents of a Kafka topic, just set the groupId to an unused one (eg ‘foo01’, ‘foo02’, etc) and make sure kafka.auto.offset.reset is smallest
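As a sanity check of the partitioning logic, the extract-then-partition step can be sketched outside Flume. This Python mirror of it (my own code; in the real pipeline the work is done by the interceptor and the %Y/%m/%d escapes in hdfs.path) shows the dated path a given log line should land under:

```python
import re
from datetime import datetime

# Sketch of the Flume setup: extract the log timestamp, then build the
# dated HDFS-style path that the %Y/%m/%d escapes would produce.
TS_REGEX = re.compile(r"(\d{2}/[a-zA-Z]{3}/\d{4}:\d{2}:\d{2}:\d{2}\s\+\d{4})")

def partition_path(log_line, base="/user/oracle/incoming/rm_logs/apache"):
    """Return the partitioned path for a log line's entry date."""
    ts_text = TS_REGEX.search(log_line).group(1)
    ts = datetime.strptime(ts_text, "%d/%b/%Y:%H:%M:%S %z")
    return f"{base}/{ts:%Y/%m/%d}/access_log"

line = '- - [06/Apr/2014:03:38:07 +0000] "GET / HTTP/1.1" 200 38281'
print(partition_path(line))
# /user/oracle/incoming/rm_logs/apache/2014/04/06/access_log
```

Note that the path is driven by the date of the log entry itself, not the time of processing — which is the whole point when replaying historical data out of Kafka.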

Now run it (concurrently with the existing flume agents if you want):

$ /opt/apache-flume-1.6.0-bin/bin/flume-ng agent -n target_agent \
-f flume_website_logs_07_kafka_source_partitioned_hdfs_sink.conf

You should see a flurry of activity (or not, depending on how much data you’ve already got in Kafka), and some nicely partitioned apache logs in HDFS:

Crucially, the existing Flume agent and non-partitioned HDFS pipeline stay in place and functioning exactly as they were – we’ve not had to touch them. We could then run the two side-by-side until we’re happy the partitioning is working correctly, and then decommission the first. Even at this point we have the benefit of Kafka, because we just turn off the original HDFS-writing agent – the new “live” one continues to run and doesn’t need reconfiguring. We’ve validated the actual configuration we’re going to use for real; we’ve not had to simulate it with mock data sources that then need re-plumbing prior to real use.

Clouds and Channels

We’re going to evolve the pipeline a bit now. We’ll go back to a single Flume agent writing to HDFS, but add in Amazon’s S3 as the target for the unprocessed log files. The point here is not so much that S3 is the best place to store log files (although it is a good option), but as a way to demonstrate a secondary method of keeping your raw data available without impacting the source system. It also fits nicely with using the Kafka flume channel to tighten the pipeline up a tad:

Flume’s HDFS sink can write directly to Amazon’s S3 service via Hadoop’s S3N filesystem connector. You need to have already set up your S3 ‘bucket’, and have the appropriate AWS Access Key ID and Secret Key. To get this to work I added these credentials to /etc/hadoop/conf.bigdatalite/core-site.xml (I tried specifying them inline in the Flume configuration, but with no success):

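For reference, these are Hadoop’s standard S3N credential properties; a sketch of the core-site.xml fragment, with placeholder values:

```xml
<!-- In /etc/hadoop/conf.bigdatalite/core-site.xml (placeholder values) -->
<property>
  <name>fs.s3n.awsAccessKeyId</name>
  <value>YOUR_ACCESS_KEY_ID</value>
</property>
<property>
  <name>fs.s3n.awsSecretAccessKey</name>
  <value>YOUR_SECRET_ACCESS_KEY</value>
</property>
```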

Once you’ve set up the bucket and credentials, the original flume agent (the one pulling the actual web server logs) can be amended:

source_agent.sources = apache_log_tail  
source_agent.channels = kafkaChannel  
source_agent.sinks = s3Sink

source_agent.sources.apache_log_tail.type = exec  
source_agent.sources.apache_log_tail.command = tail -f /home/oracle/website_logs/access_log  
source_agent.sources.apache_log_tail.batchSize = 1  
source_agent.sources.apache_log_tail.channels = kafkaChannel

## Write to Kafka Channel  
source_agent.channels.kafkaChannel.type = org.apache.flume.channel.kafka.KafkaChannel  
source_agent.channels.kafkaChannel.topic = apache_logs  
source_agent.channels.kafkaChannel.brokerList = bigdatalite:9092  
source_agent.channels.kafkaChannel.zookeeperConnect = bigdatalite:2181

## Write to S3  
source_agent. = kafkaChannel  
source_agent.sinks.s3Sink.type = hdfs  
source_agent.sinks.s3Sink.hdfs.path = s3n://rmoff-test/apache  
source_agent.sinks.s3Sink.hdfs.fileType = DataStream  
source_agent.sinks.s3Sink.hdfs.filePrefix = access_log  
source_agent.sinks.s3Sink.hdfs.writeFormat = Text  
source_agent.sinks.s3Sink.hdfs.rollCount = 10000  
source_agent.sinks.s3Sink.hdfs.rollSize = 0  
source_agent.sinks.s3Sink.hdfs.batchSize = 10000  
source_agent.sinks.s3Sink.hdfs.rollInterval = 600

Here the source is the same as before (server logs), but the channel is now Kafka itself, and the sink S3. Using Kafka as the channel has the nice benefit that the data is now already in Kafka, we don’t need that as an explicit target in its own right.

Restart the source agent using this new configuration:

$ /opt/apache-flume-1.6.0-bin/bin/flume-ng agent --name source_agent \
--conf-file flume_website_logs_09_tail_source_kafka_channel_s3_sink.conf

and you should get the data appearing on both HDFS as before, and now also in the S3 bucket:

Didn’t Someone Say Logstash?

The premise at the beginning of this exercise was that I could extend an existing data pipeline to pull data into a new set of tools, as if from the original source, but without touching that source or the existing configuration in place. So far we’ve got a pipeline that is pretty much as we started with, just with Kafka in there now and an additional feed to S3:

Now we’re going to extend (or maybe “broaden” is a better term) the data pipeline to add Elasticsearch into it:

Whilst Flume can write to Elasticsearch given the appropriate extender, I’d rather use a tool much closer to Elasticsearch in origin and direction – Logstash. Logstash supports Kafka as an input (and an output, if you want), making the configuration ridiculously simple. To smoke-test the configuration just run Logstash with this configuration:

input {  
        kafka {  
                zk_connect => 'bigdatalite:2181'  
                topic_id => 'apache_logs'  
                codec => plain {  
                        charset => "ISO-8859-1"  
                }  
                # Use both the following two if you want to reset processing  
                reset_beginning => 'true'  
                auto_offset_reset => 'smallest'  
        }  
}

output {  
        stdout { codec => rubydebug }  
}

A few things to point out in the input configuration:

  • You need to specify the plain codec (assuming your input from Kafka is plain text). The default codec for the Kafka plugin is json, and Logstash does NOT like trying to parse plain text as json, as I found out:

     - - [06/Sep/2015:08:08:30 +0000] "GET / HTTP/1.0" 301 235 "-" "Mozilla/5.0 (compatible; - free monitoring service;" {:exception=>#<NoMethodError: undefined method `[]' for 37.252:Float>, :backtrace=>["/opt/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/event.rb:73:in `initialize'", "/opt/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-codec-json-1.0.1/lib/logstash/codecs/json.rb:46:in `decode'", "/opt/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-1.0.0/lib/logstash/inputs/kafka.rb:169:in `queue_event'", "/opt/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-1.0.0/lib/logstash/inputs/kafka.rb:139:in `run'", "/opt/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/pipeline.rb:177:in `inputworker'", "/opt/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/pipeline.rb:171:in `start_input'"], :level=>:error}

  • As well as specifying the codec, I needed to specify the charset. Without this I got \\u0000\\xBA\\u0001 at the beginning of each message that Logstash pulled from Kafka

  • Specifying reset_beginning and auto_offset_reset tell Logstash to pull everything in from Kafka, rather than starting at the latest offset.

When you run the configuration file above you should see a stream of messages to your console of everything that is in the Kafka topic:

$ /opt/logstash-1.5.4/bin/logstash -f logstash-apache_10_kafka_source_console_output.conf

The output will look like this – note that Logstash has added its own special @version and @timestamp fields:

       "message" => " - - [09/Oct/2015:04:13:23 +0000] \"GET /wp-content/uploads/2014/10/JFB-View-Selector-LowRes-300x218.png HTTP/1.1\" 200 53295 \"\" \"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36\"",  
      "@version" => "1",  
    "@timestamp" => "2015-10-27T17:29:06.596Z"  

Having proven the Kafka-Logstash integration, let’s do something useful – get all those lovely log entries streaming from source, through Kafka, enriched in Logstash with things like geoip, and finally stored in Elasticsearch:

input {  
        kafka {  
                zk_connect => 'bigdatalite:2181'  
                topic_id => 'apache_logs'  
                codec => plain {  
                        charset => "ISO-8859-1"  
                }  
                # Use both the following two if you want to reset processing  
                reset_beginning => 'true'  
                auto_offset_reset => 'smallest'  
        }  
}

filter {  
        # Parse the message using the pre-defined "COMBINEDAPACHELOG" grok pattern  
        grok { match => ["message","%{COMBINEDAPACHELOG}"] }

        # Ignore anything that's not a blog post hit, characterised by /yyyy/mm/post-slug form  
        if [request] !~ /^\/[0-9]{4}\/[0-9]{2}\/.*$/ { drop{} }

        # From the blog post URL, strip out the year/month and slug  
        #     year  => 2015  
        #     month =>   02  
        #     slug  => obiee-monitoring-and-diagnostics-with-influxdb-and-grafana  
        grok { match => [ "request","\/%{NUMBER:post-year}\/%{NUMBER:post-month}\/(%{NUMBER:post-day}\/)?%{DATA:post-slug}(\/.*)?$"] }

        # Combine year and month into one field  
        mutate { replace => [ "post-year-month" , "%{post-year}-%{post-month}" ] }

        # Use GeoIP lookup to locate the visitor's town/country  
        geoip { source => "clientip" }

        # Store the date of the log entry (rather than now) as the event's timestamp  
        date { match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]}  
}

output {  
        elasticsearch { host => "bigdatalite"  index => "blog-apache-%{+YYYY.MM.dd}"}  
}

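The filter’s drop condition and slug extraction can be sanity-checked outside Logstash. A Python sketch (my own regexes, written to be equivalent to the /yyyy/mm/post-slug test and the grok capture above — this is not Logstash’s engine):

```python
import re

# Equivalent of the filter's "blog post hit" test: /yyyy/mm/...
BLOG_HIT = re.compile(r"^/[0-9]{4}/[0-9]{2}/.*$")

# Equivalent of the grok capture: year, month, optional day, then slug.
SLUG = re.compile(
    r"^/(?P<year>\d+)/(?P<month>\d+)/(?:(?P<day>\d+)/)?(?P<slug>[^/]+)(?:/.*)?$"
)

def parse_request(request):
    """Return (year-month, slug) for blog hits, None otherwise (dropped)."""
    if not BLOG_HIT.match(request):
        return None
    m = SLUG.match(request)
    if not m:
        return None
    return (f"{m['year']}-{m['month']}", m["slug"])

print(parse_request("/2015/02/obiee-monitoring-and-diagnostics-with-influxdb-and-grafana/"))
# ('2015-02', 'obiee-monitoring-and-diagnostics-with-influxdb-and-grafana')
print(parse_request("/wp-content/uploads/2014/10/image.png"))
# None -- not a blog post hit, so it would be dropped
```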
Make sure that Elasticsearch is running and then kick off Logstash:

$ /opt/logstash-1.5.4/bin/logstash -f logstash-apache_01_kafka_source_parsed_to_es.conf

Nothing will appear to happen on the console:

log4j, [2015-10-27T17:36:53.228]  WARN: org.elasticsearch.bootstrap: JNA not found. native methods will be disabled.  
Logstash startup completed

But in the background Elasticsearch will be filling up with lots of enriched log data. You can confirm this through the useful kopf plugin to see that the Elasticsearch indices are being created:

and directly through Elasticsearch’s RESTful API too:

$ curl -s -XGET http://bigdatalite:9200/_cat/indices?v|sort  
health status index                  pri rep docs.count docs.deleted store.size pri.store.size  
yellow open   blog-apache-2015.09.30   5   1      11872            0       11mb           11mb  
yellow open   blog-apache-2015.10.01   5   1      13679            0     12.8mb         12.8mb  
yellow open   blog-apache-2015.10.02   5   1      10042            0      9.6mb          9.6mb  
yellow open   blog-apache-2015.10.03   5   1       8722            0      7.3mb          7.3mb
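Since the _cat API returns plain text, it’s easy to post-process; for example, a quick Python sketch (using the rows above as sample data) totalling docs.count across the indices:

```python
# Sum the docs.count column from _cat/indices text output.
# The sample rows below are the ones shown above.
cat_output = """\
yellow open   blog-apache-2015.09.30   5   1      11872            0       11mb           11mb
yellow open   blog-apache-2015.10.01   5   1      13679            0     12.8mb         12.8mb
yellow open   blog-apache-2015.10.02   5   1      10042            0      9.6mb          9.6mb
yellow open   blog-apache-2015.10.03   5   1       8722            0      7.3mb          7.3mb"""

def total_docs(cat_text):
    """Sum the docs.count field (6th whitespace-separated column)."""
    return sum(int(line.split()[5]) for line in cat_text.strip().splitlines())

print(total_docs(cat_output))  # 44315
```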

And of course, the whole point of streaming the data into Elasticsearch in the first place – easy analytics through Kibana:


Kafka is awesome :-D

We’ve seen in this article how Kafka enables the implementation of flexible data pipelines that can evolve organically without requiring system rebuilds to implement or test new methods. It allows the data discovery function to tap in to the same source of data as the more standard analytical reporting one, without risking impacting the source system at all.

The post Forays into Kafka – Enabling Flexible Data Pipelines appeared first on Rittman Mead Consulting.

Categories: BI & Warehousing

Oracle Business Intelligence 12c Now Available – Improving Agility and Enabling Self-Service for BI Users

Mon, 2015-10-26 00:08

Oracle Business Intelligence 12c became available for download last Friday and is being officially launched at Oracle Openworld next week. Key new features in 12c include an updated and cleaner look-and-feel; Visual Analyser, which brings Tableau-style reporting to OBIEE users; and another new feature called “data mashups”, which enables users to upload spreadsheets of their own data to combine with their main curated datasets.


Behind the scenes the back-end of OBIEE has been overhauled with simplification aimed at making it easier to clone, provision and backup BI systems, whilst other changes are laying the foundation for future public and private cloud features that we’ll see over the coming years – and expect Oracle BI Cloud Service to be an increasingly important part of Oracle’s overall BI offering over the next few years as innovation comes more rapidly and “cloud-first”.

So what does Oracle Business Intelligence 12c offer customers currently on the 11g release, and why would you want to upgrade? In our view, the new features in 12c come down to two main areas – “agility” and “self-service” – two major trends that have been driving spend and investment in BI over the past few years.

OBIEE 12c for Business Agility – Giving Users the Ability to complete the “Last Mile” in Reporting, and Moving towards “BI-as-a-Service” for IT

A common issue that all BI implementors have had over many years is the time it takes to spin up new environments, create reports for users, and respond to new requirements and new opportunities. OBIEE 12c’s new features such as data mashups make it easier for end-users to complete the “last mile” in reporting by adding particular measures and attribute values to the reports and subject areas provided by IT, avoiding the situation where they instead export all the data to Excel or wait for IT to add the data they need into the curated dataset managed centrally.


From an IT perspective, simplifications to the back-end of OBIEE such as bringing all configuration files into one place, deprecating the BI Systems Management API and returning to configuration files, simpler upgrades and faster installation make it quicker and easier to provision new 12c environments and move workloads between on-premise and in the cloud. The point of these changes is to enable organisations to respond to opportunities faster, and make sure IT isn’t the thing that’s slowing the reporting process down.

OBIEE 12c for Self-Service – Recognising the Shift in Ownership from IT to the End-Users

One of the biggest trends in BI, and in computing in general over the past few years, is the consumerisation of IT and the expectation of self-service. Big beneficiaries of that trend have been vendors such as Tableau and QlikView, who deliver BI tools that run on the desktop and make everything point-and-click – much as the PC vendors did when IT still ran mainframes: data and applications became something of a free-for-all, but users could get things done now rather than waiting for IT to provide the service. Similar to the data upload feature mentioned above in the context of agility, the new Visual Analyzer feature in OBIEE 12c brings those same self-service, point-and-click data analysis capabilities to OBIEE users – but, crucially, with a centrally managed, single-version-of-the-truth business semantic model at the centre of things.


Visual Analyzer comes with the same data-mashup features as Answers, and new advanced analytics capabilities in Logical SQL and the Answers query builder put statistical functions such as trend analysis and clustering into the hands of end-users, avoiding the need to involve DBAs or data scientists for complex SQL. If you do have a data scientist and want to re-use their work without learning another tool, OBIEE 12c also makes it possible to call external R functions from within Answers, separately from the Oracle R Enterprise integration in OBIEE 11g.
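As a sketch of what these new Logical SQL functions look like – the subject area and column names here are hypothetical, and the exact option strings should be checked against the 12c Logical SQL documentation – trend and cluster analysis can be expressed directly in an analysis:

```sql
-- Hypothetical subject area and columns; TRENDLINE and CLUSTER are
-- among the advanced analytics functions added to Logical SQL in 12c
SELECT
  "Products"."Product Name",
  "Sales"."Revenue",
  -- Fit a linear trend to revenue over month, partitioned by product
  TRENDLINE("Sales"."Revenue",
            ("Time"."Month") BY ("Products"."Product Name"),
            'LINEAR', 'VALUE'),
  -- Group products into four clusters using k-means on revenue and units
  CLUSTER(("Products"."Product Name"),
          ("Sales"."Revenue", "Sales"."Units Sold"),
          'clusterName', 'algorithm=k-means;numClusters=4')
FROM "Sample Sales"
```

The point is that these run through the BI Server against the governed semantic model, rather than requiring data to be extracted into a separate statistical tool first.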

We’ll be covering more around the OBIEE12c launch over the coming weeks building on these themes of enabling business agility, and putting more self-service tools into the hands of users. We’ll also be launching our new OBIEE12c course over the next couple of days, with the first runs happening in Brighton and Atlanta in January 2016 – watch this space for more details.

The post Oracle Business Intelligence 12c Now Available – Improving Agility and Enabling Self-Service for BI Users appeared first on Rittman Mead Consulting.

Categories: BI & Warehousing

Rittman Mead at Oracle Openworld 2015, San Francisco

Thu, 2015-10-22 13:21

Oracle Openworld 2015 runs next week in San Francisco, USA, and Rittman Mead are proud to be delivering a number of sessions over the week of the conference. We’ll also be taking part in panel sessions, user group events and networking sessions, and running 1:1 sessions with anyone interested in discussing the solutions and services we’re presenting during the week.


Sessions at Oracle Openworld 2015 from Rittman Mead are as follows:

  • A Walk Through the Kimball ETL Subsystems with Oracle Data Integration Solutions [UGF6311] – Michael Rainey, Sunday, Oct 25, 12:00 p.m. | Moscone South—301
  • Oracle Business Intelligence Cloud Service—Moving Your Complete BI Platform to the Cloud [UGF4906] – Mark Rittman, Sunday, Oct 25, 2:30 p.m. | Moscone South—301
  • Developer Best Practices for Oracle Data Integrator Lifecycle Management [CON9611] – Jerome Francoisse + others, Thursday, Oct 29, 2:30 p.m. | Moscone West—2022
  • Oracle Data Integration Product Family: a Cornerstone for Big Data [CON9609] – Mark Rittman + others, Wednesday, Oct 28, 12:15 p.m. | Moscone West—2022
  • Empowering Users: Oracle Business Intelligence Enterprise Edition 12c Visual Analyzer [UGF5481] – Edel Kammermann, Sunday, Oct 25, 10:00 a.m. | Moscone West—3011
  • No Big Data Hacking—Time for a Complete ETL Solution with Oracle Data Integrator 12c [UGF5827] – Jerome Francoisse, Sunday, Oct 25, 8:00 a.m. | Moscone South—301

We’ll be at Openworld all week and available at various times to talk through the topics covered in our sessions, or any aspect of Oracle BI, DW and Big Data implementations you might be planning or currently running. Drop us an email to set something up during the week, or come along to any of our sessions and meet us in person.

The post Rittman Mead at Oracle Openworld 2015, San Francisco appeared first on Rittman Mead Consulting.

Categories: BI & Warehousing

Introducing the Rittman Mead OBIEE Performance Analytics Service

Wed, 2015-10-21 04:30
Fix Your OBIEE Performance Problems Today

OBIEE is a powerful analytics tool that enables your users to make the most of the data in your organisation. Ensuring that expected response times are met is key to driving user uptake and successful user engagement with OBIEE.

Rittman Mead can help diagnose and resolve performance problems on your OBIEE system. Taking a holistic, full-stack view, we can help you deliver the best service to your users. Fast response times enable your users to do more with OBIEE, driving better engagement, higher satisfaction, and greater return on investment. We enable you to:

  • Create a positive user experience
  • Ensure OBIEE returns answers quickly
  • Empower your BI team to identify and resolve performance bottlenecks in real time

Rittman Mead Are The OBIEE Performance Experts

Rittman Mead have many years of experience in the full life cycle of data warehousing and analytical solutions, especially in the Oracle space. We know what it takes to design a good system, and to troubleshoot a problematic one.

We are firm believers in a practical and logical approach to performance analytics and optimisation. Eschewing the drunk man anti-method of ‘tuning’ configuration settings at random, we advocate making a clear diagnosis and baseline of performance problems before changing anything. Once a clear understanding of the situation is established, steps are taken in a controlled manner to implement and validate one change at a time.

Rittman Mead have spoken at conferences, produced videos, and written many blogs specifically on the subject of OBIEE Performance.

Performance Analytics is not a dark art. It is not the blind application of ‘best practices’ or ‘tuning’ configuration settings. It is the logical analysis of performance behaviour to accurately determine the issue(s) present, and the possible remedies for them.

Diagnose and Resolve OBIEE Performance Problems with Confidence

When you sign up for the Rittman Mead OBIEE Performance Analytics Service you get:

  1. On-site consultancy from one of our team of Performance experts, including Mark Rittman (Oracle ACE Director), and Robin Moffatt (Oracle ACE).
  2. A Performance Analysis Report to give you an assessment of the current performance and prioritised list of optimisation suggestions, which we can help you implement.
  3. Use of the Performance Diagnostics Toolkit to measure and analyse the behaviour of your system and correlate any poor response times with the metrics from the server and OBIEE itself.
  4. Training, which is vital for enabling your staff to deliver optimal OBIEE performance. We work with your staff to help them understand the good practices to look for in design and diagnostics. Training is based on formal courseware, along with workshops built around examples from your own OBIEE system where appropriate.

Let Us Help You, Today!

Get in touch now to find out how we can help improve your OBIEE system’s performance. We offer a free, no-obligation sample of the Performance Analysis Report, built on YOUR data.

Don’t wait until performance is already problematic before calling us – we can help you assess your OBIEE system for optimal performance at all stages of the build process. Gaining a clear understanding of your system’s performance profile and any potential issues gives you the confidence and ability to address any risks to the success of your project – before it’s too late.

The post Introducing the Rittman Mead OBIEE Performance Analytics Service appeared first on Rittman Mead Consulting.

Categories: BI & Warehousing