Rittman Mead Consulting

Rittman Mead consults, trains, and innovates within the world of Oracle Business Intelligence, data integration, and analytics.

Data Processing and Enrichment in Spark Streaming with Python and Kafka

Fri, 2017-01-13 10:00

In my previous blog post I introduced Spark Streaming and how it can be used to process 'unbounded' datasets. The example I did was a very basic one - simple counts of inbound tweets and grouping by user. All very good for understanding the framework and not getting bogged down in detail, but ultimately not so useful.

We're going to stay with Twitter as our data source in this post, but we're going to consider a real-world requirement for processing Twitter data with low latency. Spark Streaming will again be our processing engine, with future posts looking at other possibilities in this area.

Twitter has come a long way from its early days as an SMS-driven "microblogging" site. Nowadays it's used by millions of people to discuss technology, share food tips, and, of course, track the progress of tea-making. But it's also used for more nefarious purposes, including spam, and sharing of links to pirated material. The requirement we had for this proof of concept was to filter tweets for suspected copyright-infringing links in order that further action could be taken.

The environment I'm using is the same as before - Spark 2.0.2 running on Docker with Jupyter Notebooks to develop the code (and this article!). You can download the full notebook here.

The inbound tweets are coming from an Apache Kafka topic. Any matched tweets will be sent to another Kafka topic. The match criteria are:

  • Not a retweet
  • Contains at least one URL
  • URL(s) are not on a whitelist (for example, we're not interested in links to spotify.com, or back to twitter.com)
  • The Tweet text must match at least two from a predefined list of artists, albums, and tracks. This is necessary to avoid lots of false positives - think of how many music tracks there are out there, with names that are common in English usage ("yesterday" for example). So we must match at least two ("Yesterday" and "Beatles", or "Yesterday" and "Help!").
    • Match terms will take into account common misspellings (Little Mix -> Litle Mix), hashtags (Little Mix -> #LittleMix), etc

We'll also use a separate Kafka topic for audit/debug purposes to inspect any non-matched tweets.

As well as matching the tweet against the above conditions, we will enrich the tweet message body to store the identified artist/album/track to support subsequent downstream processing.

The final part of the requirement is to keep track of the number of inbound tweets, the number of matched vs unmatched, and for those matched, which artists they were for. These counts need to be per batch and over a window of time too.

Getting Started - Prototyping the Processing Code

Before we get into the meat of the streaming code, let's take a step back and look at what we're wanting the code to achieve. From the previous examples we know we can connect to a Kafka topic, pull in tweets, parse them for given fields, and do windowed counts. So far, so easy (or at least, already figured out!). Let's take a look at the nub of the requirement here - the text matching.

If we peruse the BBC Radio 1 Charts we can see the popular albums and artists of the moment (Grant me a little nostalgia here; in my day people 'pirated' music from the Radio 1 chart show onto C90 cassettes, trying to get it without the DJ talking over the start and end. Nowadays it's done on a somewhat more technologically advanced basis). Currently it's "Little Mix" with the album "Glory Days". A quick Wikipedia or Amazon search gives us the track listing too:

  1. Shout Out to My Ex
  2. Touch
  3. F.U.
  4. Oops - Little Mix feat. Charlie Puth
  5. You Gotta Not
  6. Down & Dirty
  7. Power
  8. Your Love
  9. Nobody Like You
  10. No More Sad Songs
  11. Private Show
  12. Nothing Else Matters
  13. Beep Beep
  14. Freak
  15. Touch

A quick Twitter search for the first track title gives us this tweet - I have no idea if it's legit or not, but it serves as an example for our matching code requirements:

DOWNLOAD MP3: Little Mix – Shout Out To My Ex (CDQ) Track https://t.co/C30c4Fel4u pic.twitter.com/wJjyG4cdjE

— Ngvibes Media (@ngvibes_com) November 3, 2016

Using the Twitter developer API I can retrieve the JSON for this tweet directly. I'm using the excellent Paw tool to do this.

From this we can get the text element:

"text": "DOWNLOAD MP3: Little Mix \u2013 Shout Out To My Ex (CDQ) Track https:\/\/t.co\/C30c4Fel4u https:\/\/t.co\/wJjyG4cdjE",

The obvious approach would be to have a list of match terms, something like:

match_text=("Little Mix","Glory Days","Shout Out to My Ex","Touch","F.U.")

But - we need to make sure we've matched two of the three types of metadata (artist/album/track), so we need to know which it is that we've matched in the text. We also need to handle variations in text for a given match (such as misspellings etc).

What I came up with was this:

filters=[]  
filters.append({"tag":"album","value": "Glory Days","match":["Glory Days","GloryDays"]})  
filters.append({"tag":"artist","value": "Little Mix","match":["Little Mix","LittleMix","Litel Mixx"]})  
filters.append({"tag":"track","value": "Shout Out To My Ex","match":["Shout Out To My Ex","Shout Out To My X","ShoutOutToMyEx","ShoutOutToMyX"]})  
filters.append({"tag":"track","value": "F.U.","match":["F.U","F.U.","FU","F U"]})  
filters.append({"tag":"track","value": "Touch","match":["Touch"]})  
filters.append({"tag":"track","value": "Oops","match":["Oops"]})

def test_matching(test_string):  
    print 'Input: %s' % test_string
    for f in filters:
        for a in f['match']:
            if a.lower() in test_string.lower():
                print '\tTag: %s / Value: %s\n\t\t(Match string %s)' % (f['tag'],f['value'],a)

We could then take the test string from above and test it:

test_matching('DOWNLOAD MP3: Little Mix \u2013 Shout Out To My Ex (CDQ) Track https:\/\/t.co\/C30c4Fel4u https:\/\/t.co\/wJjyG4cdjE')  
Input: DOWNLOAD MP3: Little Mix \u2013 Shout Out To My Ex (CDQ) Track https:\/\/t.co\/C30c4Fel4u https:\/\/t.co\/wJjyG4cdjE
    Tag: artist / Value: Little Mix
        (Match string Little Mix)
    Tag: track / Value: Shout Out To My Ex
        (Match string Shout Out To My Ex)

as well as making sure that variations in naming were also correctly picked up and tagged:

test_matching('DOWNLOAD MP3: Litel Mixx #GloryDays https:\/\/t.co\/wJjyG4cdjE')  
Input: DOWNLOAD MP3: Litel Mixx #GloryDays https:\/\/t.co\/wJjyG4cdjE
    Tag: album / Value: Glory Days
        (Match string GloryDays)
    Tag: artist / Value: Little Mix
        (Match string Litel Mixx)
Additional Processing

With the text matching figured out, we also needed to address the other requirements:

  • Not a retweet
  • Contains at least one URL
    • URL(s) are not on a whitelist (for example, we're not interested in links to spotify.com, or back to twitter.com)
Not a Retweet

In the old days retweets were simply reposting the same tweet with an RT prefix; now it's done as part of the Twitter model and Twitter clients display the original tweet with the retweeter shown. In the background though, the JSON of a retweet differs from that of an original tweet.

Original tweet:

Because we all know how "careful" Trump is about not being recorded when he's not aware of it. #BillyBush #GoldenGate

— George Takei (@GeorgeTakei) January 12, 2017
{
  "created_at": "Thu Jan 12 00:36:22 +0000 2017",
  "id": 819342218611728384,
  "id_str": "819342218611728384",
  "text": "Because we all know how \"careful\" Trump is about not being recorded when he's not aware of it. #BillyBush #GoldenGate",

[...]

Retweet:

{
  "created_at": "Thu Jan 12 14:40:44 +0000 2017",
  "id": 819554713083461632,
  "id_str": "819554713083461632",
  "text": "RT @GeorgeTakei: Because we all know how \"careful\" Trump is about not being recorded when he's not aware of it. #BillyBush #GoldenGate",

[...]

  "retweeted_status": {
    "created_at": "Thu Jan 12 00:36:22 +0000 2017",
    "id": 819342218611728384,
    "id_str": "819342218611728384",
    "text": "Because we all know how \"careful\" Trump is about not being recorded when he's not aware of it. #BillyBush #GoldenGate",
[...]

So retweets have an additional set of elements in the JSON body, under the retweeted_status element. We can pick this out using the get method as seen in this code snippet, where tweet is a Python object created by calling json.loads on the JSON of the tweet:

if tweet.get('retweeted_status'):  
    print 'Tweet is a retweet'
else:  
    print 'Tweet is original'
Contains a URL, and URL is not on Whitelist

Twitter are very good to us in the JSON they supply for each tweet. Every possible attribute of the tweet is encoded as elements in the JSON, meaning that we don't have to do any nasty parsing of the tweet text itself. To find out if there are URLs in the tweet, we just check the entities.urls element, and iterate through the array if present.

if not tweet.get('entities'):  
    print 'no entities element'
else:  
    if not tweet.get('entities').get('urls'):
        print 'no entities.urls element'

The URL itself is again provided to us by Twitter as the expanded_url within the urls array, and using the urlsplit function as I did previously we can extract the domain:

for url in tweet['entities']['urls']:

    expanded_url = url['expanded_url']
    domain = urlsplit(expanded_url).netloc
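
For example, a quick standalone illustration of what urlsplit gives us (the URL here is just an example):

from urlparse import urlsplit
print urlsplit('https://www.youtube.com/watch?v=NvMoctjjdhA').netloc
# www.youtube.com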

With the domain extracted, we can then compare it to a predefined whitelist so that we don't pick up tweets that are just linking back to sites such as Spotify, iTunes, etc. Here I'm using the Python set type and issubset method to compare the list of domain(s) that I've extracted from the tweet (stored in the url_info list) against the whitelist:

if set(url_info['domain']).issubset(domain_whitelist):  
    print 'All domains whitelisted'
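
To see the set logic in isolation, here's a minimal self-contained sketch (the whitelist entries are illustrative):

domain_whitelist = ['spotify.com', 'twitter.com', 'itun.es']
url_info = {'domain': ['twitter.com', 'spotify.com']}

# issubset is True only if every extracted domain appears in the whitelist
if set(url_info['domain']).issubset(domain_whitelist):
    print 'All domains whitelisted'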
The Stream Processing Bit

With me so far? We've looked at the requirements for what our stream processing needs to do, and worked out the prototype code that will do this. Now we can jump into the actual streaming code. You can see the actual notebook here if you want to try this yourself.

Job control variables
batchIntervalSec=30  
windowIntervalSec=21600 # Six hours  
app_name = 'spark_twitter_enrich_and_count_rm_01e'  
Make Kafka available to Jupyter
import os  
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'  
os.environ['PYSPARK_PYTHON'] = '/opt/conda/envs/python2/bin/python'  
Import dependencies

As well as Spark libraries, we're also bringing in the KafkaProducer library which will enable us to send messages to Kafka. This is in the kafka-python package. You can install this standalone on your system, or inline as done below.

# Necessary to make Kafka library available to pyspark
os.system("pip install kafka-python")

#    Spark
from pyspark import SparkContext  
#    Spark Streaming
from pyspark.streaming import StreamingContext  
from pyspark.streaming.kafka import KafkaUtils  
#    Kafka
from kafka import SimpleProducer, KafkaClient  
from kafka import KafkaProducer

#    json parsing
import json  
#    url deconstruction
from urlparse import urlsplit  
#    regex domain handling
import re  
Define values to match
filters=[]  
filters.append({"tag":"album","value": "Glory Days","match":["Glory Days","GloryDays"]})  
filters.append({"tag":"artist","value": "Little Mix","match":["Little Mix","LittleMix","Litel Mixx"]})  
filters.append({"tag":"track","value": "Shout Out To My Ex","match":["Shout Out To My Ex","Shout Out To My X","ShoutOutToMyEx","ShoutOutToMyX"]})  
filters.append({"tag":"track","value": "F.U.","match":["F.U","F.U.","FU","F U"]})  
filters.append({"tag":"track","value": "Touch","match":["Touch"]})  
filters.append({"tag":"track","value": "Oops","match":["Oops"]})  
Define whitelisted domains
domain_whitelist=[]  
domain_whitelist.append("itun.es")  
domain_whitelist.append("wikipedia.org")  
domain_whitelist.append("twitter.com")  
domain_whitelist.append("instagram.com")  
domain_whitelist.append("medium.com")  
domain_whitelist.append("spotify.com")  
Function: Unshorten shortened URLs (bit.ly etc)
# Source: http://stackoverflow.com/a/4201180/350613
import httplib  
import urlparse

def unshorten_url(url):  
    parsed = urlparse.urlparse(url)
    h = httplib.HTTPConnection(parsed.netloc)
    h.request('HEAD', parsed.path)
    response = h.getresponse()
    if response.status/100 == 3 and response.getheader('Location'):
        return response.getheader('Location')
    else:
        return url
Function: Send messages to Kafka

To inspect the Kafka topics as messages are sent use:

kafka-console-consumer --zookeeper cdh57-01-node-01.moffatt.me:2181 --topic twitter_matched2  

N.B. following the Design Patterns for using foreachRDD guide here.

# http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd

def send_to_kafka_matched(partition):  
    from kafka import SimpleProducer, KafkaClient
    from kafka import KafkaProducer

    kafka_prod = KafkaProducer(bootstrap_servers='cdh57-01-node-01.moffatt.me:9092')
    for record in partition:
        kafka_prod.send('twitter_matched2', str(json.dumps(record)))

def send_to_kafka_notmatched(partition):  
    from kafka import SimpleProducer, KafkaClient
    from kafka import KafkaProducer

    kafka_prod = KafkaProducer(bootstrap_servers='cdh57-01-node-01.moffatt.me:9092')
    for record in partition:
        kafka_prod.send('twitter_notmatched2', str(record))

def send_to_kafka_err(partition):  
    from kafka import SimpleProducer, KafkaClient
    from kafka import KafkaProducer

    kafka_prod = KafkaProducer(bootstrap_servers='cdh57-01-node-01.moffatt.me:9092')
    for record in partition:
        kafka_prod.send('twitter_err2', str(record))
Function: Process each tweet

This is the main processing code. It implements all of the logic described in the requirements above. If a processing condition is not met, the function returns a negative code and description of the condition that was not met. Errors are also caught and returned.

You can see a syntax-highlighted version of the code in the notebook here.

def process_tweet(tweet):  
    # Check that it's not a retweet, and that there are URLs in the tweet, before going any further
    if tweet.get('retweeted_status'):
        return (-1,'retweet - ignored',tweet)

    if not tweet.get('entities'):
        return (-2,'no entities element',tweet)

    if not tweet.get('entities').get('urls'):
        return (-3,'no entities.urls element',tweet)

    # Collect all the domains linked to in the tweet
    url_info={}
    url_info['domain']=[]
    url_info['primary_domain']=[]
    url_info['full_url']=[]
    try:
        for url in tweet['entities']['urls']:
            try:
                expanded_url = url['expanded_url']
            except Exception, err:
                return (-104,err,tweet)

            # Try to resolve the URL (assumes it's shortened - bit.ly etc)
            try:
                expanded_url = unshorten_url(expanded_url)
            except Exception, err:
                return (-108,err,tweet)

            # Determine the domain
            try:
                domain = urlsplit(expanded_url).netloc
            except Exception, err:
                return (-107,err,tweet)
            try:
                # Extract the 'primary' domain, e.g. www36.foobar.com -> foobar.com
                #
                # This logic is dodgy for UK domains (foo.co.uk, foo.org.uk, etc) 
                # since it truncates to the last two parts of the domain only (co.uk)
                #
                re_result = re.search('(\w+\.\w+)$',domain)
                if re_result:
                    primary_domain = re_result.group(0)
                else:
                    primary_domain = domain
            except Exception, err:
                return (-105,err,tweet)

            try:
                url_info['domain'].append(domain)
                url_info['primary_domain'].append(primary_domain)
                url_info['full_url'].append(expanded_url)
            except Exception, err:
                return (-106,err,tweet)


        # Check domains against the whitelist
        # If every domain found is in the whitelist, we can ignore them
        try:
            if set(url_info['primary_domain']).issubset(domain_whitelist):
                return (-8,'All domains whitelisted',tweet)
        except Exception, err:
            return (-103,err,tweet)

        # Check domains against the blacklist
        # Unless a blacklisted domain is found, we ignore the tweet
        # Only use this if you have first defined the blacklist!
        #if not set(domain_blacklist).intersection(url_info['primary_domain']):
        #    return (-9,'No blacklisted domains found',tweet)


    except Exception, err:
        return (-102,err,tweet)

    # Parse the tweet text against list of trigger terms
    # --------------------
    # This is rather messy iterative code that maybe can be optimised
    #
    # Because match terms are not just words, it's not enough to break
    #  up the tweet text into words and match against the filter list.
    #  Instead we have to take each filter term and see if it exists
    #  within the tweet text as a whole
    #
    # Using a set instead of list so that duplicates aren't added
    #
    matched=set()
    try:
        for f in filters:
            for a in f['match']:
                tweet_text = tweet['text']
                match_text = a.decode('utf-8')
                if match_text in tweet_text:
                    matched.add((f['tag'],f['value']))
    except Exception, err:
        return (-101,err,tweet)

    #-----
    # Add the discovered metadata into the tweet object that this function will return
    try:
        tweet['enriched']={}
        tweet['enriched']['media_details']={}
        tweet['enriched']['url_details']=url_info
        tweet['enriched']['match_count']=len(matched)
        for match in matched:
            tweet['enriched']['media_details'][match[0]]=match[1]

    except Exception, err:
        return (-100,err,tweet)

    return (len(matched),tweet)
Function: Streaming context definition

This is the function that defines the streaming context. It needs to be a function because we're using windowing and so the streaming context needs to be configured to checkpoint.

As well as processing inbound tweets, it performs counts of:

  • Inbound
  • Outbound, by type (match/no match/error)
  • For matched tweets, top domains and artists

The code is commented inline to explain how it works. You can see a syntax-highlighted version of the code in the notebook here.

def createContext():  
    sc = SparkContext(appName="spark_twitter_enrich_and_count_01")
    sc.setLogLevel("WARN")
    ssc = StreamingContext(sc, batchIntervalSec)

    # Define Kafka Consumer and Producer
    kafkaStream = KafkaUtils.createStream(ssc, 'cdh57-01-node-01.moffatt.me:2181', app_name, {'twitter':1})

    ## Get the JSON tweets payload
    ## >>TODO<<  This is very brittle - if the Kafka message retrieved is not valid JSON the whole thing falls over
    tweets_dstream = kafkaStream.map(lambda v: json.loads(v[1]))

    ## -- Inbound Tweet counts
    inbound_batch_cnt = tweets_dstream.count()
    inbound_window_cnt = tweets_dstream.countByWindow(windowIntervalSec,batchIntervalSec)

    ## -- Process
    ## Match tweet to trigger criteria
    processed_tweets = tweets_dstream.\
        map(lambda tweet:process_tweet(tweet))

    ## Send the matched data to Kafka topic
    ## Only treat it as a match if we hit at least two of the three possible matches (artist/track/album)
    ## 
    ## _The first element of the returned object is a count of the number of matches, or a negative 
    ##  to indicate an error or no URL content in the tweet._
    ## 
    ## _We only want to send the actual JSON as the output, so use a `map` to get just this element_

    matched_tweets = processed_tweets.\
                        filter(lambda processed_tweet:processed_tweet[0]>1).\
                        map(lambda processed_tweet:processed_tweet[1])

    matched_tweets.foreachRDD(lambda rdd: rdd.foreachPartition(send_to_kafka_matched))
    matched_batch_cnt = matched_tweets.count()
    matched_window_cnt = matched_tweets.countByWindow(windowIntervalSec,batchIntervalSec)

    ## Set up counts for matched metadata
    ##-- Artists
    matched_artists = matched_tweets.map(lambda tweet:(tweet['enriched']['media_details']['artist']))

    matched_artists_batch_cnt = matched_artists.countByValue()\
                                    .transform((lambda foo:foo.sortBy(lambda x:-x[1])))\
                                    .map(lambda x:"Batch/Artist: %s\tCount: %s" % (x[0],x[1]))

    matched_artists_window_cnt = matched_artists.countByValueAndWindow(windowIntervalSec,batchIntervalSec)\
                                    .transform((lambda foo:foo.sortBy(lambda x:-x[1])))\
                                    .map(lambda x:"Window/Artist: %s\tCount: %s" % (x[0],x[1]))

    ##-- Domains
    ## Since url_details.primary_domain is an array, need to flatMap here
    matched_domains = matched_tweets.flatMap(lambda tweet:(tweet['enriched']['url_details']['primary_domain']))

    matched_domains_batch_cnt = matched_domains.countByValue()\
                                    .transform((lambda foo:foo.sortBy(lambda x:-x[1])))\
                                    .map(lambda x:"Batch/Domain: %s\tCount: %s" % (x[0],x[1]))
    matched_domains_window_cnt = matched_domains.countByValueAndWindow(windowIntervalSec,batchIntervalSec)\
                                    .transform((lambda foo:foo.sortBy(lambda x:-x[1])))\
                                    .map(lambda x:"Window/Domain: %s\tCount: %s" % (x[0],x[1]))

    ## Display non-matches for inspection
    ## 
    ## Codes between -99 and 1 inclusive indicate a non-match (e.g. whitelist hit, or fewer than two term matches), but not an error

    nonmatched_tweets = processed_tweets.\
        filter(lambda processed_tweet:(-99<=processed_tweet[0]<=1))

    nonmatched_tweets.foreachRDD(lambda rdd: rdd.foreachPartition(send_to_kafka_notmatched))

    nonmatched_batch_cnt = nonmatched_tweets.count()
    nonmatched_window_cnt = nonmatched_tweets.countByWindow(windowIntervalSec,batchIntervalSec)

    ##  Print any erroring tweets
    ## 
    ## Codes less than -100 indicate an error (try...except caught)

    errored_tweets = processed_tweets.\
                        filter(lambda processed_tweet:(processed_tweet[0]<=-100))
    errored_tweets.foreachRDD(lambda rdd: rdd.foreachPartition(send_to_kafka_err))

    errored_batch_cnt = errored_tweets.count()
    errored_window_cnt = errored_tweets.countByWindow(windowIntervalSec,batchIntervalSec)

    ## Print counts
    inbound_batch_cnt.map(lambda x:('Batch/Inbound: %s' % x))\
        .union(matched_batch_cnt.map(lambda x:('Batch/Matched: %s' % x))\
            .union(nonmatched_batch_cnt.map(lambda x:('Batch/Non-Matched: %s' % x))\
                .union(errored_batch_cnt.map(lambda x:('Batch/Errored: %s' % x)))))\
        .pprint()

    inbound_window_cnt.map(lambda x:('Window/Inbound: %s' % x))\
        .union(matched_window_cnt.map(lambda x:('Window/Matched: %s' % x))\
            .union(nonmatched_window_cnt.map(lambda x:('Window/Non-Matched: %s' % x))\
                .union(errored_window_cnt.map(lambda x:('Window/Errored: %s' % x)))))\
        .pprint()

    matched_artists_batch_cnt.pprint()
    matched_artists_window_cnt.pprint()
    matched_domains_batch_cnt.pprint()
    matched_domains_window_cnt.pprint()

    return ssc
Start the streaming context
ssc = StreamingContext.getOrCreate('/tmp/%s' % app_name,lambda: createContext())  
ssc.start()  
ssc.awaitTermination()  
Stream Output Counters

From the stdout of the job we can see the simple counts of inbound and output splits, both per batch and accumulating window:

-------------------------------------------
Time: 2017-01-13 11:50:30
-------------------------------------------
Batch/Inbound: 9
Batch/Matched: 0
Batch/Non-Matched: 9
Batch/Errored: 0

-------------------------------------------
Time: 2017-01-13 11:50:30
-------------------------------------------
Window/Inbound: 9
Window/Non-Matched: 9

-------------------------------------------
Time: 2017-01-13 11:51:00
-------------------------------------------
Batch/Inbound: 21
Batch/Matched: 0
Batch/Non-Matched: 21
Batch/Errored: 0

-------------------------------------------
Time: 2017-01-13 11:51:00
-------------------------------------------
Window/Inbound: 30
Window/Non-Matched: 30

The details of identified artists within tweets are also tracked, per batch and accumulated over the window period (six hours, in this example):

-------------------------------------------
Time: 2017-01-12 12:45:30
-------------------------------------------
Batch/Artist: Major Lazer       Count: 4
Batch/Artist: David Bowie       Count: 1

-------------------------------------------
Time: 2017-01-12 12:45:30
-------------------------------------------
Window/Artist: Major Lazer      Count: 1320
Window/Artist: Drake    Count: 379
Window/Artist: Taylor Swift     Count: 160
Window/Artist: Metallica        Count: 94
Window/Artist: David Bowie      Count: 84
Window/Artist: Lady Gaga        Count: 37
Window/Artist: Pink Floyd       Count: 11
Window/Artist: Kate Bush        Count: 10
Window/Artist: Justice  Count: 9
Window/Artist: The Weeknd       Count: 8
Kafka Output

Matched

The matched Kafka topic holds a stream of tweets in JSON format, with the discovered metadata (artist/album/track) added. I'm using the Kafka console consumer to view the contents, piped through jq to show just the tweet text and metadata that has been added.

kafka-console-consumer --zookeeper cdh57-01-node-01.moffatt.me:2181 \  
--topic twitter_matched1 \
--from-beginning|jq -r "[.text,.enriched.url_details.primary_domain[0],.enriched.media_details.artist,.enriched.media_details.album,.enriched.media_details.track,.enriched.match_count] "
[
  "Million Reasons by Lady Gaga - this is horrendous sorry @ladygaga https://t.co/rEtePIy3OT",
  "youtube.com",
  "https://www.youtube.com/watch?v=NvMoctjjdhA&feature=youtu.be",
  "Lady Gaga",
  null,
  "Million Reasons",
  2
]
Non-Matched

On our Kafka topics outbound we can see the non-matched messages. You'd probably disable this stream once the processing logic was finalised, but it's useful to be able to audit and validate the reasons for non-matches. Here a retweet is ignored, and we can see it's a retweet from the RT prefix of the text field. The -1 is the return code from the process_tweet function denoting a non-match:

(-1, 'retweet - ignored', {u'contributors': None, u'truncated': False, u'text': u'RT @ChartLittleMix: Little Mix adicionou datas para a Summer Shout Out 2017 no Reino Unido https://t.co/G4H6hPwkFm', u'@timestamp': u'2017-01-13T11:45:41.000Z', u'is_quote_status': False, u'in_reply_to_status_id': None, u'id': 819873048132288513, u'favorite_count': 0, u'source':
Summary

In this article we've built on the foundations of the initial exploration of Spark Streaming on Python, expanding it out to address a real-world processing requirement. Processing unbounded streams of data this way is not as complex as you may think, particularly for the benefits that it can yield in reducing the latencies between an event occurring and taking action on it.

We've not touched on some of the more complex areas, such as scaling this up to multiple Spark nodes, partitioned Kafka topics, and so on - that's another [kafka] topic (sorry...) for another day. The code itself is also rudimentary - before moving it into Production there'd be some serious refactoring and optimisation review to be performed on it.

You can find the notebook for this article here, and the previous article's here.

If you'd like more information on how Rittman Mead can help your business get the most of out its data, please do get in touch!

Categories: BI & Warehousing

Getting Started with Spark Streaming, Python, and Kafka

Thu, 2017-01-12 07:42

Last month I wrote a series of articles in which I looked at the use of Spark for performing data transformation and manipulation. This was in the context of replatforming an existing Oracle-based ETL and datawarehouse solution onto cheaper and more elastic alternatives. The processing that I wrote was very much batch-focussed; read a set of files from block storage ('disk'), process and enrich the data, and write it back to block storage.

In this article I am going to look at Spark Streaming. This is one of several libraries that the Spark platform provides (others include Spark SQL, Spark MLlib, and Spark GraphX). Spark Streaming provides a way of processing "unbounded" data - commonly referred to as "streaming" data. It does this by breaking it up into microbatches, and supporting windowing capabilities for processing across multiple batches. You can read more in the excellent Streaming Programming Guide.


Why Stream Processing?

Processing unbounded data sets, or "stream processing", is a new way of looking at what has always been done as batch in the past. Whilst intra-day ETL and frequent batch executions have brought latencies down, they are still independent executions with optional bespoke code in place to handle intra-batch accumulations. With a platform such as Spark Streaming we have a framework that natively supports processing both within-batch and across-batch (windowing).

By taking a stream processing approach we can benefit in several ways. The most obvious is reducing the latency between an event occurring and taking an action driven by it, whether automatically or via analytics presented to a human. Other benefits include a smoother resource consumption profile. We can avoid the very 'spiky' demands on CPU/memory/etc every time a batch runs by instead processing the same volume of data in smaller intervals. Finally, given that most data we process is actually unbounded ("life doesn't happen in batches"), designing new systems to be batch driven - with streaming seen as an exception - is actually an anachronism with roots in technology limitations that are rapidly becoming moot. Stream processing doesn't have to imply, or require, "fast data" or "big data". It can just mean processing data continually as it arrives, and not artificially splitting it into batches.

For more details and discussion of streaming in depth and some of its challenges, I would recommend:

Use-Case and Development Environment

So with that case made above for stream processing, I'm actually going to go back to a very modest example. The use-case I'm going to put together is - almost inevitably for a generic unbounded data example - using Twitter, read from an Apache Kafka topic. We'll start simply, counting the number of tweets per user within each batch and doing some very simple string manipulations. After that we'll see how to do the same but over a period of time (windowing). In the next blog we'll extend this further into a more useful example, still based on Twitter but demonstrating how to satisfy some real-world requirements in the processing.

I developed all of this code using Jupyter Notebooks. I've written before about how awesome notebooks are (along with Jupyter, there's Apache Zeppelin). As well as providing a superb development environment in which both the code and the generated results can be seen, Jupyter gives the option to download a Notebook to Markdown. This blog runs on Ghost, which uses Markdown as its native syntax for composing posts - so in fact what you're reading here comes directly from the notebook in which I developed the code. Pretty cool.

If you want, you can view the notebook online here, and from there download it and run it live on your own Jupyter instance.

I used the docker image all-spark-notebook to provide both Jupyter and the Spark runtime environment. By using Docker I don't have to really worry about provisioning the platform on which I want to develop the code - I can just dive straight in and start coding. As and when I'm ready to deploy the code to a 'real' execution environment (for example EMR), then I can start to worry about that. The only external aspect was an Apache Kafka cluster that I had already, with tweets from the live Twitter feed on an Apache Kafka topic imaginatively called twitter.

To run the code in Jupyter, you can put the cursor in each cell and press Shift-Enter to run it one cell at a time -- or you can use menu option Kernel -> Restart & Run All. When a cell is executing you'll see a [*] next to it, and once the execution is complete this changes to [y] where y is the execution step number. Any output from that step will be shown immediately below it.

To run the code standalone, you would download the .py from Jupyter, and execute it from the commandline using:

/usr/local/spark-2.0.2-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 spark_code.py
Preparing the Environment

We need to make sure that the packages we're going to use are available to Spark. Instead of downloading jar files and worrying about paths, we can instead use the --packages option and specify the group/artifact/version based on what's available on Maven and Spark will handle the downloading. We specify PYSPARK_SUBMIT_ARGS for this to get passed correctly when executing from within Jupyter.

import os  
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'  
Import dependencies

We need to import the necessary pySpark modules for Spark, Spark Streaming, and Spark Streaming with Kafka. We also need the Python json module for parsing the inbound Twitter data.

#    Spark
from pyspark import SparkContext  
#    Spark Streaming
from pyspark.streaming import StreamingContext  
#    Kafka
from pyspark.streaming.kafka import KafkaUtils  
#    json parsing
import json  
Create Spark context

The Spark context is the primary object under which everything else is called. The setLogLevel call is optional, but saves a lot of noise on stdout that otherwise can swamp the actual outputs from the job.

sc = SparkContext(appName="PythonSparkStreamingKafka_RM_01")  
sc.setLogLevel("WARN")  
Create Streaming Context

We pass the Spark context (from above) along with the batch duration which here is set to 60 seconds.

See the API reference and programming guide for more details.

ssc = StreamingContext(sc, 60)  
Connect to Kafka

Using the native Spark Streaming Kafka capabilities, we use the streaming context from above to connect to our Kafka cluster. The topic connected to is twitter, from consumer group spark-streaming. The latter is an arbitrary name that can be changed as required.

For more information see the documentation.

kafkaStream = KafkaUtils.createStream(ssc, 'cdh57-01-node-01.moffatt.me:2181', 'spark-streaming', {'twitter':1})  
Message Processing

Parse the inbound message as json

The inbound stream is a DStream, which supports various built-in transformations such as map which is used here to parse the inbound messages from their native JSON format.

Note that this will fail horribly if the inbound message isn't valid JSON.

parsed = kafkaStream.map(lambda v: json.loads(v[1]))  
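
If you wanted to guard against that, one hedged option (not in the original notebook) is to drop unparseable messages with a flatMap instead:

def safe_parse(msg):
    try:
        return [json.loads(msg)]
    except ValueError:
        # Not valid JSON - silently drop the message
        return []

parsed = kafkaStream.flatMap(lambda v: safe_parse(v[1]))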
Count number of tweets in the batch

The DStream object provides native functions to count the number of messages in the batch, and to print them to the output:

We use the map function to add in some text explaining the value printed.

Note that nothing gets written to output from the Spark Streaming context and descendent objects until the Spark Streaming Context is started, which happens later in the code. Also note that pprint by default only prints the first 10 values.

parsed.count().map(lambda x:'Tweets in this batch: %s' % x).pprint()  

If you jump ahead and try to use Windowing at this point, for example to count the number of tweets in the last hour using the countByWindow function, it'll fail. This is because we've not set up the streaming context with a checkpoint directory yet. You'll get the error: java.lang.IllegalArgumentException: requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint().. See later on in the blog for details about how to do this.

Extract Author name from each tweet

Tweets come through in a JSON structure, of which you can see an example here. We're going to analyse tweets by author, which is accessible in the JSON structure at user.screen_name.

The lambda anonymous function is used to apply the map to each RDD within the DStream. The result is a DStream holding just the author's screenname for each tweet in the original DStream.

authors_dstream = parsed.map(lambda tweet: tweet['user']['screen_name'])  
Count the number of tweets per author

With our authors DStream, we can now count them using the countByValue function. This is conceptually the same as this quasi-SQL statement:

SELECT   AUTHOR, COUNT(*)
FROM     DSTREAM
GROUP BY AUTHOR

Using countByValue is a more legible way of doing the same thing that you'll see done in tutorials elsewhere with a map / reduceBy.
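
For reference, that longer-winded equivalent would look something like this (a sketch - not needed here):

author_counts = authors_dstream.map(lambda author: (author, 1))\
                               .reduceByKey(lambda x, y: x + y)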

author_counts = authors_dstream.countByValue()  
author_counts.pprint()  
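
As noted earlier, pprint shows only the first ten values by default; pass a count if you want more:

author_counts.pprint(20)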
Sort the author count

If you try and use the sortBy function directly against the DStream you get an error:

'TransformedDStream' object has no attribute 'sortBy'

This is because sort is not a built-in DStream function. Instead we use the transform function to access sortBy from pySpark.

To use sortBy you specify a lambda function to define the sort order. Here we're going to do it based on the number of tweets (index 1 of the RDD) per author. You'll note this index being referenced in the sortBy lambda function as x[1], negated to reverse the sort order.

Here I'm using \ as line continuation characters to make the code more legible.

author_counts_sorted_dstream = author_counts.transform(\  
  (lambda foo:foo\
   .sortBy(lambda x:( -x[1]))))
author_counts_sorted_dstream.pprint()  
Get top 5 authors by tweet count

To display just the top five authors, based on number of tweets in the batch period, we'll use the take function. My first attempt at this failed with:

AttributeError: 'list' object has no attribute '_jrdd'

Per my woes on StackOverflow a parallelize is necessary to return the values into a DStream form.

top_five_authors = author_counts_sorted_dstream.transform\  
  (lambda rdd:sc.parallelize(rdd.take(5)))
top_five_authors.pprint()  
Get authors with more than one tweet, or whose username starts with 'rm'

Let's get a bit more fancy now - filtering the resulting list of authors to only show the ones who have tweeted more than once in our batch window, or -arbitrarily- whose screenname begins with rm...

filtered_authors = author_counts.filter(lambda x:\  
                                                x[1]>1 \
                                                or \
                                                x[0].lower().startswith('rm'))

We'll print this list of authors matching the criteria, sorted by the number of tweets. Note how the sort is being done inline to the calling of the pprint function. Assigning variables and then pprinting them as I've done above is only done for clarity. It also makes sense if you're going to subsequently reuse the derived stream variable (such as with the author_counts in this code).

filtered_authors.transform\  
  (lambda rdd:rdd\
  .sortBy(lambda x:-x[1]))\
  .pprint()
List the most common words in the tweets

Every example has to have a version of wordcount, right? Here's an all-in-one with line continuations to make it clearer what's going on. Note that whilst it makes for tidier code, it also makes it harder to debug...

parsed.\  
    flatMap(lambda tweet:tweet['text'].split(" "))\
    .countByValue()\
    .transform\
      (lambda rdd:rdd.sortBy(lambda x:-x[1]))\
    .pprint()
Start the streaming context

Having defined the streaming context, now we're ready to actually start it! When you run this cell, the program will start, and you'll see the result of all the pprint functions above appear in the output to this cell below. If you're running it outside of Jupyter (via spark-submit) then you'll see the output on stdout.

ssc.start()  
ssc.awaitTermination()  
-------------------------------------------
Time: 2017-01-11 15:34:00
-------------------------------------------
Tweets in this batch: 188

-------------------------------------------
Time: 2017-01-11 15:34:00
-------------------------------------------
(u'jenniekmz', 1)
(u'SpamNewton', 1)
(u'ShawtieMac', 1)
(u'niggajorge_2', 1)
(u'agathatochetti', 1)
(u'Tommyguns_____', 1)
(u'zwonderwomanzzz', 1)
(u'Blesschubstin', 1)
(u'Prikes5', 1)
(u'MayaParms', 1)
...

-------------------------------------------
Time: 2017-01-11 15:34:00
-------------------------------------------
(u'RitaBezerra12', 3)
(u'xKYLN', 2)
(u'yourmydw', 2)
(u'wintersheat', 2)
(u'biebercuzou', 2)
(u'pchrin_', 2)
(u'uslaybieber', 2)
(u'rowblanchsrd', 2)
(u'__Creammy__', 2)
(u'jenniekmz', 1)
...

-------------------------------------------
Time: 2017-01-11 15:34:00
-------------------------------------------
(u'RitaBezerra12', 3)
(u'xKYLN', 2)
(u'yourmydw', 2)
(u'wintersheat', 2)
(u'biebercuzou', 2)

-------------------------------------------
Time: 2017-01-11 15:34:00
-------------------------------------------
(u'RitaBezerra12', 3)
(u'xKYLN', 2)
(u'yourmydw', 2)
(u'wintersheat', 2)
(u'biebercuzou', 2)
(u'pchrin_', 2)
(u'uslaybieber', 2)
(u'rowblanchsrd', 2)
(u'__Creammy__', 2)

-------------------------------------------
Time: 2017-01-11 15:34:00
-------------------------------------------
(u'RT', 135)
(u'Justin', 61)
(u'Bieber', 59)
(u'on', 41)
(u'a', 32)
(u'&amp;', 32)
(u'Ros\xe9', 31)
(u'Drake', 31)
(u'the', 29)
(u'Love', 28)
...
[...]

You can see the full output from the job in the notebook here.

So there we have it, a very simple Spark Streaming application doing some basic processing against an inbound data stream from Kafka.

Windowed Stream Processing

Now let's have a look at how we can do windowed processing. This is where data is processed based on a 'window', which is a multiple of the batch duration that we worked with above. So instead of counting how many tweets there are every batch (say, every 5 seconds), we could instead count how many there are per minute. Here, a minute (60 seconds) is the window interval. We can perform this count potentially every time the batch runs; how frequently we do the count is known as the slide interval.
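
In DStream terms that's a single call, which we'll meet properly in the full code below - 60 is the window interval and 5 the slide interval, both in seconds:

count_windowed = kafkaStream.countByWindow(60,5)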


Image credit, and more details about window processing, here.

The first thing to do to enable windowed processing in Spark Streaming is to launch the Streaming Context with a checkpoint directory configured. This is used to store information between batches if necessary, and also to recover from failures. You need to rework your code into the pattern shown here. All the code to be executed by the streaming context goes in a function - which makes it less easy to present in a step-by-step form in a notebook as I have above.

Reset the Environment

If you're running this code in the same session as above, first go to the Jupyter Kernel menu and select Restart.

Prepare the environment

These are the same steps as above.

import os  
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'  
from pyspark import SparkContext  
from pyspark.streaming import StreamingContext  
from pyspark.streaming.kafka import KafkaUtils  
import json  
Define the stream processing code
def createContext():  
    sc = SparkContext(appName="PythonSparkStreamingKafka_RM_02")
    sc.setLogLevel("WARN")
    ssc = StreamingContext(sc, 5)

    # Define Kafka Consumer
    kafkaStream = KafkaUtils.createStream(ssc, 'cdh57-01-node-01.moffatt.me:2181', 'spark-streaming2', {'twitter':1})

    ## --- Processing
    # Extract tweets
    parsed = kafkaStream.map(lambda v: json.loads(v[1]))

    # Count number of tweets in the batch
    count_this_batch = kafkaStream.count().map(lambda x:('Tweets this batch: %s' % x))

    # Count by windowed time period
    count_windowed = kafkaStream.countByWindow(60,5).map(lambda x:('Tweets total (One minute rolling count): %s' % x))

    # Get authors
    authors_dstream = parsed.map(lambda tweet: tweet['user']['screen_name'])

    # Count each value and number of occurences 
    count_values_this_batch = authors_dstream.countByValue()\
                                .transform(lambda rdd:rdd\
                                  .sortBy(lambda x:-x[1]))\
                              .map(lambda x:"Author counts this batch:\tValue %s\tCount %s" % (x[0],x[1]))

    # Count each value and number of occurences in the batch windowed
    count_values_windowed = authors_dstream.countByValueAndWindow(60,5)\
                                .transform(lambda rdd:rdd\
                                  .sortBy(lambda x:-x[1]))\
                            .map(lambda x:"Author counts (One minute rolling):\tValue %s\tCount %s" % (x[0],x[1]))

    # Write total tweet counts to stdout
    # Done with a union here instead of two separate pprint statements just to make it cleaner to display
    count_this_batch.union(count_windowed).pprint()

    # Write tweet author counts to stdout
    count_values_this_batch.pprint(5)
    count_values_windowed.pprint(5)

    return ssc
Launch the stream processing

This uses local disk to store the checkpoint data. In a Production deployment this would be on resilient storage such as HDFS.

Note that, by design, if you restart this code using the same checkpoint folder, it will execute the previous code - so if you need to amend the code being executed, specify a different checkpoint folder.
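
For example, a hedged variant pointing the checkpoint at HDFS instead of local disk (the path here is illustrative):

ssc = StreamingContext.getOrCreate('hdfs:///checkpoints/spark_twitter_v01', lambda: createContext())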

ssc = StreamingContext.getOrCreate('/tmp/checkpoint_v01',lambda: createContext())  
ssc.start()  
ssc.awaitTermination()  
-------------------------------------------
Time: 2017-01-11 17:08:55
-------------------------------------------
Tweets this batch: 782
Tweets total (One minute rolling count): 782

-------------------------------------------
Time: 2017-01-11 17:08:55
-------------------------------------------
Author counts this batch:    Value AnnaSabryan   Count 8
Author counts this batch:    Value KHALILSAFADO  Count 7
Author counts this batch:    Value socialvidpress    Count 6
Author counts this batch:    Value SabSad_   Count 5
Author counts this batch:    Value CooleeBravo   Count 5
...

-------------------------------------------
Time: 2017-01-11 17:08:55
-------------------------------------------
Author counts (One minute rolling):    Value AnnaSabryan   Count 8
Author counts (One minute rolling):    Value KHALILSAFADO  Count 7
Author counts (One minute rolling):    Value socialvidpress    Count 6
Author counts (One minute rolling):    Value SabSad_   Count 5
Author counts (One minute rolling):    Value CooleeBravo   Count 5
...

[...]

-------------------------------------------
Time: 2017-01-11 17:10:10
-------------------------------------------
Tweets this batch: 5
Tweets total (One minute rolling count): 245

-------------------------------------------
Time: 2017-01-11 17:10:10
-------------------------------------------
Author counts this batch:    Value NowOnFR   Count 1
Author counts this batch:    Value IKeepIt2000   Count 1
Author counts this batch:    Value PCH_Intl  Count 1
Author counts this batch:    Value ___GlBBS  Count 1
Author counts this batch:    Value lauracoutinho24   Count 1

-------------------------------------------
Time: 2017-01-11 17:10:10
-------------------------------------------
Author counts (One minute rolling):    Value OdaSethre Count 3
Author counts (One minute rolling):    Value CooleeBravo   Count 2
Author counts (One minute rolling):    Value ArrezinaR Count 2
Author counts (One minute rolling):    Value blackpinkkot4 Count 2
Author counts (One minute rolling):    Value mat_lucidream Count 1
...

You can see the full output from the job in the notebook here. Let's take some extracts and walk through them.

Total tweet counts

First, the total tweet counts. In the first slide window, they're the same, since we only have one batch of data so far:

-------------------------------------------
Time: 2017-01-11 17:08:55
-------------------------------------------
Tweets this batch: 782
Tweets total (One minute rolling count): 782 

Five seconds later, we have 25 tweets in the current batch - giving us a total of 807 (782 + 25):

-------------------------------------------
Time: 2017-01-11 17:09:00
-------------------------------------------
Tweets this batch: 25
Tweets total (One minute rolling count): 807 

Fast forward just over a minute and we see that the windowed count for a minute is not just going up - in some cases it goes down - since our window is now not simply the full duration of the inbound data stream, but is shifting along and giving a total count for the last 60 seconds only.

-------------------------------------------
Time: 2017-01-11 17:09:50
-------------------------------------------
Tweets this batch: 28
Tweets total (One minute rolling count): 1012

-------------------------------------------
Time: 2017-01-11 17:09:55
-------------------------------------------
Tweets this batch: 24
Tweets total (One minute rolling count): 254
Count by Author

In the first batch, as with the total tweets, the batch tally is the same as the windowed one:

-------------------------------------------
Time: 2017-01-11 17:08:55
-------------------------------------------
Author counts this batch:    Value AnnaSabryan   Count 8
Author counts this batch:    Value KHALILSAFADO  Count 7
Author counts this batch:    Value socialvidpress    Count 6
Author counts this batch:    Value SabSad_   Count 5
Author counts this batch:    Value CooleeBravo   Count 5
...

-------------------------------------------
Time: 2017-01-11 17:08:55
-------------------------------------------
Author counts (One minute rolling):    Value AnnaSabryan   Count 8
Author counts (One minute rolling):    Value KHALILSAFADO  Count 7
Author counts (One minute rolling):    Value socialvidpress    Count 6
Author counts (One minute rolling):    Value SabSad_   Count 5
Author counts (One minute rolling):    Value CooleeBravo   Count 5    

But notice in subsequent batches the rolling totals are accumulating for each author. Here we can see KHALILSAFADO (with a previous rolling total of 7, as above) has another tweet in this batch, giving a rolling total of 8:

-------------------------------------------
Time: 2017-01-11 17:09:00
-------------------------------------------
Author counts this batch:    Value DawnExperience    Count 1
Author counts this batch:    Value KHALILSAFADO  Count 1
Author counts this batch:    Value Alchemister5  Count 1
Author counts this batch:    Value uused2callme  Count 1
Author counts this batch:    Value comfyjongin   Count 1
...

-------------------------------------------
Time: 2017-01-11 17:09:00
-------------------------------------------
Author counts (One minute rolling):    Value AnnaSabryan   Count 9
Author counts (One minute rolling):    Value KHALILSAFADO  Count 8
Author counts (One minute rolling):    Value socialvidpress    Count 6
Author counts (One minute rolling):    Value SabSad_   Count 5
Author counts (One minute rolling):    Value CooleeBravo   Count 5
Summary

What I've put together is a very rudimentary example, simply to get started with the concepts. In the examples in this article I used Spark Streaming because of its native support for Python, and the previous work I'd done with Spark. Jupyter Notebooks are a fantastic environment in which to prototype code, and for a local environment providing both Jupyter and Spark you can't beat the Docker image all-spark-notebook.

There are other stream processing frameworks and languages out there, including Apache Flink, Kafka Streams, and Apache Beam, to name but three. Apache Storm and Apache Samza are also relevant but, whilst early to the party, seem to crop up less frequently in stream processing discussions and literature nowadays.

In the next blog we'll see how to extend this Spark Streaming further with processing that includes:

  • Matching tweet contents to predefined list of filter terms, and filtering out retweets
  • Including only tweets that include URLs, and comparing those URLs to a whitelist of domains
  • Sending tweets matching a given condition to a Kafka topic
  • Keeping a tally of tweet counts per batch and over a longer period of time, as well as counts for terms matched within the tweets
Categories: BI & Warehousing

Web-Based RPD Upload and Download for OBIEE 12c

Wed, 2017-01-11 06:15

I was among the people who were dancing and singing after finding out some of the OBIEE 12c new features. The feature I liked the most was the scripted deployment of an RPD file from a developer's computer. I hated making dozens of clicks for every deploy of an RPD in 11g. You may object and say that there is WLST in 11g which can do the same and even more. Well, you are right. Except for one thing: WLST is a server-side thing. Information security folk don't like to give OBIEE developers direct access to a server. And not every developer is capable of using it.

In OBIEE 12c the only way to move RPDs between a developer's local machine and the OBIEE server is through the command line. We're big fans of the command-line approach because it enables automation, reduces the risk of error, and so on. But not everyone likes a script-everything approach as we do. Many OBIEE developers don't like using a command line to do what they've done with their mouse for years. And today we have a solution for them!

Disclaimer. Everything below is a result of our investigation. It’s not a supported functionality or Oracle’s recommendation. It makes use of undocumented web services that Oracle may remove or change at any time.

Some time ago Robin Moffatt lifted the lid on OBIEE 12c Web Services. He found out how to use curl to do the same things Oracle does with their data-model-cmd (now datamodel) script. But that was purely for geek interest and intended to give us more understanding of what's going on inside OBIEE, not to give us a new tool. So the next obvious step was to make a user-friendly interface over those web services so any OBIEE developer could utilise this sacred knowledge.

The Simplest Sample

Modern computer technologies offer us a lot of tools to build GUIs, but we wanted to keep it as simple as possible, and because OBIEE's front end is web-based, HTML was the obvious choice for our RPD tool too.

Download

Let's start with RPD download.
Here is the curl script to call the OBIEE web service and get the RPD file:

curl -X "POST" "http://192.168.0.66:7780/bi-lcm/v1/si/ssi/rpd/downloadrpd" \  
--data-urlencode "target-password=Admin123" \
--basic --user weblogic:Admin123 \
> downloadrpd.rpd

an animated gif of curl download

As you can see it's pretty simple. We send a message to http://<host>:<port>/bi-lcm/v1/si/ssi/rpd/downloadrpd using the POST method. As parameters, we send a password to set on the downloaded RPD file (target-password=Admin123) and authentication information (weblogic:Admin123). As a result, we get the bytes of the RPD, which we redirect to the downloadrpd.rpd file. And now we want a GUI for this script. Actually, Robin already did it.

<html>  
   <body>
      <FORM action="http://192.168.0.66:7780/bi-lcm/v1/si/ssi/rpd/downloadrpd"
         method="post" target=_blank>
         <P>
           New password for downloaded RPD file? <INPUT type="password" name="target-password"><BR>
            <INPUT type="submit" value="Send"> <INPUT type="reset">
      </FORM>
   </body>
</html>  

pic of a local download HTML form

This is not a snippet of code you should somehow incorporate into your system. No. That's an almost complete GUI for RPD download! The only thing you need to do is change the hostname and port to match your system. That's all. Simply create an HTML file, put this code into it, change the host and port, open it with a browser, and enjoy!

This form has no field for authentication because the OBIEE server will ask us for a login and password on the first call and will maintain the session afterwards.
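
As a hedged aside: if you'd rather script the download from Python than curl, the equivalent call with the requests library would look something like this (same endpoint and credentials as above; adjust for your system):

import requests

url = 'http://192.168.0.66:7780/bi-lcm/v1/si/ssi/rpd/downloadrpd'
r = requests.post(url,
                  data={'target-password': 'Admin123'},
                  auth=('weblogic', 'Admin123'))
# Write the returned RPD bytes to a local file
with open('downloadrpd.rpd', 'wb') as f:
    f.write(r.content)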

Upload

The upload was a little bit trickier from the curl side. Let's take a look at the script:

curl -X POST \  
     "http://192.168.0.66:7780/bi-lcm/v1/si/ssi/rpd/uploadrpd" \
     --form "file=@sample.rpd;type=application/vnd.oracle.rpd" \
     --form "rpd-password=Admin123" \
     --basic --user weblogic:Admin123

[Animated GIF: curl upload]

Here we call another service to upload our file. Our parameters are:

  1. sample.rpd - the RPD file to upload
  2. type=application/vnd.oracle.rpd - MIME type of this file (that was the main trick).
  3. rpd-password=Admin123 - the password of sample.rpd
  4. weblogic:Admin123 - credentials for authentication.

But the GUI for this task is surprisingly simple. I expected it to be monstrous, with lots of JS, but in fact it is small and easy. The minimum viable version is almost as simple as the download one.

<html>  
   <body>
      <FORM action="http://192.168.0.66:9502/bi-lcm/v1/si/ssi/rpd/uploadrpd"
         method="post" enctype="multipart/form-data" target=_blank>
           File to upload <INPUT type=file name="file"><BR>
           Password for uploaded RPD file? <INPUT type="password" name="rpd-password"><BR>
           <INPUT type="submit" value="Send"> <INPUT type="reset">
      </FORM>
   </body>
</html>  

[Image: the local upload HTML form]

The use of this piece of code is exactly the same as for the download. Simply put it into an HTML file, change the host and port, and use it.

Keep in mind that for both forms the field names are fixed and shouldn't be changed. For example, the field for the file to upload must be named "file" and the one for the password "rpd-password". Without these, the magic won't work.

But there is one thing about this part that we could still improve. Depending on the browser you use, the response message is either shown in the same window or downloaded as a text file. And this message is JSON.

In real life this message is a single line of JSON, but here it has been formatted with jq into something more human-readable and slightly polished by hand.

{
  "clazz": ["rpd-response"],
  "links": [
    {
      "href": "http://192.168.0.66:7780/bi-lcm/v1/si/ssi/rpd/uploadrpd",
      "rel": ["self"]
    }
  ],
  "properties": {
    "entry": [
      {
        "key": "si",
        "value": {
          "type": "string",
          "value": "ssi"
        }
      },
      {
        "key": "description",
        "value": {
          "type": "string",
          "value": "RPD upload completed successfully."
        }
      },
      {
        "key": "desc_code",
        "value": {
          "type": "string",
          "value": "DESC_CODE_RPD_UPLOAD_SUCCESSFUL"
        }
      },
      {
        "key": "status",
        "value": {
          "type": "string",
          "value": "SUCCESS"
        }
      }
    ]
  },
  "title": "RPD-LCM response, SI=ssi, action=Upload RPD"
}

As you can see here, we have a "description" field which holds a human-readable message, a "desc_code" field which carries the same information in a form more suitable for automated processing, and a "status" field which is the first candidate for use in automated procedures.

It's easy to read this file but most of the time you'd prefer a simple "Success" message, right?
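
For scripted use, here's a hedged Python sketch (the requests library is assumed, with the endpoint, file name, and credentials from the curl example above) that does the upload and reduces the response to just those fields:

import requests

url = 'http://192.168.0.66:7780/bi-lcm/v1/si/ssi/rpd/uploadrpd'

# The MIME type application/vnd.oracle.rpd is the main trick here too
with open('sample.rpd', 'rb') as f:
    files = {'file': ('sample.rpd', f, 'application/vnd.oracle.rpd')}
    resp = requests.post(url, files=files,
                         data={'rpd-password': 'Admin123'},
                         auth=('weblogic', 'Admin123'))

# Flatten the entry list into a dict and print the interesting bits
entries = {e['key']: e['value']['value']
           for e in resp.json()['properties']['entry']}
print(entries['status'], '-', entries['description'])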

Going Further

These HTML forms do the trick. A developer can now download and upload an RPD file easily, with a minimum of clicks and without needing to learn a command-line interface. Security is managed by the WebLogic server. Sounds good, right? But we can do even better. From my point of view, the absolutely necessary improvements are:

  1. Add some JS to make diagnostics more user-friendly.
  2. Put these forms to a server so every developer in an organisation can use them.

Adding Some JavaScript Magic

My intent from the very beginning was to keep things as simple as possible. I'm not sure my choice of JavaScript library (jQuery) was the simplest for this task, but the code I had to write is very small, so I like it.

<html>  
<head>  
<script src="./jquery-3.1.1.min.js"></script>  
</head>

<body>

<script>
$(document).ready(function(){
    // Intercept the form submission and send it via AJAX instead
    $("#upload").on('submit', function( e ) {
        e.preventDefault();
        $.ajax( {
            url: $(this).attr('action'),
            type: 'POST',
            data: new FormData( this ),  // file and password as multipart/form-data
            processData: false,          // don't let jQuery transform the FormData
            contentType: false,          // let the browser set the multipart boundary
            // Show the human-readable "description" entry from the JSON response,
            // with the raw response tucked into a collapsible <details> element
            dataFilter: function (data, type){
                $("#response").html(JSON.parse(data).properties.entry[1].value.value);
                $("#response").append('<details>'+data+'</details>');
            }
        });
    });
});
</script>

<FORM id=upload action="http://192.168.0.136:9502/bi-lcm/v1/si/ssi/rpd/uploadrpd"  
         method="post" enctype="multipart/form-data">
           File to upload <INPUT type=file name="file"><BR>
           Password for uploaded RPD file? <INPUT type="password" name="rpd-password"><BR>
           <INPUT type="submit" value="Send"> <INPUT type="reset">
</FORM>

<div id=response></div>

</body>  
</html>  

The script reads the form and sends its content to the server, then it reads the answer, parses it, and shows it in a user-friendly way. Note that it needs the jQuery library to work. The problem with this code is that it won't work locally. If you try to use it in the same way as the previous samples, it won't do anything. But if we take a look at the browser's developer console, we immediately find the answer: OBIEE blocks my cross-domain JavaScript call.

[Image: browser developer console showing the blocked cross-domain call]

That could become a problem, but I was going to put these files on a server anyway so that all developers could access them.

Deploying It to a Server

What I want to do now is put my forms somewhere on the server where OBIEE runs, accessible with a browser. To achieve that I need to do a few things.

  1. Create a directory on the server.
  2. Put my files into that directory.
  3. Expose the directory with a web server.

There are no special requirements for the location of the directory I will create. It just needs to be accessible by a web server (WebLogic). I prefer to keep all user content in one place, so my choice is to place it somewhere inside $ORACLE_HOME/user_projects.

[Image: directory location]

But there is one special requirement for the directory content. It should have a subdirectory WEB-INF with a web.xml file inside.

[Image: WEB-INF/web.xml]

For my current purposes, an almost empty web.xml is just fine. That may not be the best option in real life, but I'm trying to keep things simple, remember?

<?xml version="1.0" encoding="UTF-8"?>  
<web-app xmlns="http://java.sun.com/xml/ns/javaee"  
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
    version="2.5"/>

I combined both the download and upload forms into one rpdtools.html file and added some styling for a nicer look. From the functional point of view, the forms and script were not changed. And then I put this combined file and the jQuery library into my "static" directory.

[Image: directory contents]

Now everything is ready for the final step. I need to deploy this directory to the WebLogic server so that users can access it with a browser.

Log in to the WebLogic console and deploy the static folder as an application.
[Image: WebLogic console deployment]

For more details on deploying folders to WLS, see the official documentation on web.xml and Deploying Using Shared Folders.

And now the most exciting part of the process. Witness the power of this fully operational battle station! I mean RPD tools.
[Image: the finished RPD tools page]

Summary

We showed here a very simple way of adding a web-based GUI for uploading and downloading RPDs on any OBIEE 12c system. You can take it, adjust it to suit your needs, and it can become a useful day-to-day tool. Deploying this code to a server gives every OBIEE developer in an organisation access to it, and allows for some cool JavaScript effects. But keep in mind that it uses undocumented services and is not supported by Oracle in any way. This means that it could stop working after an upgrade. Well, in that case, we'll have to invent something new for you.

In case you want to play with this tool, here is a link to our GitHub obi-web-rpd-tools.

Categories: BI & Warehousing

Python for Analytics - Exploring Data with Pandas

Thu, 2016-12-22 12:24
A Crack Team!

At Rittman Mead, we're always encouraged to branch out and pursue new skills in the field in an effort to improve upon our skill sets and, as a result, become more technically fluent. One of the great things about working here, aside from that, is that while we all have a really solid foundation in Oracle technologies, many of us possess an incredibly diverse range of skills, fostered by years of working in tech-agnostic engagements. It's a great feeling to know that if you ever run up against some sort of bizarre challenge, or have to integrate some piece of arcane tech into an architectural solution, more than likely, someone, somewhere within Rittman Mead has had to do it. It is this freedom to pursue, within reason of course, other technical exploits that has nurtured a real spirit of innovation around the company within the past year. Our GitHub is overflowing with open source visualizations and performance monitoring and maintenance scripts that are simply there for the taking! We've put a lot of time into developing this stuff so our clients and partners don't have to.

Python

[Image: Python logo]

But I digress. This blog is about Python, and well, I haven't really mentioned it up until this point. It is in this spirit of innovation, learning, and frankly, automating the boring stuff, that a lot of us have been pursuing automation and analytical endeavors using the Python language. It has been touted by many as THE language for data science, and rightfully so, given its accessibility and healthy selection of libraries perfectly suited to the task, such as NumPy, Seaborn, Pandas, and Matplotlib. In today's exercise, we're going to walk through common data munging, transformation, and visualization tasks using some of these libraries in order to deliver quick insights into a data set that's near and dear to my heart, Game of Thrones battles and character deaths!

[Image: Game of Thrones logo]

Through this process, we will be creating our own data narrative that will help to expound upon the idle numbers and labels of the data set. We'll see that the process is less a hard-and-fast, rigid set of rules for approaching data exploration, and more akin to solving a crime, clue by clue, letting the data tell the story.

PYTHON FOR DATA SCIENCE

Aside from its myriad community-driven and maintained libraries, the greatest thing, to me anyway, about Python is its relatively low barrier to entry. Even with little to no previous programming experience, an enterprising lad or lady can get up and running, performing basic, functional programming tasks in no time. You'll be amazed at how quickly you start coming up with daily tasks you can throw a bit (or a lot) of Python at. Today, we'll be tackling some tasks like these, common to the everyday processes of data analysis and data science. Utilizing the Pandas library, in addition to a few others, we'll see how we can programmatically go from question to answer in no time, and with most any structured or unstructured data set. The primary audience of this blog will be those with a bit of Python fluency, in addition to those with an interest in data science and analytics. I will be explaining the steps and providing a Jupyter notebook (link here) for those who wish to follow along, or who might need the extra guidance. So don't bail now! Let's get to it. In this instance, we'll be downloading the Game of Thrones data set from Kaggle, a great site that provides open data sets and accompanying analysis projects in multiple languages. Download the data set if you'd like to follow along.

GETTING STARTED

[Image: stepping stones]

Let's begin by taking some steps to get our heads on straight and carve out a clear workflow. I find this is really helpful, especially in programming and/or analytical scenarios where one can begin to suffer from "analysis paralysis". So, at a high level, we'll be doing the following:

  1. First, we'll take a cursory look at the Python libraries we'll be incorporating into our data sleuthing exercise, how they're used, and some examples of their output and ideal use cases.

  2. Next we'll use the tools in these libraries to take a deeper dive into our data set and start to construct our initial line of questioning. This is where we get to be a bit creative in coming up with how we're going to wrap our heads around the data, and what kind of questions we're going to throw at it.

  3. We'll then chase down any leads, incorporating additional analyses where necessary, and begin to construct a narrative about our data set. At this point we'll be formulating hypotheses and attempting to construct visualizations that will help to further, or disprove, our line of investigation.

PANDAS IN THE JUNGLE

Any great detective must always have with them a toolkit with which to thoroughly examine any crime scene, and that's essentially what we have in the Pandas, Seaborn, and Numpy ("num-pie") libraries for the Python programming language. They provide a set of methods (functions) that can take an input, or a number of inputs, do some magic, and then provide us with lots of really useful information. So let's begin by examining these libraries and what we can do with each.

Pandas and Numpy

[Image: Pandas logo]

Pandas is great at doing a bunch of really common tasks related to data exploration, including, but not limited to, indexing and selection, merging and joining data sets, grouping and aggregation, and visualizing data. This will be the library with which we'll be doing a lot of the heavy lifting. Pandas also provides us with the DataFrame object, which greatly expands on NumPy's comparatively more rigid ndarray object. These 'objects' are simply containers that hold data of some kind, and allow us to interact with that data.
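
As a tiny sketch of that difference (values here are made up purely for illustration): the ndarray holds homogeneous, positionally indexed data, while the DataFrame gives us labelled columns, mixed types, and graceful handling of missing values.

import numpy as np
import pandas as pd

# ndarray: homogeneous, positionally indexed
arr = np.array([1, 2, 3])
print(arr.dtype)

# DataFrame: labelled columns, mixed types, missing values handled for us
df = pd.DataFrame({'house': ['Stark', 'Lannister'],
                   'battles': [8, np.nan]})
print(df.dtypes)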

Matplotlib and Seaborn

[Image: Matplotlib logo]

Matplotlib is a robust visualization library built to enable interactive, MATLAB-style plotting on most any platform or back-end. This library, along with Seaborn, should be your go-to for producing super malleable graphs and visualizations. Working alongside Matplotlib, Seaborn pitches itself as the go-to for statistics-based visualizations, but it supports complex, grid- and algorithm-based charts as well. Both of these libraries will help us to make quick and insightful decisions about our data, and help us to gather evidence supporting, or disproving, any hypotheses we might form.

THE INVESTIGATION

[Image: a detective's desk]

Now that we've armed ourselves with the tools we need to thoroughly examine any potential data set we're given, let's do just that! Opening up the Game of Thrones csv files from above, let's first take a look at what kinds of information we have. The basic stats are as follows:

Synopsis

  • Battles - a complete listing of the battles in the book series and their stats! Attacker, defender, army size, you name it, we've got it.

  • Character Deaths - something the series/show is quite known for, who died? This contains some great info, such as allegiance and nobility status.

  • Character Predictions - The more morbid of the lot, this data set lists predictions on which character will die. We won't be using this sheet for our exercise.

A Hypothesis of Thrones

Having just finished the monumental series myself, you could say that I'm now somewhat of a subject matter expert. So we have a situation not unlike that which you might find in any organization: an interested party wants to look further into some aspect of their data. We can use our investigatory tool-set to get real results and gain some valuable and informative insights. As subject matter experts, though, we should ideally be coming at our data with at least some semblance of a hypothesis, or something that we're trying to prove using our data (or disprove, for that matter). For the sake of this exercise, and fitting in with the theme of the data, I'm going to try and dig up an answer to the following:

Does House Lannister, for as evil and scheming as they are, and as much as they get away with, eventually get what's coming to them?

[Image: No Lannisters!]

As much as I'd like to believe it's true, we're going to need to run the numbers and let our data do the talking.

Importing the Data

You can follow along in the Jupyter notebook here now. Working with our Pandas library, we first need to get our data into some sort of workable object. As we stated before, this is the data frame. It is simply a table type object that is really good at handling empty values and data of many different types. We can easily perform operations on these objects and visualize them with minimal fuss. So, enough talk. Let's do it!

Working in your favorite IDE (PyCharm is easy to use and comes in a free version), we start a new project, import the libraries we need, and then drop in our first piece of code. This is the section that imports our csv data set and then converts it to a data frame. So, now that we have our object, what do we do with it?

[Image: the data frame]
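
A rough sketch of that first step might look like the following; the file names are assumed from the Kaggle data set, so adjust them if yours differ:

import pandas as pd

# Load the Kaggle CSVs into data frames (file names assumed from the
# Kaggle Game of Thrones data set)
battles = pd.read_csv('battles.csv')
deaths = pd.read_csv('character-deaths.csv')

# A first look at what we're working with
print(battles.shape)
print(battles.head())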

A Graph Has No Name

Now that we have our data frame object, we can begin to throw some code at it, crunch some numbers, and see if, in fact, the Lannisters really did get what was coming to them by the end of book 5. Starting with the battles data set, let's see how they fared in the field through the arc of the story. Did they lose more or fewer troops comparatively? We can do this easily by breaking our data frame into smaller, more manageable chunks, and then graphing these data points accordingly. We are going to use the data set to build a step-by-step set of analyses that examines the Lannister victories and defeats throughout the story.

Battle / Troop Loss Over Time

Did the Lannisters hit a losing streak, or did they do well throughout the story? Did they win or lose more of their battles over time?

  1. Start with new data frame based on house and troop sizes:
    [Image: troop sizes]

  2. Filter to get new results (Lannisters only):

[Image: Lannister troops]
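
A hedged sketch of those two steps in pandas, assuming the column names in the Kaggle battles file (attacker_1, defender_1, attacker_size, defender_size, year) and the battles frame loaded earlier:

# 1. New data frame based on house and troop sizes
troops = battles[['year', 'attacker_1', 'defender_1',
                  'attacker_size', 'defender_size']]

# 2. Filter to the battles the Lannisters led as attacker
lannister_troops = troops[troops['attacker_1'] == 'Lannister']
print(lannister_troops.groupby('year')['attacker_size'].sum())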

Right away we see we have some data issues; that is, there are some holes in the attacker size column. The good thing is that we can more or less look at this small table and get all the info we need from it right away. The numbers drop significantly through the years, and that's all there really is to it. But was this, in fact, because they lost more troops, or because they simply threw fewer at the problem as they began to carve out their claim to the kingdom? This analysis is not very telling. We're going to need to do some digging elsewhere to answer our question. Let's do some comparisons.

% of Battles Won / Lost

So how did the Lannisters do in the field? Of the 8 battles they fought in, how many did they win? How does this compare with the other armies of the Seven Kingdoms?

As we did before, let's get a new data frame together, and then do our grunt work on it to try and answer these questions. Grabbing the columns we need, let's run the numbers on how the Lannisters stack up against the other houses of Westeros in the field.

How many battles did they fight compared to the other Houses?

[Image: battles fought]

How many did they actually win?

[Image: won-to-total ratio]
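
Continuing from the earlier sketch, something like the following could produce both counts; the attacker_outcome column and its 'win'/'loss' values are assumed from the Kaggle battles file:

# Battles fought and won, per attacking house
fought = battles.groupby('attacker_1').size()
won = (battles[battles['attacker_outcome'] == 'win']
       .groupby('attacker_1').size())

summary = pd.DataFrame({'fought': fought, 'won': won}).fillna(0)
summary['win_ratio'] = summary['won'] / summary['fought']
print(summary.sort_values('fought', ascending=False))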

We can see right away that, out of all the battles they fought throughout the series (decidedly more than the other houses), they came out on top. Could the Lannisters be the dominating force on the field, as well as at court? The Starks are the only house that meets them conflict for conflict, and the Lannisters still reign supreme! Let's take things down to a finer grain and see how those who aligned themselves with the Lion did compared to those who didn't.

Death by Allegiance

Opening up our character deaths file, right away we see we have some pretty good info here. We have a laundry list of characters, their death year, and the house, if any, to which they were aligned. Let's start by building a data frame and filtering out those who are unaligned, in the Night's Watch, or Wildlings. We want a comparison between houses, and these groups would just muck up the works. Let's do the numbers. We can then plot this info on a basic bar chart to get a rundown of the massacre.

[Image: deaths by allegiance]
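
A sketch of that filtering and bar chart, continuing from the earlier sketches and assuming the character-deaths file's 'Allegiances' and 'Death Year' columns (a missing death year is taken to mean the character survived):

import matplotlib.pyplot as plt

# Keep only characters who actually died...
died = deaths[deaths['Death Year'].notnull()]

# ...and drop the unaligned, the Night's Watch, and the Wildlings
excluded = ["None", "Night's Watch", "Wildling"]
aligned = died[~died['Allegiances'].isin(excluded)]

# Basic bar chart of deaths per allegiance
aligned['Allegiances'].value_counts().plot(kind='bar')
plt.title('Named character deaths by allegiance')
plt.show()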

Things are starting to look up...depending on your point of view, I guess. The Lannisters, for all their dirty business, do seem to, in fact, lose the most named characters tied to their house. Of these, let's see how many were actually nobility, or rather, the most influential in furthering their cause!

[Image: noble deaths to total]
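
A short sketch of the noble proportion, reusing the aligned frame from the previous sketch and assuming character-deaths.csv carries a 0/1 'Nobility' flag:

# The mean of a 0/1 flag gives the proportion of noble deaths per allegiance
noble_ratio = aligned.groupby('Allegiances')['Nobility'].mean()
print(noble_ratio.sort_values(ascending=False))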

It would seem our Lannisters aren't too good at keeping their hands clean and letting those of lesser station do their dirty work for them. Although they have the second most aligned character deaths in the series, roughly 75% of them are noble deaths, meaning that people important to their cause are dying! The only other houses that come close, unfortunately, are the Starks (the Red Wedding, no doubt) and the Greyjoys. What this also means, however, is that our claim is gathering more support; the Lannisters may have climbed the royal ladder, but at what cost?

Paying Your Debts

[Image: death proportion donut chart]

We can see from the donut chart above (excuse the repetition of colors) that indeed, the Lannisters have one of the highest % to total death numbers out of all the major houses in the Seven Kingdoms. This actually goes quite a long way in backing up our hypothesis; that of all the named characters in the series, the Lannisters lost the lion's share (pun intended). The disconcerting thing is that they either seem to bring down many others with them, or the other noble houses aren't terribly great at keeping themselves among the living either.

Conclusion

Are these figures, combined with their high proportion of noble deaths, enough to satisfy my desire for vengeance? Did they truly reap what they have sown? I have to say I am ultimately undecided in the matter, as, although they did lose a great many, they in turn took a greater number down along with them. It seems that despite these losses, any notion of vengeful satisfaction must be tempered by this fact: although the Lannisters did end up getting hit pretty hard with significant losses, this is bittersweet when compared to the real and lasting damage they did throughout the span of the books' and show's history. Were you able to come up with any additional evidence for or against my case? Link out and show us! Thanks for reading.

Categories: BI & Warehousing

ETL Offload with Spark and Amazon EMR - Part 5 - Summary

Tue, 2016-12-20 07:30

This is the final article in a series documenting an exercise that we undertook for a client recently. Currently running an Oracle-based data warehouse platform, the client asked for our help in understanding what a future ETL and reporting platform could look like, given the current landscape of tools available. You can read the background to the project, how we developed prototype code, deployed it to Amazon, and evaluated tools for analysing the data.

Whilst the client were generally aware of new technologies, they wanted a clear understanding of what these looked like in practice. Is it viable, as is being touted, to offload ETL entirely to open-source tools? Could they do this, without increasing their licensing costs?

The client has already adopted newer technologies, running their entire infrastructure on the Amazon Web Services (AWS) cloud. Given this usage of AWS, our investigation was based around deployment of the Elastic Map Reduce (EMR) Hadoop platform. Many of the findings made during the investigation are as applicable to other Hadoop platforms though, including CDH running on Oracle's Big Data Appliance.

We isolated a single process within the broader part of the client's processing estate for exploration. The point of our study was not so much to implement this specific piece of functionality in the most optimal way, but to understand how, in general, processing would look on another platform in an end-to-end flow. Before any kind of deployment into Production of this design there would be further iterations, particularly around performance. These are discussed further below.

Overview of the Solution

The source data landed in Amazon S3 (similar in concept to HDFS), in CSV format, once per hour. We loaded each file, processed it to enrich it with reference data, and wrote it back to S3.

The enriched data was queried directly, with Presto, and also loaded into Redshift for querying there.

Oracle's Data Visualization Desktop was used as the front end for querying.

Benefits

Cost Benefits
  • By moving ETL processing to a Hadoop-based platform, we free up capacity (and potentially licensing costs) on the existing commercial RDBMS (Oracle) where the processing currently takes place
  • Costs are further reduced by the 'elastic' provisioning and cost model of the cloud service. You only pay for the size of the cluster necessary for your workload, for the duration that it took to execute.

Technology Benefits

In this solution we have taken advantage of the decoupling of storage from compute. This is a significant advantage that cloud technology brings.

  • Amazon S3 provides the durable data store for our data (whether CSV, Parquet, or any other data format). With S3 you simply pay for the storage that you use. S3 can be accessed by dozens of client libraries as well as HDFS-compatible APIs (see the sketch after this list). Data in S3 is completely compute-target agnostic. Contrast this to data sat in a proprietary RDBMS, where processing or analysing it in another system first means extracting it.
  • In this instance we wanted to enrich the data, and proved Spark as an appropriate tool to do so. Running on Elastic Map Reduce we could provision this automagically, run our processing, and have the EMR cluster terminate itself once complete. The compute part of the equation is entirely isolated, and can be switched in and out of the architecture as required.
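
As a small illustration of that compute-agnostic access, here's a hedged boto3 sketch (the bucket and key names are hypothetical) that lists and fetches enriched output without any Hadoop cluster running:

import boto3

s3 = boto3.client('s3')

# List the enriched output files...
listing = s3.list_objects_v2(Bucket='foobar-bucket', Prefix='acme_enriched/')
for obj in listing.get('Contents', []):
    print(obj['Key'], obj['Size'])

# ...and pull one down locally - no cluster required
s3.download_file('foobar-bucket', 'acme_enriched/part-00000.csv',
                 'part-00000.csv')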

Moving existing workloads to the cloud is not just a case of provisioning servers running in someone else's data centre to perform the same work as before. To truly benefit (dare I say, leverage) from the new possibilities, it makes sense to re-architect how you store your data and perform processing on it.

As well as the benefit of cloud technology, we can see that we don't even need an RDBMS for much of this enrichment and transformation work. Redshift has proved to be useful for interactive analysis of the data, but the processing of the data that would typically get done within an RDBMS (with associated license costs) can instead be done on technology such as Spark.

Broader Observations

The world of data and analytics is changing, and there are some interesting points that this project raised, which I discuss below.

Cloud

The client for whom we carried out this work are already cloud 'converts', running their entire operation on AWS already. They're not alone in recognising the benefits of Cloud, and it's going to be interesting to see the rate at which adoption continues to occur elsewhere, particularly in the Oracle market as they ramp up their offerings.

Cloud Overview

The Cloud is of course a big deal nowadays, whether in the breathless excitement of marketing talk, or the more gradual realisation amongst more technical folk that The Cloud brings some serious benefits. There are three broad flavours of Cloud - Infrastructure, Platform, and Software (IaaS, PaaS, SaaS respectively):

  • At the lowest level, you basically rent access to tin (hardware). Infrastructure-as-a-service (IaaS) can include simply running virtual machines on someone else's hardware, but it's more clever than that. You get the ability to provision storage separately from compute, and all with virtualised networking too. Thus you store your data, but don't pay for the processing until you want to. This is a very long way from working out how big a server to order for installation in your data centre (or indeed, a VM to provision in the cloud) - how many CPUs, how much RAM, how big the hard disks should be - and worrying about under- or over-provisioning it.

    With IaaS the components can be decoupled, and scaled elastically as required. You pay for what you use.

    The additional benefit of IaaS is that someone else manages the actual hardware; machine outages, disk failures, and so on, are all someone else's concern.

  • IaaS can sometimes still be a lot of work; after all, you still have to manage the servers, or architect and manage the decoupled components such as storage and compute. The next 'aaS' up is Platform as a Service (PaaS). Here, the "platform" is provided and managed for you.

    A clear example of PaaS is the Hadoop platform. You can run a Hadoop cluster yourself, whether on Oracle's Big Data Appliance (BDA), or maybe on your own hardware (or indeed, on IaaS in the cloud) but with a distribution such as Cloudera's CDH. Point being, you still have to manage it, to tune your Hadoop parameters, and so on. Hadoop as a platform in the cloud (i.e. PaaS) is offered by many companies, including the big vendors, such as Oracle (Big Data Cloud Service), Microsoft (HDInsight), Google (Dataproc) -- and then the daddy of them all, Amazon with its Elastic Map Reduce (EMR) platform

    Another example of PaaS is Oracle's BI Cloud Service (BICS), in which you build and run your own RPD and reports, but Oracle look after the actual server processes.

  • Software as a Service (SaaS) is where everything is provisioned and managed for you. Whereas on PaaS you still write the code that's to be run (whether a Spark routine on Hadoop, or BI metadata model on BICS), on SaaS someone has already done that too. You just provide the inputs, which obviously depend on the purpose of the SaaS. Something like GMail is a good example of SaaS. You're not having to write the web-based email, you're not having to provision the servers on which to run that - you simply utilise the software.

Cloud's Benefit to Analytics

Cloud brings benefits - but also greater subtleties to our solutions. Instead of simply provisioning one or more servers on which to hold our data and process it, we start to unpick this into separate components. In the context of this study, we have:

  • Data at rest, on S3. This is storage paid for simply based on how much you use. Importantly, you don't have to have a server (or in more abstract terms, 'compute') running. It's roughly analogous to network mounted storage. You can access S3 externally to AWS, such as your laptop or a server in your data centre. You can also access it, obviously, from within the AWS ecosystem. You can even use S3 to serve up files just as a web server would.
  • Compute, on EMR. How often do you need to carry out transformations and processing on your data? Not continuously? Then why pay for a server to sit idle the rest of the time? What about the size of the server that it does run on - how many CPUs do you need? How many nodes in your cluster? EMR solves both these problems by enabling you to provision a Hadoop cluster of any size and spec, on demand - and optionally, have it terminate itself once it's completed its work, so that you only pay for the compute time necessary (see the sketch after this list).
  • Having a bunch of data sat around isn't going to bring any value to the business, without Analytics and a way of presenting that to the user. This could be done either through loading the data into a traditional RDBMS such as Oracle, or Redshift, and analysing it there - or potentially through one of the new generation of "SQL on Hadoop" engines, such as Impala or Presto. There's also Athena which is a SQL interface directly to data in S3 - you don't even need to be running a Hadoop cluster to use this.
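
As a sketch of that transient-cluster pattern using boto3 (the cluster sizing, script location, and IAM role names here are illustrative assumptions, not values from the project):

import boto3

emr = boto3.client('emr', region_name='us-east-1')

# Provision a cluster that runs one Spark step and then terminates itself
response = emr.run_job_flow(
    Name='etl-offload-example',
    ReleaseLabel='emr-5.2.0',
    Applications=[{'Name': 'Spark'}],
    Instances={
        'MasterInstanceType': 'm3.xlarge',
        'SlaveInstanceType': 'm3.xlarge',
        'InstanceCount': 3,
        'KeepJobFlowAliveWhenNoSteps': False,  # shut down when the step ends
    },
    Steps=[{
        'Name': 'enrich',
        'ActionOnFailure': 'TERMINATE_CLUSTER',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['spark-submit', 's3://foobar-bucket/code/enrich.py'],
        },
    }],
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole',
)
print(response['JobFlowId'])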

Innovation vs Execution (or, just because you can, doesn't mean you should)

The code written during this exercise could, with a bit of tidying up, be run in Production. As in, it does the job that it was built to do. We could even expand it to audit row counts in and out, report duff data, and send notifications when complete. What about the next processing requirement that comes in? More bespoke code? And more? At some point we'd probably end up refactoring a whole bunch of it into some kind of framework. Into that framework we'd obviously want good things like handling SCDs, data lineage, and more. Welcome to re-inventing the in-house ETL wheel. Whether Spark jobs nowadays, PL/SQL ten years ago, or COBOL routines a decade before that - doing data processing at a wider scale soon becomes a challenge. Even with the best coders (or 'engineers' as they're now called) in the world, you're going to end up with a bespoke platform that's reliant on in-house skills to support and maintain. That presumes, of course, that you can find the relevant skills in the market to write all the processing and transformations that you need - and support them too. As you acquire new staff, they'll need to be trained on your code base - and suddenly the "free" technology platform isn't looking so cheap.

Before you shoot me down for a hyperbolic strawman argument, there is an important dichotomy to draw here, between innovation and execution. Applicable to the world of big data in general, it is a useful concept spelt out in the Oracle Information Management & Big Data Reference Architecture. For data to provide value, it doesn't have to land straight away into the world of formalised development processes, Production environments, and so on. A lot of the time you will want to 'poke around' with it, to explore it -- to innovate. Of the technology base out there, you may not know which tool, or library, is going to yield the best results. This is where the "discovery lab" comes in, and where the type of hand-cranked Spark coding that I've demonstrated sits:

Sometimes, work done in innovation is complete once it's done. As in, it has answered the required business question and provided its value. A lot of the time, though, it will simply establish and prove the process that is to be applied to the data, which then needs taking through to the execution layer. This is often called, in an abuse of the English language, "productionisation" or "industrialisation". This is where the questions of code maintenance and scalability need to be seriously considered. And this is where you need a scalable and maintainable approach to the design, management, and orchestration of your data processing - which is exactly what a tool like Oracle Data Integrator (ODI) provides.

ODI is the premier DI tool on the market, with good support for "big data" technologies, including the ability to generate Spark code to perform transformation. It can be deployed to run on Amazon's EMR, as illustrated here, as well as on Oracle's Cloud platform. As can be seen from this presentation from Oracle Open World in September 2016, there are additional capabilities coming, including around Kafka, Spark Streaming, and Cassandra.

Another route to examine, alongside ODI, is the ecosystem within AWS itself around code execution and orchestration with tools such as Lambda, Data Pipeline, and Simple Workflow Service. There's also AWS Glue, which like Athena was announced at re:Invent 2016. This promises three key things of crucial importance here:

  • A Data Catalog, populated automatically, and not only just supporting multiple formats and sources, but including automatic classification (e.g. "Web Log") of the data itself.
  • Automatic generation of ETL code. From the release announcement notes this looks like it is pySpark-based code. So the code that I put together for this exercise here, manually (and at times, painfully), could be automagically generated based on source/target and operators required. The announcement notes also specifically mention the inclusion of standard ETL processes such as handling bad data
  • Orchestration and management of ETL jobs. One of my main objections above to taking 'proof of concept' pySpark code and trying to use it in a Production scenario is that you end up with a spaghetti of scripts, which are a nightmare to maintain and support. If Glue lives up to its promises, we'd pretty much get the best of all worlds - a flexible yet robust platform.

Hadoop Ecosystems

A single vendor for your IT platform gives you "one throat to choke" when it comes to support, which is usually a good thing. But if that vendor's platform is closed and proprietary, it makes leaving it, or even just making use of alternative tools with it, difficult. One of the evangelical claims made about the new world of open source software is that the proliferation of open standards would spell an end to vendor lock-in. I was interested to see during the course of this exercise a few examples where the big vendors subtly pushed you towards their own tool of choice, or away from an alternative.

For example, Amazon EMR makes available Presto as part of the default build, but to run the latest version of Impala you'd have to install it yourself. Whilst it is possible to install it yourself, of course, this added friction makes it less likely that people will. This friction increases when we consider that the software usually needs installing - and configuring - across the multiple nodes of the Hadoop cluster. Given an open field of tools all purporting to do the same or similar things, any impedance to using one over the other will count. The same argument could be made for the CDH distribution, in which Impala is front and center, and deploying Presto or Drill would be a manual exercise. Again, yes, installing it may be relatively trivial - but manual download and deployment across a cluster is never going to win out over one-click deployment from a centralised management console.

This is a long way from any kind of vendor lockin, but it is worth bearing in mind that walls, albeit thin ones, are being built around these various gardens in the Hadoop ecosystem.

Summary

I hope you've found this series of articles useful. You can find a list of them below. In the meantime, please do get in touch if you'd like to find out more about how Rittman Mead can help you on your data and analytics journey!

Categories: BI & Warehousing

ETL Offload with Spark and Amazon EMR - Part 4 - Analysing the Data

Tue, 2016-12-20 03:00

This series documents a project we did for a client recently, exploring the benefits of Spark-based ETL processing running on Amazon's Elastic Map Reduce (EMR) Hadoop platform. The proof of concept we ran was on a very simple requirement: taking inbound files from a third party, joining them to some reference data, and then making the result available for analysis.

The background to the project is here, I showed here how I built up the prototype PySpark code on my local machine, and then here how it could be run on Amazon's EMR hadoop platform automatically.

In this article I'm going to discuss the options for analysing the data and producing reports from it.

Squeegee Your Third Eye

Where do we store data for analysis? Databases, right? That's what we've always done. Whether Oracle, SQL Server, or even Redshift - we INSERT, UPDATE, and SELECT our data in a database, and all is well and happy with the world.

But ... what if you didn't need a database per se to query your data?

One of the things I wanted to explore during this project was the feasibility and response times that "SQL on Hadoop" engines could bring. Hive is probably the most well known of these, with other options including Apache Impala (incubating), Apache Drill, Presto, and even Oracle's Big Data SQL. All these tools read data that is stored not in a proprietary database format but in an open format, such as a simple text file, on an open storage platform such as HDFS. More commonly, formats (also open and non-proprietary) which are optimised for performance, such as Parquet or ORC, are used.

The advantage of these is that they provide multiple options for working with your data, starting from the same base storage place (usually HDFS, or S3). If one tool has benefits over another in a particular processing or analytics scenario we have the option to switch, without having to do anything to the actual data at rest itself. Contrast this to the implicit assumption that the data starts in an RDBMS (such as Oracle). With the data in a proprietary database the only options for switching tools are which ones you use to submit workload (over JDBC/ODBC/OCI etc). If another database platform is better in a given use case you end up either duplicating the data, or re-platforming the data entirely.

So whilst the flexibility of SQL-on-Hadoop is very appealing, there are limitations to it currently, in areas including performance and levels of ANSI SQL support.

Throughout this evaluation, my considerations were:

  • Performance. The client we were doing this project for performs both batch querying as well as ad-hoc analytics
  • Complexity, in two areas:
    • Configuration and optimisation : The more configuration and careful tending that a platform needs, the greater the overall cost. Oracle may have its license implications compared to open-source software, but how to operate it for optimal performance is well known and documented. It's also an extremely mature product, having solved many of the problems that newer technologies are only just starting to realise, let alone solve.
    • Load process: Whilst the SQL-on-Hadoop engines don't "load" the data, they sometimes require it to be laid out in a particular pattern of folders, or in a particular format for optimal performance.
  • Compatibility. JDBC or ODBC interfaces are needed to be able to use the tool with BI tools such as OBIEE or Oracle's Data Visualization Desktop. As well as the interface, the SQL language support needs to be sufficient for analytical queries.

For an overview of SQL-on-Hadoop engines see this presentation from Greg Rahn. It's a couple of years old but pretty much still current bar the odd version and feature change.

Redshift

Redshift is not SQL-on-Hadoop - it is a full-blown database. Specifically, it is a proprietary implementation of Postgres by Amazon, running as a service on their cloud. Just as you can provision an EMR cluster of any required size, you can do the same for Redshift. As your capacity and processing requirements change, you can scale your Redshift cluster up and down.

Redshift has both JDBC and ODBC drivers, making it accessible from both Data Visualization Desktop (supported) and OBIEE (works, but not supported).

To work with Redshift you can use a tool such as SQL Workbench/J, or the psql command-line tool. I installed the latter on my Mac using Homebrew (brew install postgres).
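
Because Redshift speaks the Postgres wire protocol, you can equally connect programmatically - here's a hedged Python sketch with psycopg2 (the endpoint and credentials are hypothetical placeholders):

import psycopg2

# Redshift's default port is 5439; host/credentials here are placeholders
conn = psycopg2.connect(
    host='mycluster.abc123.us-east-1.redshift.amazonaws.com',
    port=5439, dbname='dev', user='awsuser', password='XXXXXXXX')

cur = conn.cursor()
cur.execute('select count(*) from acme3;')
print(cur.fetchone()[0])
conn.close()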

With the processed data held on S3, loading it into Redshift is as simple as defining the table (with standard CREATE TABLE DDL), and then issuing a COPY command:

COPY ACME FROM 's3://foobar-bucket/acme_enriched/'
CREDENTIALS 'aws_access_key_id=XXXXXXXXXXXXXX;aws_secret_access_key=YYYYYYYYYYYY'
CSV
NULL AS 'null'
MAXERROR 100000
;

This takes any file under the given S3 path (and subfolders), parses it as a CSV, and loads it into the table. It presumes that the columns in the table are in the order that they are in the CSV file.

As a rough idea of load timings across several separate load jobs:

  • 2M rows in 10 minutes
  • 6M rows in 30 minutes
  • 23M rows in 1h18 minutes

Just like with the Spark coding, I didn't undertake any performance optimisations or 'good practices'. No sort keys or distribution configuration - I just used whatever came by default.

Out of the box, response times were pretty good - here's a sample of the queries. They're going across the same set of data (23M rows), stored on Redshift with no defined sort keys, distribution keys, etc - just however it comes out of the box with a vanilla CREATE TABLE DDL.

  • COUNT all records - 0.2 seconds

    dev=# select count(*) from acme3;
    count
    ----------
    23011785
    (1 row)
    
    
    Time: 225.053 ms
    
  • COUNT, GROUP BY - 2.0 seconds

    dev=# select country,site_category, count(*) from acme3 group by country,site_category;
    country | site_category  |  count
    ---------+----------------+---------
    GB      | Indirect       |  512519
    [...]
    (34 rows)
    
    
    Time: 2043.805 ms
    
  • COUNT, GROUP BY day (with DATE_TRUNC function) - 1.6 seconds

    dev=# select date_trunc('day',date_launched),country,count(*) from acme3 group by date_trunc('day',date_launched),country;
    date_trunc           | country | count
    ---------------------+---------+--------
    2016-01-01 00:00:00  | GB      |  24795
    [...]
    (412 rows)
    
    
    Time: 1625.590 ms
    
  • COUNT, GROUP BY week (with DATE_TRUNC function) - 3.8 seconds

    dev=# select date_trunc('week',date_launched),country,count(*) from acme3 group by date_trunc('week',date_launched),country;
    date_trunc           | country |  count
    ---------------------+---------+---------
    2016-01-18 00:00:00  | GB      | 1046417
    2016-01-25 00:00:00  | GB      |  945160
    2016-02-01 00:00:00  | GB      | 1204446
    2016-02-08 00:00:00  | GB      |  874694
    [...]
    (77 rows)
    
    
    Time: 3871.230 ms
    
  • COUNT, GROUP BY, WHERE, ORDER BY - 5.4 seconds

    dev=# select supplier, product, product_desc,count(*)  from acme3 where lower(product_desc) = 'beans' group by supplier,product,product_desc order by 4 desc limit 2;
    supplier             |      product    | product_desc | count
    ---------------------+-----------------+--------------+------
    ACME BEANS CO        | baked beans     | BEANS        |  2967
    BEANZ MEANZ          | beans + saus    | Beans        |  2347
    (2 rows)
    
    
    Time: 5427.035 ms
    
Hive (on Tez)

Hive enables you to run queries with SQL-like language (Hive QL) on data stored in various places including HDFS, and S3. It supports multiple formats of data, including simple delimited text files like CSV, and more advanced formats such as Parquet.

The version of Hive that I was using on EMR was automagically configured to use Tez as its execution engine, instead of the traditional map/reduce of the original Hadoop platform.

To query the data, simply define an EXTERNAL table. Why an EXTERNAL table? Well if you just define a TABLE, and then drop it ... it will also delete the underlying data. It's one of those syntax decisions that makes brutally logical sense, but burnt me and I'm sure has burnt many others. But, you won't do it again (or at least, not for a while).

CREATE EXTERNAL TABLE acme
(
product_desc STRING,
product STRING,
product_type STRING,
supplier STRING,
date_launched TIMESTAMP,
[...]
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 's3://foobar-bucket/acme_enriched';

With the table defined you can test that it works using the LIMIT clause to only pull back some of the records:

hive> select product_desc,supplier,country from acme3 limit 5;
OK
Baked Beans       BEANZ MEANZ  GB
Tinned Tom      VEG CORP      GB
Tin Foil  FOIL SOLN   GB
Shreddies    CRUNCHYCRL     GB
Lemonade  FIZZ POP        GB

Whilst Hive technically enables you to query your data, the response times are so high that it's not even really a candidate for batch reporting.

Here's a couple of examples against a very small set of data, held in CSV format:

hive> select count(*) from acme;
21216
Time taken: 58.063 seconds, Fetched: 1 row(s)

hive> select country,count(*) from acme group by country;
US      21216
Time taken: 50.646 seconds, Fetched: 1 row(s)

Tez helpfully provides a progress report of queries, such as this one here - a simple count of all rows, on a much larger dataset (25M rows, CSV files). After seven minutes, I gave up, with 3% of the query complete:

hive> select count(*) from acme3;
Query ID = hadoop_20161019112007_cca7d37f-c5af-47f2-9d7d-3187342fbbb3
Total jobs = 1
Launching Job 1 out of 1
Tez session was closed. Reopening...
Session re-established.


Status: Running (Executing on YARN cluster with App id application_1476873946157_0002)

----------------------------------------------------------------------------------------------
VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
----------------------------------------------------------------------------------------------
Map 1            container       RUNNING  107601       3740        5   103856       0       6
Reducer 2        container       RUNNING      1          0        1        0       0       0
----------------------------------------------------------------------------------------------
VERTICES: 00/02  [>>--------------------------] 3%    ELAPSED TIME: 442.36 s
----------------------------------------------------------------------------------------------

As with elsewhere in the exercise, I'm well aware that there are optimisations that could be made that would help with response time, such as storing the data in more optimal formats (ORC/Parquet) and layouts (partitioning), as well as compressing it.

Don't write Hive off though - the latest versions (being developed by Hortonworks, and not on EMR yet) are moving to use in-memory components and competing well against Impala.

Impala

Impala is Cloudera's open-source offering in the SQL-on-Hadoop space. I was hoping to try out Impala against the data in S3, especially given a recent post by Cloudera with some promising performance metrics. Unfortunately this was for Impala 2.6, and the only version available prebuilt on EMR was 1.2.4. Given time, it would have been possible to build my own CDH-based Hadoop cluster (using Director to automate it) with the latest version of Impala installed - but this will have to be for another day. The current Cloudera documentation also suggests that S3 is:

[...]more suitable for holding 'cold' data that is only queried occasionally

although it's not clear if that's still true in the context of the latest Impala S3 optimisations.

Presto

Presto is an open source project that originated at Facebook. Similar to Apache Drill (below), it can query across data (and federate the results) from multiple sources including Hive (and thus S3), MongoDB, MySQL, and even Kafka.

For Presto to query against the data in S3, you need to define the table in Hive first. Presto uses the Hive metastore to retrieve the definition of the table, and carries out the actual query execution itself. First, a simple smoke test that we can pull back some data:

$ presto-cli --catalog hive --schema default

presto:default> select product_desc,supplier,country from acme limit 5;
product_desc | supplier | country
-------+-------+---------
(0 rows)

Query 20161019_110919_00004_ev4hz, FINISHED, 1 node
Splits: 1 total, 1 done (100.00%)
0:01 [0 rows, 0B] [0 rows/s, 0B/s]

No data. Hmm. The exact same query against the same Hive table does return data. Turns out that Presto, by default, won't recursively query subfolders, whilst Hive, by default, does. After amending /etc/presto/conf.dist/catalog/hive.properties to set hive.recursive-directories=true and restarting Presto (sudo restart presto-server) on each EMR node, I then got data back:

presto:default> select count(*) from acme_latam;
_col0
---------
1993955
(1 row)

Query 20161019_200312_00003_xa352, FINISHED, 2 nodes
Splits: 44 total, 44 done (100.00%)
0:10 [1.99M rows, 642MB] [203K rows/s, 65.3MB/s]

This was against a 1.9M row dataset, held in CSV format. Run again soon after, and the response time was 3 seconds.

Querying a bigger set of data was slower - 4 minutes for 23M rows of data:

presto:default> select count(*) from acme3;
_col0
----------
23105948
(1 row)

Query 20161019_200815_00006_xa352, FINISHED, 2 nodes
Splits: 52,970 total, 52,970 done (100.00%)
4:04 [23.1M rows, 9.1GB] [94.8K rows/s, 38.2MB/s]

Same timing for doing a GROUP by on the data too:

presto:default> select country,site_category, count(*) from acme3 group by country,site_category;
country |  site_category     |  _col2
--------+--------------------+----------
GB      | Price comparison   |     146
GB      | DIRECT RETAIL      |   10903
[...]
Query 20161019_201443_00013_xa352, FINISHED, 2 nodes
Splits: 52,972 total, 52,972 done (100.00%)
4:54 [23.1M rows, 9.11GB] [78.6K rows/s, 31.7MB/s]

Presto includes a swanky web interface for seeing the status and execution of queries.

Loading to ORC

Taking a brief detour into one of the most common recommendations for performance with Presto - storing data in ORC format.

First, in Hive, I created an ORC-stored table:

CREATE EXTERNAL TABLE acme_orc
(
product_desc STRING,
product STRING,
product_type STRING,
supplier STRING,
date_launched TIMESTAMP,
[...]
)
STORED AS ORC
LOCATION 's3://foobar-bucket/acme_orc/';

and then loaded a small sample of data:

hive> insert into acme_orc select * from acme_tst;

Querying it in Presto:

presto:default> select count(*) from acme_orc;
_col0
-------
29927
(1 row)

Query 20161019_113802_00008_ev4hz, FINISHED, 1 node
Splits: 2 total, 2 done (100.00%)
0:01 [29.9K rows, 1.71MB] [34.8K rows/s, 1.99MB/s]

With small volumes this was fine - 90 seconds to load 30k rows into an ORC-stored table, and a second to then query that from Presto with a count across all rows.

Loading 1.9M rows into an ORC-stored table took 30 minutes, and didn't actually (on the surface) speed things up. Caveat: this was a first pass at optimisation; there'll be a dozen settings and approaches to try out before any valid conclusions can be drawn from it:

  • COUNT GROUP BY from 1.9M rows, ORC - 3 seconds

    presto:default> select country,site_category, count(*) from acme_latam_orc group by country,site_category;
    country | site_category |  _col2
    ---------+---------------+---------
    LATAM   | null          | 1993955
    (1 row)
    
    
    Query 20161019_202206_00017_xa352, FINISHED, 2 nodes
    Splits: 46 total, 46 done (100.00%)
    0:03 [1.99M rows, 76.2MB] [790K rows/s, 30.2MB/s]
    
  • COUNT over 1.9M rows, CSV - 3 seconds

    presto:default> select country,site_category, count(*) from acme_latam group by country,site_category;
    country | site_category |  _col2
    ---------+---------------+---------
    LATAM   | null          | 1993955
    (1 row)
    
    
    Query 20161019_202241_00018_xa352, FINISHED, 2 nodes
    Splits: 46 total, 46 done (100.00%)
    0:03 [1.99M rows, 642MB] [575K rows/s, 185MB/s]
    
  • Function and filter, 1.9M rows, ORC

    presto:default> select count(*) from acme_latam_orc where lower(product_desc) = 'eminem';
    _col0
    -------
    2107
    (1 row)
    
    
    Query 20161019_202558_00019_xa352, FINISHED, 2 nodes
    Splits: 44 total, 44 done (100.00%)
    0:04 [1.99M rows, 76.2MB] [494K rows/s, 18.9MB/s]
    
  • Function and filter, 1.9M rows, CSV

    presto:default> select count(*) from acme_latam where lower(product_desc) = 'eminem';
    _col0
    -------
    2107
    (1 row)
    
    
    Query 20161019_202649_00020_xa352, FINISHED, 2 nodes
    Splits: 44 total, 44 done (100.00%)
    0:03 [1.99M rows, 642MB] [610K rows/s, 196MB/s]
    

Trying to load greater volumes (23M rows) to ORC was unsuccessful, due to memory issues with the Hive execution.

Status: Running (Executing on YARN cluster with App id application_1476905618527_0004)

----------------------------------------------------------------------------------------------
VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
----------------------------------------------------------------------------------------------

----------------------------------------------------------------------------------------------
VERTICES: 00/00  [>>--------------------------] 0%    ELAPSED TIME: 13956.69 s
----------------------------------------------------------------------------------------------
Status: Failed
Application application_1476905618527_0004 failed 2 times due to AM Container for appattempt_1476905618527_0004_000002 exited with  exitCode: 255

In the Tez execution log was the error

Diagnostics: Container [pid=4642,containerID=container_1476905618527_0004_01_000001] is running beyond physical memory limits. Current usage: 1.0 GB of 1 GB physical memory used; 2.7 GB of 5 GB virtual memory used. Killing container.

With appropriate investigation (and/or loading the data in smaller chunks) this could obviously be overcome, but for now it halted any further investigation into ORC's usefulness. The other major area to investigate would be partitioning of the data.

A final note on performance - this blog does a comparison of Presto querying data held in S3 vs HDFS on EMR. HDFS on EMR is quicker, generally about 1.5 times or so - but you of course need your data loaded into HDFS on a running EMR cluster, whereas S3 is there on demand whenever you want it.

Drill

Apache Drill is another open-source tool, similar in concept to Presto, in that it enables querying across data held in multiple sources. I've written about it previously here and here. Whilst EMR has an option to provision a Drill cluster as part of an EMR build, it didn't seem to work when I tried it - and with Presto running I didn't spend the time digging into Drill. Given another time and project though, I'd definitely be looking to run it against this kind of data to see how it handled it. A recent thread on the Drill mailing list gave some interesting information on performance.

Athena

Amazon's Athena tool was announced at re:Invent 2016. Even though it went GA after the client project discussed here, and therefore wasn't evaluated, it is definitely worth mentioning. It provides "serverless" SQL querying of data held in S3. Under the covers it uses Presto (which is one of the tools I evaluated above). The benefit of Athena is that you wouldn't need to provision and configure actual servers to use Presto. You work with it through the web-based interface, or JDBC. This is a pretty big point to make - you can query your data, held in an open format, on demand, using SQL, without having to move it into a database or build a server to run a query engine.

Athena looks interesting, but one of the main things that struck me about it was that it is not something you would simply point at piles of data on S3 and build your analytics systems on. The cost is per query, and is currently $5 per TB scanned. FIVE DOLLARS, per terabyte of data SCANNED. Not retrieved. Scanned. So in order to not run up big AWS bills if you've got lots of data, you're going to need to do smart things to reduce the size of data scanned. Partitioning your data, and compressing it, will both help. As it happens, these are the things that are going to increase performance too, so it's not wasted effort. There's a good writeup here demonstrating Athena, and the difference that using an appropriate storage format for the data makes to performance and volumes of data scanned (and thus cost).

The cost consideration is a crucial point, because it means that data 'engineering' is still needed in any system you plan to build with Athena as the query engine. Sure, you can use it for ad-hoc 'fishing' expeditions in your 'data lake' (sorry....). Here the benefit of sifting through vast and disparate data without having to transform and/or load it into a queryable form first will probably outweigh the ad-hoc costs (remember: $5 per TB scanned). But as I said, if you're engineering Athena into your system as the SQL engine on top of data at rest in S3, you'll want to invest in the necessary wrangling in order to store the data (a) partitioned and (b) compressed.
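
As an illustration only - we didn't test this on the project - here's a hedged sketch in PySpark of what that wrangling might look like, assuming Spark 2.x (where the Parquet writer is native), and with the bucket path, dataframe, and partitioning column purely hypothetical:

# Hypothetical sketch: writing the data back to S3 partitioned by
# country and compressed as Parquet, so that a query engine such as
# Athena or Presto only scans the partitions (and columns) it needs
enriched_df.write \
           .partitionBy('country') \
           .parquet('s3://foobar-bucket/acme_enriched_parquet/',
                    compression='snappy',
                    mode='overwrite')

A query filtering on country would then only scan the files under that partition, reducing both the runtime and the per-terabyte-scanned bill.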

You can read more about Athena here.

Front End Tools

All of the exploration so far has been from the commandline, but users want their data visually. The client for whom we were doing this work currently use OBIEE and BI Publisher to deliver the data. Both Redshift and Presto have JDBC and ODBC drivers, which means that they should work with OBIEE (although neither are on the supported databases list). Oracle's Data Visualization Desktop tool is also of interest here, bringing with it native support for both Redshift and Presto (beta).

A tool that we didn't examine, but is directly relevant given the Amazon context, is Quicksight. Released in mid-November 2016, this is a cloud-based tool that enables querying of data in many sources including Redshift -- but also S3 itself.

Summary

For interactive analysis, Redshift performed well straight off. However, with some of the in-memory capabilities of the SQL-on-Hadoop engines, and the appeal of simply provisioning compute to query data held on S3 when required, it would be interesting to spend some time digging into the recommended optimisations and design patterns to see just how fast the querying could be.

Since all the query engines considered support JDBC, and any respectable front-end tool can query JDBC, we're not constrained in the choice of one by the other. Hooray for open standards enabling optimal choice and pairing of technologies! I liked using Oracle DV Desktop as it's a simple install and quick to get visualisations out of. Ultimately the choice of tool would come down to factors including complexity of requirements, scale of deployment - and of course, cost.

In the final article in this series we'll take a recap over the whole project, and look at some of the broader points of interest to draw from it.

Categories: BI & Warehousing

ETL Offload with Spark and Amazon EMR - Part 3 - Running pySpark on EMR

Mon, 2016-12-19 03:00

In the previous articles (here, and here) I gave the background to a project we did for a client, exploring the benefits of Spark-based ETL processing running on Amazon's Elastic Map Reduce (EMR) Hadoop platform. The proof of concept we ran was on a very simple requirement, taking inbound files from a third party, joining them to some reference data, and then making the result available for analysis.

I showed here how I built up the prototype PySpark code on my local machine, using Docker to quickly and easily make available the full development environment needed.

Now it's time to get it running on a proper Hadoop platform. Since the client we were working with already have a big presence on Amazon Web Services (AWS), using Amazon's Hadoop platform made sense. Amazon's Elastic Map Reduce, commonly known as EMR, is a fully configured Hadoop cluster. You can specify the size of the cluster and vary it as you want (hence, "Elastic"). One of the very powerful features of it is that being a cloud service, you can provision it on demand, run your workload, and then shut it down. Instead of having a rack of physical servers running your Hadoop platform, you can instead spin up EMR whenever you want to do some processing - to a size appropriate to the processing required - and only pay for the processing time that you need.

Moving my locally-developed PySpark code to run on EMR should be easy, since they're both running Spark. Should be easy, right? Well, this is where it gets - as we say in the trade - "interesting". Some of my challenges were down to the learning curve in being new to this set of technology. Others, however, I would point to more as examples of where the brave new world of Big Data tooling becomes less an exercise in exciting endless possibilities and more one of stubbornly Googling errors due to JAR clashes and software version mismatches...

Provisioning EMR

Whilst it's possible to make the entire execution of the PySpark job automated (including the provisioning of the EMR cluster itself), to start with I wanted to run it manually to check each step along the way.

To create an EMR cluster, simply log in to the EMR console and click Create.

I used Amazon's EMR distribution, configured for Spark. You can also deploy a MapR-based Hadoop platform, and use the Advanced tab to pick and mix the applications to deploy (such as Spark, Presto, etc).

The number and size of the nodes is configured here (I used the default, 3 machines of m3.xlarge spec), as is the SSH key. The latter is very important to get right, otherwise you won't be able to connect to your cluster over SSH.

Once you click Create cluster Amazon automagically provisions the underlying EC2 servers, and deploys and configures the software and Hadoop clustering across them. Anyone who's set up a Hadoop cluster will know that literally a one-click deploy of a cluster is a big deal!

If you're going to be connecting to the EMR cluster from your local machine you'll want to modify the security group assigned to it once provisioned and enable access to the necessary ports (e.g. for SSH) from your local IP.

Deploying the code

I developed the ETL code in Jupyter Notebooks, from where it's possible to export it to a variety of formats - including .py Python script. All the comment blocks from the Notebook are carried across as inline code comments.

To transfer the Python code to the EMR cluster master node I initially used scp, simply out of habit. But, a much more appropriate solution soon presented itself - S3! Not only is this a handy way of moving data around, but it comes into its own when we look at automating the EMR execution later on.

To upload a file to S3 you can use the S3 web interface, or a tool such as Cyberduck. Better, if you like the command line as I do, is the AWS CLI tools. Once installed, you can run this from your local machine:

aws s3 cp Acme.py s3://foobar-bucket/code/Acme.py

You'll see that the syntax is pretty much the same as the Linux cp command, specifying source and then destination. You can do a vast amount of AWS work from this command line tool - including provisioning EMR clusters, as we'll see shortly.

So with the code up on S3, I then SSH'd to the EMR master node (as the hadoop user, not ec2-user), and transferred it locally. One of the nice things about EMR is that it comes with your AWS security automagically configured. Whereas on my local machine I need to configure my AWS credentials in order to use any of the aws commands, on EMR the credentials are there already.

aws s3 cp s3://foobar-bucket/code/Acme.py ~

This copied the Python code down into the home folder of the hadoop user.

Running the code - manually

To invoke the code, simply run:

spark-submit Acme.py

A very useful thing to use, if you aren't already, is GNU screen (or tmux, if that's your thing). GNU screen is installed by default on EMR (as it is on many modern Linux distros nowadays). Screen does lots of cool things, but of particular relevance here is it lets you close your SSH connection whilst keeping your session on the server open and running. You can then reconnect at a later time back to it, and pick up where you left off. Whilst you're disconnected, your session is still running and the work still being processed.

From the Spark console you can monitor the execution of the running job, as well as dig into the details of how it undertakes the work. See the EMR cluster home page on AWS for the Spark console URL.

Problems encountered

I've worked in IT for 15 years now (gasp). Never has the phrase "The devil's in the detail" been more applicable than in the fast-moving world of big data tools. It's not surprising really, given the staggering rate at which code is released, that sometimes it's a bit quirky, or lacking what may be thought of as basic functionality (often in areas such as security). Each of these individual points could, I suppose, be explained away with a bit of RTFM - but the net effect is that what on paper sounds simple took the best part of half a day and a LOT of Googling to resolve.

Bear in mind, this is code that ran just fine previously on my local development environment.

When using SigV4, you must specify a 'host' parameter
boto.s3.connection.HostRequiredError: BotoClientError: When using SigV4, you must specify a 'host' parameter.

To fix, switch

conn_s3 = boto.connect_s3()  

for

conn_s3 = boto.connect_s3(host='s3.amazonaws.com')  

You can see a list of endpoints here.

boto.exception.S3ResponseError: S3ResponseError: 400 Bad Request

Make sure you're specifying the correct hostname (see above) for the bucket's region. Determine the bucket's region from the S3 control panel, and then use the endpoint listed here.
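
If you'd rather determine the region programmatically, the same boto library can do that too - a minimal sketch (note that get_location returns the bucket's LocationConstraint, and an empty string means US Standard / us-east-1):

import boto
conn_s3 = boto.connect_s3(host='s3.amazonaws.com')
# Returns e.g. 'eu-west-1'; an empty string means US Standard (us-east-1)
print conn_s3.get_bucket('foobar-bucket').get_location()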

Error: Partition column not found in schema

Strike this one off as bad programming on my part; in the step to write the processed file back to S3, I had partitionBy='' in the save function:

duplicates_df.coalesce(1).write.save(full_uri,
                                     format='com.databricks.spark.csv',
                                     header='false',
                                     partitionBy='',
                                     mode='overwrite')

This, along with the coalesce (which combined all the partitions down to a single one), was wrong, and was fixed by changing to:

duplicates_df.write.save(full_uri,
                         format='com.databricks.spark.csv',
                         header='false',
                         mode='overwrite')

Exception: Python in worker has different version 2.6 than that in driver 2.7, PySpark cannot run with different minor versions

To get the code to work on my local Docker/Jupyter development environment, I set an environment variable as part of the Python code to specify the Python executable:

os.environ['PYSPARK_PYTHON'] = '/usr/bin/python2'

I removed this (along with all the PYSPARK_SUBMIT_ARGS) and the code then ran fine.

Timestamp woes

In my original pySpark code I was letting it infer the schema from the source, which included it determining (correctly) that one of the columns was a timestamp. When it wrote the resulting processed file, it wrote the timestamp in a standard format (YYYY-MM-DD HH24:MI:SS). Redshift (of which more in the next article) was quite happy to process this as a timestamp, because it was one.

Once I moved the pySpark code to EMR, the Spark engine moved from my local 1.6 version to 2.0.0 - and the behaviour of the CSV writer changed. Instead of the format as before, it switched to writing the timestamp in epoch form - and not just that, but microseconds since epoch. Whilst Redshift could cope with epoch seconds, or milliseconds, it doesn't support microseconds, and the load job failed

Invalid timestamp format or value [YYYY-MM-DD HH24:MI:SS]

and then

Fails: Epoch time copy out of acceptable range of [-62167219200000, 253402300799999]

Whilst I did RTFM, it turns out that I read the wrong FM, taking the latest (2.0.1) instead of the version that EMR was running (2.0.0). And whilst 2.0.1 includes support for specifying the output timestampFormat, 2.0.0 doesn't.

In the end I changed the Spark job to not infer the schema, and so treat the timestamp as a string, thus writing it out in the same format. This was a successful workaround here, but if I'd needed to do some timestamp-based processing in the Spark job I'd have had to find another option.
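
For reference, the change amounts to leaving schema inference disabled on the CSV reader - a sketch based on the Databricks CSV reader used in the prototype code (Spark 2.0's native CSV reader takes an equivalent inferSchema option):

# With inferSchema set to false every column is read as a string, so
# the timestamp value passes through to the output file exactly as it
# appears in the source CSV
acme_df = sqlContext.read.load(full_uri,
                               format='com.databricks.spark.csv',
                               header='true',
                               inferSchema='false')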

Success!

I now had the ETL job running on Spark on EMR, processing multiple files in turn. Timings were approximately five minutes to process five files, half a million rows in total.

One important point to bear in mind through all of this is that I've gone with default settings throughout, and not made any effort to optimise the PySpark code. At this stage, it's simply proving the end-to-end process.

Automating the ETL

Having seen that the Spark job would run successfully manually, I now went to automate it. It's actually very simple to do. When you launch an EMR cluster, or indeed even if it's running, you can add a Step, such as a Spark job. You can also configure EMR to terminate itself once the step is complete.

From the EMR cluster create screen, switch to Advanced. Here you can specify exactly which applications you want deployed - and what steps to run. Remember how we copied the Acme.py code to S3 earlier? Now's when it comes in handy! We simply point EMR at the S3 path and it will run that code for us - no need to do anything else. Once the code's finished executing, the EMR cluster will terminate itself.

After testing out this approach successfully, I took it one step further - command line invocation. AWS makes this ridiculously easy, because from the home page of any EMR cluster (running or not) there is a button which gives you the full command to run to spin up another cluster with the exact same configuration.

This gives us a command like this:

    aws emr create-cluster \
    --termination-protected \
    --applications Name=Hadoop Name=Spark Name=ZooKeeper \
    --tags 'owner=Robin Moffatt' \
    --ec2-attributes '{"KeyName":"Test-Environment","InstanceProfile":"EMR_EC2_DefaultRole","AvailabilityZone":"us-east-1b","EmrManagedSlaveSecurityGroup":"sg-1eccd074","EmrManagedMasterSecurityGroup":"sg-d7cdd1bd"}' \
    --service-role EMR_DefaultRole \
    --enable-debugging \
    --release-label emr-5.0.0 \
    --log-uri 's3n://aws-logs-xxxxxxxxxx-us-east-1/elasticmapreduce/' \
    --steps '[{"Args":["spark-submit","--deploy-mode","cluster","s3://foobar-bucket/code/Acme.py"],"Type":"CUSTOM_JAR","ActionOnFailure":"TERMINATE_CLUSTER","Jar":"command-runner.jar","Properties":"","Name":"Acme"}]' \
    --name 'Rittman Mead Acme PoC' \
    --instance-groups '[{"InstanceCount":1,"InstanceGroupType":"MASTER","InstanceType":"m3.xlarge","Name":"Master instance group - 1"},{"InstanceCount":2,"InstanceGroupType":"CORE","InstanceType":"m3.xlarge","Name":"Core instance group - 2"}]' \
    --region us-east-1 \
    --auto-terminate

This spins up an EMR cluster, runs the Spark job and waits for it to complete, and then terminates the cluster. Logs written by the Spark job get copied to S3, so that even once the cluster has been shut down, the logs can still be accessed. Separation of compute from storage - it makes a lot of sense. What's the point of having a bunch of idle CPUs sat around just so that I can view the logs at some point if I want to?

The next logical step for this automation would be the automatic invocation of above process based on the presence of a defined number of files in the S3 bucket. Tools such as Lambda, Data Pipeline, and Simple Workflow Service are all ones that can help with this, and the broader management of ETL and data processing on AWS.
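
To give a flavour of what that might look like, here's a hypothetical sketch of a Lambda function using the boto3 library - the bucket name, file threshold, and cluster sizing are all made up for illustration:

import boto3

def lambda_handler(event, context):
    # Count the source files currently waiting in the S3 bucket
    s3 = boto3.client('s3')
    listing = s3.list_objects_v2(Bucket='foobar-bucket',
                                 Prefix='source_files/')
    file_count = listing.get('KeyCount', 0)

    # Only spin up a cluster once enough files have accumulated
    if file_count < 10:
        return 'Only %d files present; not launching EMR' % file_count

    # Launch a transient cluster that runs the Spark job as a Step and
    # then terminates itself (KeepJobFlowAliveWhenNoSteps=False)
    emr = boto3.client('emr')
    response = emr.run_job_flow(
        Name='Acme ETL (auto-launched)',
        ReleaseLabel='emr-5.0.0',
        Applications=[{'Name': 'Spark'}],
        Instances={
            'InstanceGroups': [
                {'Name': 'Master', 'InstanceRole': 'MASTER',
                 'InstanceType': 'm3.xlarge', 'InstanceCount': 1},
                {'Name': 'Core', 'InstanceRole': 'CORE',
                 'InstanceType': 'm3.xlarge', 'InstanceCount': 2}
            ],
            'KeepJobFlowAliveWhenNoSteps': False
        },
        Steps=[{
            'Name': 'Acme',
            'ActionOnFailure': 'TERMINATE_CLUSTER',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': ['spark-submit', '--deploy-mode', 'cluster',
                         's3://foobar-bucket/code/Acme.py']
            }
        }],
        JobFlowRole='EMR_EC2_DefaultRole',
        ServiceRole='EMR_DefaultRole')
    return 'Launched cluster %s' % response['JobFlowId']

A function along these lines could be triggered by an S3 event notification or a schedule, so that the transient cluster only gets launched when there's enough work queued up to justify it.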

Spot Pricing

You can save money further with AWS by using Spot Pricing for EMR requests. Spot Pricing is used on Amazon's EC2 platform (on which EMR runs) as a way of utilising spare capacity. Instead of paying a fixed (higher) rate for some server time, you instead 'bid' at a (lower) rate and when the demand for capacity drops such that the spot price does too and your bid price is met, you get your turn on the hardware. If the spot price goes up again - your server gets killed.

Why spot pricing makes sense on EMR particularly is that Hadoop is designed to be fault-tolerant across distributed nodes. Whilst pulling the plug on an old-school database may end in tears, dropping a node from a Hadoop cluster may simply mean a delay in the processing whilst the particular piece of (distributed) work is restarted on another node.
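
In the cluster definition, spot pricing is just a property of each instance group. As a purely illustrative tweak to the boto3 sketch above (the bid price is made up), the core nodes could be requested on the spot market like this:

# Core nodes requested on the spot market with an illustrative bid;
# the master is kept on-demand so that losing a spot node doesn't
# take out the whole cluster
core_spot_group = {'Name': 'Core', 'InstanceRole': 'CORE',
                   'InstanceType': 'm3.xlarge', 'InstanceCount': 2,
                   'Market': 'SPOT', 'BidPrice': '0.10'}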

Summary

We've developed our simple ETL application, and got it running on Amazon's EMR platform. Whilst we used AWS because it's the client's platform of choice, in general there's no reason we couldn't take it and run it on another Hadoop platform. This could be a Hadoop platform such as Oracle's Big Data Cloud Service, Cloudera's CDH running on Oracle's Big Data Appliance, or simply a self-managed Hadoop cluster on commodity hardware.

Processing time was in the region of 30 minutes to process 2M rows across 30 files, and in a separate batch run 3.8 hours to process 283 files of around 25M rows in total.

So far, the data that we've processed is only sat in an S3 bucket up in the cloud.

In the next article we'll look at what the options are for actually analysing the data and running reports against it.

Categories: BI & Warehousing

ETL Offload with Spark and Amazon EMR - Part 2 - Code development with Notebooks and Docker

Fri, 2016-12-16 03:00

In the previous article I gave the background to a project we did for a client, exploring the benefits of Spark-based ETL processing running on Amazon's Elastic Map Reduce (EMR) Hadoop platform. The proof of concept we ran was on a very simple requirement, taking inbound files from a third party, joining them to some reference data, and then making the result available for analysis. The primary focus was proving the end-to-end concept, with future iterations focussing on performance and design optimisations.

Here we'll see how I went about building up the ETL process.

Processing Overview

The processing needed to iterate over a set of files in S3, and for each one:

  • Load the file from S3
  • Determine the region from the filename, and add it as a column to the data
  • Deduplicate it
  • Write duplicates to a separate file
  • Load the sites reference data
  • Extract the domain from the URL string
  • Join the facts with sites on domain
  • Write the resulting file to S3

Once the data is processed and landed back to S3, we can run analytics on it. See subsequent articles for discussion of Redshift vs in-place querying with tools such as Presto.

Ticking All The Cool-Kid Boxes - Spark AND Notebooks AND Docker!

Whilst others in Rittman Mead have done lots of work with Spark, I myself was new to it, and needed a sandpit in which I could flail around without causing any real trouble. Thus I set up a nice self-contained development environment on my local machine, using Docker to provision and host it, and Jupyter Notebooks as the interface.

Notebooks

In a world in which it seems that there are a dozen cool new tools released every day, Interactive Notebooks are for me one of the most significant of recent times for any developer. They originate in the world of data science, where taking the 'science' bit at its word, data processing and exploration is written in a self-documenting manner. It makes it possible to follow how and why code was written, what the output at each stage was -- and to run it yourself too. By breaking code into chunks it makes it much easier to develop as well, since you can rerun and perfect each piece before moving on.

Notebooks are portable, meaning that you can work with them in one system, and then upload them for others to learn from and even import to run on their own systems. I've shared a simplified version of the notebook that I developed for this project on gist here, and you can see an embedded version of it at the end of this article.

The two most common are Apache Zeppelin and Jupyter Notebooks (previously known as iPython Notebooks). Jupyter is the one I've used previously, and stuck with again here. To read more about notebooks and see them in action, see my previous blog posts here and here.

Docker

Plenty's been written about Docker. In a nutshell, it is a way to provision and host a set of self-contained software. It takes the idea of a virtual machine (such as VMWare, or VirtualBox), but without having to install an OS, and then the software, and then configure it all yourself. You simply take a "Dockerfile" that someone has prepared, and run it. You can create copies, or throwaway and start again, from a single command. I ran Docker on my Mac through Kitematic, and natively on my home server.

There are prebuilt Docker configuration files for lots of software (including Oracle and OBIEE!), and I found one that includes Spark, PySpark, and Jupyter - perfect!

To launch it, you simply enter:

docker run -d -p 18888:8888 jupyter/all-spark-notebook

This downloads all the necessary Docker files etc - you don't need anything local first, except Docker.

I ran it with an additional flag, -v, configuring it to use a folder on my local machine to store the work that I created. By default all files reside within the Docker image itself - and get deleted when you delete the Docker instance.

docker run -d -p 18888:8888 -v /Users/rmoff/all-spark-notebook:/home/jovyan/work jupyter/all-spark-notebook

You can also run the container with an additional flag, GRANT_SUDO, so that the guest user can run sudo commands within it. To do this include -e GRANT_SUDO=yes --user root:

docker run -d -p 18888:8888 -e GRANT_SUDO=yes --user root -v /Users/rmoff/all-spark-notebook:/home/jovyan/work jupyter/all-spark-notebook

With the docker container running, you can access Jupyter notebooks on the port exposed in the command used to launch it (18888)

Getting Started with Jupyter

From Jupyter's main page you can see the files within the main folder (see above for how you can map this to a local folder on the machine hosting Docker). Using the New menu in the top-right you can create:

  • Folders and edit Text files
  • A terminal
  • A notebook, running under one of several different 'Kernels' (host interpreters and environments)

The ability to run a terminal from Jupyter is very handy - particularly on Docker. Docker by its nature isn't really designed for interaction within the container itself - that's rather the point of Docker, in a way: it provisions and configures all the software for you. You can use Docker to run a bash shell directly, but it's more fiddly than just using the Jupyter Terminal.

I used a Python 2 notebook for my work; with this Docker image you also have the option of Python 3, Scala, and R.

Developing the Spark job

With my development environment up and running, I set to writing the Spark job. Because I'm already familiar with Python I took advantage of PySpark. Below I describe the steps in the processing and how I achieved them.

Environment Preparation

Define AWS parameters:

access_key='XXXXXXXXXXXXXXXXXX'  
secret='YYYYYYYYYYYYYYYYY'  
bucket_name='foobar-bucket'  

Set up the PySpark environment, including necessary JAR files for accessing S3 from Spark:

import os  
os.environ['AWS_ACCESS_KEY_ID'] = access_key  
os.environ['AWS_SECRET_ACCESS_KEY'] = secret  
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python2'  
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.hadoop:hadoop-aws:2.7.1,com.amazonaws:aws-java-sdk-pom:1.10.34,com.databricks:spark-csv_2.11:1.3.0 pyspark-shell'  

Create a spark context:

import pyspark  
sc = pyspark.SparkContext('local[*]')  
sqlContext = pyspark.SQLContext(sc)  

Import Python libraries

from pyspark.sql.functions import udf  
from pyspark.sql.functions import lit  
import boto  
from urlparse import urlsplit  

Note that to install python libraries not present on the Docker image (such as boto, which is used for accessing AWS functionality from within Python) you can run from a Jupyter Terminal:

/opt/conda/envs/python2/bin/pip install boto

On other platforms the path to pip will vary, but the install command is the same

Loading Data from S3

The source data comes from an S3 "bucket", on which there can be multiple "keys". Buckets and keys roughly translate to "disk drive" and "file".

We use the boto library to interact with S3 to find a list of files ('keys') matching the pattern of source files that we want to process.

Connect to the bucket

conn_s3 = boto.connect_s3()  
bucket = conn_s3.get_bucket(bucket_name)  

Iterate over the bucket contents

This bit would drive iterative processing over multiple input files; for now it just picks the last file on the list (acme_file getting set on each iteration and so remaining set after the loop). See the sketch after this code for how the full iteration might look.

contents=bucket.list(prefix='source_files/')  
for f in contents:  
    print f.name
    print f.size
    acme_file = f.name
print "\n\n--\nFile to process: %s" % acme_file  
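
As a hypothetical sketch of how the full iteration might look - assuming the per-file logic described in the rest of this article were wrapped up in a function such as process_file - it could be as simple as:

def process_file(acme_file):
    # Placeholder for the per-file logic described below: load from S3,
    # add the country column, deduplicate, derive the domain, join to
    # the sites reference data, and write the result back to S3
    pass

for f in bucket.list(prefix='source_files/'):
    process_file(f.name)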

Read the CSV from S3 into Spark dataframe

The Docker image I was using was running Spark 1.6, so I was using the Databricks CSV reader; in Spark 2 this is now available natively. The CSV file is loaded into a Spark data frame. Note that Spark is reading the CSV file directly from a S3 path.

full_uri = "s3n://{}/{}".format(bucket_name, acme_file)  
print full_uri  
s3n://foobar-bucket/source_files/acme_GB_20160803_100000.csv
acme_df = sqlContext.read.load(full_uri,  
                                  format='com.databricks.spark.csv',
                                  header='true',
                                  inferSchema='true')

acme_df.printSchema()  
root
 |-- product: string (nullable = true)
 |-- product_desc: string (nullable = true)
 |-- product_type: string (nullable = true)
 |-- supplier: string (nullable = true)
 |-- date_launched: timestamp (nullable = true)
 |-- position: string (nullable = true)
 |-- url: string (nullable = true)
 |-- status: string (nullable = true)
 |-- reject_reason: string (nullable = true)

The above shows the schema of the dataframe; Spark has inferred this automagically from the column headers (for the column names), and then the data types within (note that it has correctly detected a timestamp in the date_launched column).

Add country column to data frame

The filename of the source data includes a country field as part of it. Here we use this regular expression to extract it:

filename=os.path.split(acme_file)[1]  
import re  
m=re.search('acme_([^_]+)_.+$', filename)  
if m is None:  
    country='NA'
else:  
    country=m.group(1)

print "Country determined from filename '%s' as : %s" % (filename,country)  
Country determined from filename 'acme_GB_20160803_100000.csv' as : GB

With the country stored in a variable, we add it as a column to the data frame:

Note that the withColumn function requires a Column value, which we create here using the PySpark lit function that was imported earlier on.

acme_df=acme_df.withColumn('country',lit(country))

acme_df.printSchema()  
root
 |-- product: string (nullable = true)
 |-- product_desc: string (nullable = true)
 |-- product_type: string (nullable = true)
 |-- supplier: string (nullable = true)
 |-- date_launched: timestamp (nullable = true)
 |-- position: string (nullable = true)
 |-- url: string (nullable = true)
 |-- status: string (nullable = true)
 |-- reject_reason: string (nullable = true)
 |-- country: string (nullable = false)

Note the new column added to the end of the schema.

Deduplication

Now that we've imported the file, we need to deduplicate it to remove entries with the same value for the url field. Here I create a second dataframe based on a deduplication of the first, using the PySpark native function dropDuplicates:

acme_deduped_df = acme_df.dropDuplicates(['url'])  

For informational purposes we can see how many records are in the two dataframes, and determine how many duplicates there were:

orig_count = acme_df.count()  
deduped_count = acme_deduped_df.count()  
print "Original count: %d\nDeduplicated count: %d\n\n" % (orig_count,deduped_count)  
print "Number of removed duplicate records: %d" % (orig_count - deduped_count)  
Original count: 97974
Deduplicated count: 96706


Number of removed duplicate records: 1268

Deriving Domain from URL

One of the sets of reference data is information about the site on which the product was viewed. To bring these sets of attributes into the main dataset we join on the domain itself. To perform this join we need to derive the domain from the URL. We can do this using the urlsplit function from Python's urlparse module, as seen in this example:

sample_url = 'https://www.rittmanmead.com/blog/2016/08/using-apache-drill-with-obiee-12c/'

print sample_url  
print urlsplit(sample_url).netloc  
https://www.rittmanmead.com/blog/2016/08/using-apache-drill-with-obiee-12c/
www.rittmanmead.com

We saw above that to add a column to the dataframe the withColumn function can be used. However, to add a column that's based on another (rather than a literal, which is what the country column added above was) we need to use the udf function. This generates the necessary Column field based on the urlsplit output for the associated url value.

First we define our own function, which simply applies urlsplit to the value passed to it:

def getDomain(value):  
    return urlsplit(value).netloc

and then a UDF based on it:

udfgetDomain = udf(getDomain)  

Finally, apply this to a third version of the dataframe:

acme_deduped_df_with_netloc = acme_deduped_df.withColumn("netloc", udfgetDomain(acme_deduped_df.url))  

Joining to Second Dataset

Having prepared the primary dataset, we'll now join it to the reference data. The source of this is currently an Oracle database. For simplicity we're working with a CSV dump of the data, but PySpark supports the option to connect to sources with JDBC so we could query it directly if required.
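
As an aside, a hedged sketch of what that direct JDBC read might look like - the connection details are illustrative only, and the Oracle JDBC driver JAR would need to be available on the Spark classpath:

# Hypothetical: read the sites reference data straight from Oracle
# over JDBC instead of from a CSV dump
sites_df = sqlContext.read.format('jdbc').options(
               url='jdbc:oracle:thin:@//oracle-host:1521/ORCL',
               dbtable='REF_DATA.SITES',
               user='ref_user',
               password='ref_password',
               driver='oracle.jdbc.OracleDriver').load()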

First we import the sites reference data CSV:

sites_file = "s3n://{}/{}".format('foobar-bucket', 'sites.csv')  
sites_df = sqlContext.read.load(sites_file,  
                                  format='com.databricks.spark.csv',
                                  header='true',
                                  inferSchema='true')

Then some light data cleansing with the filter function to remove blank SITE entries, and blank SITE_RETAIL_TYPE entries:

sites_pruned_df = sites_df.filter("NOT (SITE ='' OR SITE_RETAIL_TYPE = '')")  

Now we can do the join itself. This joins the original dataset (acme_deduped_df_with_netloc) with the sites reference data (sites_pruned_df), using a left outer join.

merged_df = acme_deduped_df_with_netloc.join(sites_pruned_df,acme_deduped_df_with_netloc.netloc == sites_pruned_df.SITE, 'left_outer')  

Using the filter function and show we can inspect the dataset for matches, and misses:

First 10 matched:

merged_df.filter(merged_df.ID.isNotNull()).select('date_launched','url','netloc','ID','SITE','SITE_RETAIL_TYPE').show(10)  

First 10 unmatched:

merged_df.filter(merged_df.ID.isNull()).select('date_launched','url','netloc','ID','SITE','SITE_RETAIL_TYPE').show(10)  

Write Back to S3

The finished dataset is written back to S3. As before, we're using the databricks CSV writer here but in Spark 2 would be doing it natively:

acme_enriched_filename='acme_enriched/%s' % filename.replace('.csv','')  
full_uri = "s3n://{}/{}".format(bucket_name, acme_enriched_filename)  
print 'Writing enriched acme data to %s' % full_uri  
merged_df.write.save(path=full_uri,  
                     format='com.databricks.spark.csv',
                     header='false',
                     nullValue='null',
                     mode='overwrite')

Summary

With the above code written, I could process input files in a couple of minutes per 30MB file. Bear in mind two important constraints to this performance:

  1. I was working with data residing up in the Amazon Cloud, with the associated network delay in transferring to and from it

  2. The processing was taking place on a single node Spark deployment (on my laptop, under virtualisation), rather than the multiple-node configuration typically seen.

The next steps, as we'll see in the next article, were to port this code up to Amazon Elastic Map Reduce (EMR). Stay tuned!

Footnote: Notebooks FTW!

Whilst I've taken the code and written it out above more in the form of a blog post, I could have actually just posted the Notebook itself, and it wouldn't have needed much more explanation. Here it is, along with some bonus bits on using S3 from python:

Categories: BI & Warehousing

ETL Offload with Spark and Amazon EMR - Part 1

Thu, 2016-12-15 03:00

We recently undertook a two-week Proof of Concept exercise for a client, evaluating whether their existing ETL processing could be done faster and more cheaply using Spark. They were also interested in whether something like Redshift would provide a suitable data warehouse platform for them. In this series of blog articles I will look at how we did this, and what we found.

Background

The client has an existing analytics architecture based primarily around Oracle database, Oracle Data Integrator (ODI), Oracle GoldenGate, and Oracle Business Intelligence Enterprise Edition (OBIEE), all running on Amazon EC2. The larger architecture in the organisation is all AWS based too.

Existing ETL processing for the system in question is done using ODI, loading data daily into a partitioned Oracle table, with OBIEE providing the reporting interface.

There were two aspects to the investigation that we did:

  • Primarily, what would an alternative platform for the ETL look like? With lots of coverage recently of the concept of "ETL offloading" and "Apache-based ETL", the client was keen to understand how they might take advantage of this

    Within this, key considerations were:

    • Cost
    • Scalability
    • Maintenance
    • Fit with existing and future architectures
  • The second aspect was to investigate whether the performance of the existing reporting could be improved. Despite having data for multiple years in Oracle, queries were too slow to provide information other than within a period of a few days.

Oracle licences were a sensitive point for the client, who were keen to reduce - or at least, avoid increased - costs. ODI for Big Data requires an additional licence, and so was not in scope for the initial investigation.

Data and Processing

The client uses their data to report on the level of requests for different products, including questions such as:

  • How many requests were there per day?
  • How many requests per product type in a given period?
  • For a given product, how many requests were there, from which country?

Data volumes were approximately 50MB, arriving in batch files every hour. Reporting requirements were for the previous day and before only. Being able to see data intra-day would be a bonus, but was not a requirement.

High Level Approach

Since the client already uses Amazon Web Services (AWS) for all its infrastructure, it made sense to remain in the AWS realm for the first round of investigation. We broke the overall requirement down into pieces, so as to understand (a) the most appropriate tool at each point and (b) the toolset with the best overall fit. A very useful reference for an Amazon-based big data design is the presentation Big Data Architectural Patterns and Best Practices on AWS. Even if you're not running on AWS, the presentation has some useful pointers for things like where to be storing your data based on volumes, frequency of access, etc.

Data Ingest

The starting point for the data was Amazon's storage service - S3, in which the data files in CSV format are landed by an external process every hour.

Processing (Compute)

Currently the processing is done by loading the external data into a partitioned Oracle table, and resolving dimension joins and de-duplication at query time.

Taking away any assumptions, other than a focus on 'new' technologies (and a bias towards AWS where appropriate), we considered:

  • Switch out Oracle for Redshift, and resolve the joins and de-duplication there
    • Loading the data to Redshift would be easy, but would be switching one RDBMS-based solution for another. Part of the aim of the exercise was to review a broader solution landscape than this.
  • Use Hadoop-based processing, running on Elastic Map Reduce (EMR):

    • Hive QL to process the data on S3 (or HDFS on EMR)
      • Not investigated, because it provides none of the error handling etc. that Spark would, and Spark has SparkSQL for any work that needs doing in SQL.
    • Pig
      • Still used, but 'old' technology with a somewhat esoteric language, and superseded by Spark
    • Spark
      • Support for several languages including commonly-used ones such as Python
      • Gaining increasing levels of adoption in the industry
      • Opens up a rich eco-system of processing possibilities with related projects such as Machine Learning, and Graph.

We opted to use Spark to process the files, joining them to the reference data, and carrying out de-duplication. For a great background and discussion on Spark and its current place in data architectures, have a listen to this podcast.

Storage

The output from Spark was written back to S3.

Analytics

With the processed data in S3, we evaluated two options here:

  • Load it to Redshift for query
  • Query in-place with a SQL-on-Hadoop engine such as Presto or Impala
    • With the data at rest on S3, Amazon's Athena is also of interest here, but was released after we carried out this particular investigation.

The presumption was that OBIEE would continue to provide the front-end to the analytics. Oracle's Data Visualization Desktop tool was also of interest.

In the next post we'll see the development environment that we used for prototyping. Stay tuned!

Categories: BI & Warehousing

Source Control and Automated Code Deployment Options for OBIEE

Wed, 2016-12-14 03:00

It's Monday morning. I've arrived at a customer site to help them - ironically enough - with automating their OBIEE code management. But, on arrival, I'm told that the OBIEE team can't meet with me because someone had done a release the previous Friday and then gone on holiday - and the wrong code was released, but nobody knew which version. All hands on deck, panic stations!

This actually happened to me, and in recent months too. In this kind of situation hindsight gives us 20:20 vision, and of course there shouldn't be a single point of failure, of course code should be under version control, of course it should be automated to reduce the risk of problems during deployments. But in practice, these things often don't get done - and it's understandable why. In the very early days of a project, it will be a manual process because that's what is necessary as people get used to the tools and technology. As time goes by, project deadlines come up, and tasks like this are seen as "zero sum" - sure we can automate it, but we can also continue doing it manually and things will still get done, code will still get released. After a while, it's just accepted as how things are done. In effect, it is technical debt - and this is your reminder that debt has to be paid, sooner or later :)

I'll not pretend that managing OBIEE code in source control, and automating code deployments, is straightforward. But, it is necessary, so in this post I'll walk through why you should be doing it, and then importantly how.

Why Source Control?

Do we really need source control for OBIEE? After all, what's wrong with the tried-and-tested method of sticking it all in a folder like this?

What's wrong with this? What's right with this? Oh lack of source control, let me count the number of ways that I doth hate thee:

  1. No audit trail of who changed something
  2. No audit of what was changed, and when
  3. No enforceable naming standards for versions
  4. No secure way of identifying deployment candidates
  5. No distributed method for sharing code (don't tell me that a network share counts!)
  6. No way of reliably identifying the latest version of code

These range from the immediately practical through to the slightly more abstract but necessary in a mature deployment.

Of immediate impact is the simple ability to identify the latest version of code on which to make new changes. Download the copy from the live server? Really? No. If you're tracking your versions accurately and reliably then you simply pull the latest version of code from there, in the knowledge that it is the version that is live. No monkeying around trying to figure out if it really is (just because it's called "PROD-091216.rpd", how do you know that's actually what got released to Production? And was that on 12th December or 9th September? Who knows!).

Longer term, having a secure and auditable code line simply makes it easier and less risky to manage. It also gives you the ability to work with it in a much more flexible manner, such as genuine concurrent development by multiple developers against the RPD. You can read more about this in my presentation here.

Which Source Control?

I don't care. Not really. So long as you are using source control, I am happy.

For preference, I always advocate using git. It is a modern platform, with strong support from many desktop clients (SourceTree is my favourite, along with the commandline too, natch). Git is decentralised, meaning that you can commit and branch code locally on your own machine without having to be connected to a server. It supports a powerful fork and pull process too, which is part of the reason it has almost universal usage within the open source world. The most well known of git platforms is github, which in effect provides git as a Platform-as-a-service (PaaS), in a similar fashion to Bitbucket too. You can also run git on its own locally, or more pragmatically, with gitlab.

But if you're using Subversion (SVN), Perforce, or whatever - that's fine. The key thing is that you understand how to use it, and that it is supported within your organisation. For simple source control, pretty much all the common platforms work just fine. If you get onto more advanced use, such as feature-branches and concurrent development, you may find it worth ensuring that your chosen platform supports the workflow that you adopt. Even then, whilst I'd chose git for preference, at Rittman Mead we've helped clients develop very powerful concurrent development processes with Subversion providing the underlying source control.

What Goes into Source Control? Part 1

So you've drunk the Source Control koolaid, and accepted that really there is no excuse not to use it. So what do you put into it? The RPD? The OBIEE 12c BAR file? What if you're still on OBIEE 11g? The answer here depends partially on how you are planning to manage code deployment in your environment. For a fully automated solution, you may opt to store code in a more granular fashion than if you are simply migrating full BAR files each time. So, read on to understand about code deployment, and then we'll revisit this question again after that.

How Do You Deploy Code Changes in OBIEE?

The core code artefacts are the same between OBIEE 11g and OBIEE 12c, so I'll cover both in this article, pointing out as we go any differences.

The biggest difference with OBIEE 12c is the concept of the "Service Instance", in which the pieces for the "analytical application" are clearly defined and made portable. These components are:

  • Metadata model (RPD)
  • Presentation Catalog ("WebCat"), holding all analysis and dashboard definitions
  • Security - Application Roles and Policy grants, as well as OBIEE front-end privilege grants

Part of this is laying the foundations for what has been termed "Pluggable BI", in which 'applications' can be deployed with customisations layered on top of them. In the current (December 2016) version of OBIEE 12c we have just the Single Service Instance (ssi). Service Instances can be exported and imported to BI Archive files, known as BAR files.

The documentation for OBIEE environment migrations (known as "T2P" - Test to Production) in 12c is here. Hopefully I won't be thought too rude for saying that there is scope for expanding on it, clarifying a few points - and perhaps making more of the somewhat innocuous remark partway down the page:

PROD Service Instance metadata will be replaced with TEST metadata.

Hands up who reads the manual fully before using a product? Hands up who is going to get a shock when they destroy their Production presentation catalog after importing a service instance?...

Let's walk through the three main code artefacts, and how to manage each one, starting with the RPD.

The RPD

The complication with deployments of the RPD is that the RPD differs between environments because of different connection pool details, and occasionally repository variable values too.

If you are not changing connection pool passwords between environments, or if you are changing anything else in your RPD (e.g. making actual model changes) between environments, then you probably shouldn't be. It's a security risk to not have different passwords, and it's bad software development practice to make code changes other than in your development environment. Perhaps you've valid reasons for doing it... perhaps not. But bear in mind that many test processes and validations are based on the premise that code will not change after being moved out of dev.

With OBIEE 12c, there are two options for managing deployment of the RPD:

  1. BAR file deploy and then connection pool update
  2. Offline RPD patch with connection pool updates, and then deploy
    • This approach is valid for OBIEE 11g too
RPD Deployment in OBIEE 12c - Option 1

This is based on the service instance / BAR concept. It is therefore only valid for OBIEE 12c.

  1. One-off setup : Using listconnectionpool to create a JSON connection pool configuration file per target environment. Store each of these files in source control.
  2. Once code is ready for promotion from Development, run exportServiceInstance to create a BAR file. Commit this BAR file to source control

    /app/oracle/biee/oracle_common/common/bin/wlst.sh <<EOF
    exportServiceInstance('/app/oracle/biee/user_projects/domains/bi/','ssi','/home/oracle','/home/oracle')
    EOF
    

  3. To deploy the updated code to the target environment:

    1. Checkout the BAR from source control
    2. Deploy it with importServiceInstance, ensuring that the importRPD flag is set.

      /app/oracle/biee/oracle_common/common/bin/wlst.sh <<EOF
      importServiceInstance('/app/oracle/biee/user_projects/domains/bi','ssi','/home/oracle/ssi.bar',true,false,false)
      EOF
      
    3. Run updateConnectionPool using the configuration file from source control for the target environment to set the connection pool credentials

      /app/oracle/biee/user_projects/domains/bi/bitools/bin/datamodel.sh updateconnectionpool -C ~/prod_cp.json -U weblogic -P Admin123 -SI ssi
      

      Note that your OBIEE system will not be able to connect to source databases to retrieve data until you update the connection pools.

    4. The BI Server should pick up the new RPD after a few minutes. You can force this by restarting the BI Server, or using "Reload Metadata" from OBIEE front end.

Whilst you can also create the BAR file with includeCredentials, you wouldn't use this for migration of code between environments - because you don't have the same connection pool database passwords in each environment. If you do have the same passwords then change them now - this is a big security risk.

The above BAR approach works fine, but be aware that if the deployed RPD is activated on the BI Server before you have updated the connection pools (step 3 above) then the BI Server will not be able to connect to the data sources and your end users will see an error. This approach is also based on storing the BAR file as a whole in source control, when for preference we'd store the RPD as a standalone binary if we want to be able to do concurrent development with it.

RPD Deployment in OBIEE 12c - Option 2 (also valid for OBIEE 11g)

This approach takes the RPD on its own, and takes advantage of OBIEE's patching capabilities to prepare RPDs for the target environment prior to deployment.

  1. One-off setup: create a XUDML patch file for each target environment.

    Do this by:

    1. Take your development RPD (e.g. "DEV.rpd"), and clone it (e.g. "PROD.rpd")
    2. Open the cloned RPD (e.g. "PROD.rpd") offline in the Administration Tool. Update it only for the target environment - nothing else. This should be all connection pool passwords, and could also include connection pool DSNs and/or users, depending on how your data sources are configured. Save the RPD.
    3. Using comparerpd, create a XUDML patch file for your target environment:

      /app/oracle/biee/user_projects/domains/bi/bitools/bin/comparerpd.sh \
      -P Admin123 \
      -W Admin123 \
      -G ~/DEV.rpd \
      -C ~/PROD.rpd \
      -D ~/prod_cp.xudml
      
    4. Repeat the above process for each target environment

  2. Once code is ready for promotion from Development:

    1. Extract the RPD

      • In OBIEE 12c use downloadrpd to obtain the RPD file

        /app/oracle/biee/user_projects/domains/bi/bitools/bin/datamodel.sh \
        downloadrpd \
        -O /home/oracle/obiee.rpd \
        -W Admin123 \
        -U weblogic \
        -P Admin123 \
        -SI ssi
        
      • In OBIEE 11g copy the file from the server filesystem

    2. Commit the RPD to source control

  3. To deploy the updated code to the target environment:

    1. Checkout the RPD from source control
    2. Prepare it for the target environment by applying the patch created above

      1. Check out the XUDML patch file for the appropriate environment from source control
      2. Apply the patch file using biserverxmlexec:

        /app/oracle/biee/user_projects/domains/bi/bitools/bin/biserverxmlexec.sh \
        -P Admin123 \
        -S Admin123 \
        -I prod_cp.xudml \
        -B obiee.rpd \
        -O /tmp/prod.rpd
        
    3. Deploy the patched RPD file

      • In OBIEE 12c use uploadrpd

        /app/oracle/biee/user_projects/domains/bi/bitools/bin/datamodel.sh \
        uploadrpd \
        -I /tmp/prod.rpd \
        -W Admin123 \
        -U weblogic \
        -P Admin123 \
        -SI ssi \
        -D
        

        The RPD is available straightaway. No BI Server restart is needed.

      • In OBIEE 11g use WLST's uploadRepository to programmatically do this, or manually from EM.

        After deploying the RPD in OBIEE 11g, you need to restart the BI Server.

This approach is the best (only) option for OBIEE 11g. For OBIEE 12c I also prefer it as it is 'lighter' than a full BAR, more solid in terms of connection pools (since they're set prior to deployment, not after), and it enables greater flexibility in terms of RPD changes during migration since any RPD change can be encompassed in the patch file.

Note that the OBIEE 12c product manual states that uploadrpd/downloadrpd are for:

"...repository diagnostic and development purposes such as testing, only ... all other repository development and maintenance situations, you should use BAR to utilize BAR's repository upgrade and patching capabilities and benefits.".

Maybe in the future the BAR capabilities will extend beyond what they currently do - but as of now, I've yet to see a definitive reason to use them and not uploadrpd/downloadrpd.

The Presentation Catalog ("WebCat")

The Presentation Catalog stores the definition of all analyses and dashboards in OBIEE, along with supporting objects including Filters, Conditions, and Agents. It differs significantly from the RPD when it comes to environment migrations. The RPD can be seen in more traditional software development lifecycle terms, since it is built and developed in Development, and when deployed to a subsequent environment it overwrites in its entirety what is currently there. However, the Presentation Catalog is not so simple.

Commonly, content in the Presentation Catalog is created by developers as part of 'pre-canned' reporting and dashboard packs, to be released along with the RPD to end-users. Where things get difficult is that the Presentation Catalog is also written to in Production. This can include:

  • User-developed content saved in one (or both) of:
    • My Folders
    • Shared, e.g. special subfolders per department for sharing common reports outside of "gold standard" ones
  • User's profile data, including timezone and language settings, saved dashboard customisations, preferred delivery devices, and more
  • System configuration data, such as default formatting for specific columns, bookmarks, etc

In your environment you may not permit some of these (for example, disabling access to My Folders is not uncommon). But almost certainly, you'll want your users to be able to persist their environment settings between sessions.

The impact of this is that the Presentation Catalog becomes complex to manage. We can't just overwrite the whole catalog when we come to deployment in Production, because if we do so all of the above listed content will get deleted. And that won't make us popular with users, at all.

So how do we bring any kind of mature software development practice to the Presentation Catalog, assuming that we have report development being done in non-Production environments?

We have two possible approaches:

  1. Deploy the full catalog into Production each time, but backup first existing content that we don't want to lose, and restore it after the deploy
    • Fiddly, but means that we don't have to worry about which bits of the catalog go in source control - all of it does. This has consequences if we want to do branch-based development with source control, in that we can't. This is because the catalog will exist as a single binary (whether BAR or 7ZIP), so no merging with the source control tool will be possible.
    • Risky, if we forget to backup the user content first, or something goes wrong in the process
    • A 'heavier' operation involving the whole catalog and therefore almost certainly requiring the catalog to be in maintenance-mode (read only).
  2. Deploy the whole catalog once, and then do all subsequent deploys as deltas (i.e. only what has changed in the source environment)
    • Less risky, since not overwriting whole target environment catalog
    • More flexible, and more granular so easier to track in source control (and optionally do branch-based development).
    • Requires more complex automated deployment process.

Both methods can be used with OBIEE 11g and 12c.

Presentation Catalog Migration in OBIEE - Option 1

In this option, the entire Catalog is deployed, but content that we want to retain backed up first, and then re-instated after the full catalog deploy.

First we take the entire catalog from the source environment and store it in source control. With OBIEE 12c this is done using the exportServiceInstance WLST command (see the example with the RPD above) to create a BAR file. With OBIEE 11g, you would create an archive of the catalog at its root using 7-zip/tar/gzip (but not winzip).

When ready to deploy to the target environment, we first backup the folders that we want to preserve. Which folders might we want to preserve?

  1. /users - this holds both objects that users have created and saved in My Folders, as well as user profile information (including timezone preferences, delivery profiles, dashboard customisations, and more)
  2. /system - this holds system internal settings, which include things such as authorisations for the OBIEE front end (/system/privs), as well as column formatting defaults (/system/metadata), global variables (/system/globalvariables), and bookmarks (/system/bookmarks).
    • See note below regarding the /system/privs folder
  3. /shared/<…>/<…> - if users are permitted to create content directly in the Shared area of the catalog you will want to preserve this. A valid use of this is for teams to share content developed internally, instead of (or prior to) it being released to the wider user community through a more formal process (the latter being often called 'gold standard' reports).

Regardless of whether we are using OBIEE 11g or 12c we create a backup of the folders identified by using the Archive functionality of OBIEE. This is NOT just creating a .zip file of the file system folders - which is completely unsupported and a bad idea for catalog management, except at the very root level. Instead, the Archive functionality creates a .catalog file which can be stored in source control, and unarchived back into OBIEE to restore content.

You can create OBIEE catalog archives in one of four ways, which are also valid for importing the content back into OBIEE:

  1. Manually, through OBIEE front-end
  2. Manually, through Catalog Manager GUI
  3. Automatically, through Catalog Manager CLI (runcat.sh)

    • Archive:

      runcat.sh \
      -cmd archive  \
      -online http://demo.us.oracle.com:7780/analytics/saw.dll \
      -credentials /tmp/creds.txt \
      -folder "/shared/HR" \
      -outputFile /home/oracle/hr.catalog
      
    • Unarchive:

      runcat.sh \
      -cmd unarchive \
      -inputFile hr.catalog \
      -folder /shared \
      -online http://demo.us.oracle.com:7780/analytics/saw.dll  \
      -credentials /tmp/creds.txt \
      -overwrite all
      
  4. Automatically, using the WebCatalogService API (copyItem2 / pasteItem2).

Having taken a copy of the necessary folders, we then deploy the entire catalog (with the changes from development in it) taken from source control. Deployment is done in OBIEE 12c using importServiceInstance. In OBIEE 11g it's done by taking the server offline and replacing the catalog on the filesystem with the 7zip archive of the entire catalog.

Finally, we restore the folders previously saved, using the Unarchive function to import the .catalog files.
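
This cycle lends itself to scripting. Below is a minimal sketch of the Option 1 deploy on this basis - the URL, credentials file, and folder choices are illustrative, and Presentation Services must be running for the online runcat.sh calls:

# Sketch of the Option 1 cycle (illustrative URL, paths and folder list)
SAW_URL=http://demo.us.oracle.com:7780/analytics/saw.dll
CREDS=/tmp/creds.txt                 # same credentials file as in the runcat.sh examples above
BACKUP=/home/oracle/catalog_backup

# 1. Back up the content we must preserve
runcat.sh -cmd archive -online $SAW_URL -credentials $CREDS \
          -folder "/users"  -outputFile $BACKUP/users.catalog
runcat.sh -cmd archive -online $SAW_URL -credentials $CREDS \
          -folder "/system" -outputFile $BACKUP/system.catalog

# 2. Deploy the entire catalog from source control here
#    (12c: importServiceInstance / 11g: offline filesystem replace)

# 3. Restore the preserved content; as in the examples above,
#    each archive is unarchived into its parent folder (here, /)
runcat.sh -cmd unarchive -online $SAW_URL -credentials $CREDS \
          -inputFile $BACKUP/users.catalog  -folder / -overwrite all
runcat.sh -cmd unarchive -online $SAW_URL -credentials $CREDS \
          -inputFile $BACKUP/system.catalog -folder / -overwrite all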

Presentation Catalog Migration in OBIEE - Option 2

In this option we take a more granular approach to catalog migration. The entire catalog from development is only deployed once, and after that only .catalog files from development are put into source control and then deployed to the target environment.

As before, the entire catalog is initially taken from the development environment, and stored in source control. With OBIEE 12c this is done using the exportServiceInstance WLST command (see the example with the RPD above) to create a BAR file. With OBIEE 11g, you would create an archive of the catalog at its root using 7zip.

Note that this is only done once, as the initial 'baseline'.

The first time an environment is commissioned, the baseline is used to populate the catalog, using the same process as in option 1 above (in 12c, importServiceInstance; in 11g, an unzip of the full catalog filesystem copy).

After this, any work that is done in the catalog in the development environment is migrated through by using OBIEE's archive function against just the necessary /shared subfolder to a .catalog file, and storing this in source control.

This is then imported to the target environment with the unarchive capability. See option 1 above for details of using archive/unarchive - just remember that this is archiving with OBIEE, not with 7zip!

You will need to determine at what level you take this folder:

  • If you archive the whole of /shared each time you'll never be able to do branch-based development with the catalog in which you want to merge branches (because the .catalog file is binary).
  • If you instead work at, say, department level (/shared/HR, /shared/sales, etc) then the highest grain for concurrent catalog development would be the department. The lower down the tree you go, the greater the scope for independent concurrent development, but the greater the complexity to manage. This is because you want to be automating the unarchival of these .catalog files to the target environment (see the sketch below), and having to deal with multiple levels of folder hierarchy becomes hard work.

It's a trade-off between the number of developers, the breadth of development scope, and how simple you want to make the release process.
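
To give a flavour of the automation involved, here is a hedged sketch of a delta deploy - the folder layout, URL, and credentials file are illustrative, and it assumes one .catalog file per department-level folder under /shared:

# Hypothetical Option 2 delta deploy: unarchive each department-level
# .catalog file held in source control into its parent folder, /shared
SAW_URL=http://demo.us.oracle.com:7780/analytics/saw.dll
CREDS=/tmp/creds.txt
for c in /path/to/scm/catalog/*.catalog; do
  runcat.sh -cmd unarchive -online $SAW_URL -credentials $CREDS \
            -inputFile "$c" -folder /shared -overwrite all
done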

The benefit of this approach is that content created in Production remains completely untouched. Users can continue to create their content, save their profile settings, and so on.

Presentation Catalog Migration - OBIEE Privilege Grants

Permissions set in the OBIEE front end are stored in the Presentation Catalog's /system/privs folder.

Therefore, how this folder is treated during migration dictates where you must apply your security grants (or conversely, where you set your security grants dictates how you should treat the folder in migrations). For me the "correct" approach would be to define the full set of privileges in the development environment and then migrate these through, along with the pre-built objects in /shared, to Production. If you have a less formal approach to environments, or for whatever reason permissions are granted directly in Production, you will need to ensure that the /system/privs folder isn't overwritten during catalog deployments.

When you create a BAR file in OBIEE 12c, it does include /system/privs (and /system/metadata). Therefore, if you are happy for these to be overwritten from the source environment, you would not need to backup/restore these folders. If you set includeCatalogRuntimeInfo in the OBIEE 12c export to BAR, it will also include the complete /system folder as well as /users.

Agents

Regardless of how you move Catalog content between environments, if you have Agents you need to look after them too. When you move Agents between environments, they are not automatically registered with the BI Scheduler in the target environment. You either have to do this manually, or with the web service API: WebCatalogService.readObjects to get the XML for the agent, and then submit it to iBotService.writeIBot, which will register it with the BI Scheduler.

Security
  • In terms of the Policy store (Application Roles and Policy grants), these are managed by the Security element of the BAR and migration through the environments is simple. You can deploy the policy store alone in OBIEE 12c using the importJazn flag of importServiceInstance. In OBIEE 11g it's not so simple - you have to use the migrateSecurityStore WLST command.
  • Data/Object security defined in the RPD gets migrated automatically through the RPD, by definition
  • See above for a discussion of OBIEE front-end privilege grants.

What Goes into Source Control? Part 2

So, suddenly this question looks a bit less simple than when originally posed at the beginning of this article. In essence, you need to store:

  1. RPD
    1. BAR + JSON configuration for each environment's connection pools -- 12c only, simpler, but less flexible and won't support concurrent development easily
    2. RPD (.rpd) + XUDML patch file for each environment's connection pools -- works in 11g too, supports concurrent development
  2. Presentation Catalog
    1. Entire catalog (BAR in 12c / 7zip in 11g) -- simpler, but impossible to manage branch-based concurrent development
    2. Catalog baseline (BAR in 12c / 7zip in 11g) plus delta .catalog files -- more complex, but more flexible, and supports concurrent development
  3. Security
    1. BAR file (OBIEE 12c)
    2. system-jazn-data.xml (OBIEE 11g)
  4. Any other files that are changed for your deployment.

    It's important that when you provision a new environment you can set it up the same as the others. It is also invaluable to have previous versions of these files so as to be able to rollback changes if needed, and to track what settings have changed over time.

    This could include:

    • Configuration files (nqsconfig.ini, instanceconfig.xml, tnsnames.ora, etc)
    • Custom skins & styles
    • writeback templates
    • etc
Summary

I never said it was simple ;-)

OBIEE is an extremely powerful product, and just as you have to take care to build your data models correctly, you also need to take care to understand why and how to manage your code correctly. What I've tried to do here is pull together the different options available, and lay them out with their respective pros and cons. Let me know in the comments below what you think and how you manage OBIEE code at your site.

One of the key messages that it's important to get across is this: there are varying degrees of complexity with which you can embrace source control. All are valid, and in fact an incremental adoption of them rather than big-bang can sometimes be a better idea:

  • At one end of the scale, you simply use source control to hold copies of all your code, and continue to deploy manually
  • Getting a bit smarter, automating code deployments from source control. Code development is still done serially though.
  • At the other end of the scale, you use source control with branch-based feature-driven concurrent development. Completed features are merged automatically with RPD conflicts managed by the OBIEE tooling from the command line. Testing and deployment are both automated.

If you'd like assistance with your OBIEE development and deployment practices, including fully automated source-control driven concurrent development management, please get in touch with us here at Rittman Mead. We would be delighted to use our extensive experience in this field to produce a flexible and customised process for your particular environment and requirements.

You can find the companion slide deck to this article, with further discussion on concurrent development, here.

Categories: BI & Warehousing

The Visual Plugin Pack for OBIEE

Tue, 2016-12-13 04:00

Last week we announced the Rittman Mead Open Source project, and released into open source:

  • the excellent Insights project, a javascript API/framework for building a new frontend for OBIEE, written by Minesh Patel
  • Enhanced usage tracking for OBIEE, to track click-by-click how your users interact with the application

Today it is the turn of the Visual Plugin Pack...

What is the Visual Plugin Pack for OBIEE?

Visual Plugin Pack (VPP) is a means by which users of OBIEE Answers can use custom JavaScript visualisations without having to write any JavaScript!

It is a framework that enables developers to build JavaScript visualisation plugins that report builders can then utilise and configure through the native OBIEE user interface.

I want to point out from the very start that, despite its name, the Visual Plugin Pack is not a pack of all-singing, all-dancing, super-duper visualisations for OBIEE.

Instead, VPP should be thought of as a framework that allows you to quickly develop and integrate all-singing, all-dancing, super-duper visualisations that will work natively within OBIEE.

Subtle difference, but an important one.

So what does it do?

Essentially, VPP allows you to accelerate the development and deployment of custom, configurable and reusable OBIEE JavaScript visualisations.

Back in 2013 I wrote this post describing how to embed a D3 visualisation within OBIEE. The same method will still work today, but it's a cumbersome process and requires heavy use of the narrative form, which, let's be honest, is a painful experience when it comes to JavaScript development.

Some drawbacks with this method:

  • Code editing in the Narrative view is not productive.
  • Reusing visualisations in other analyses requires the copying and pasting of code.
  • Basic visualisation configuration changes - for example width, height, colours, fonts - require code changes.
  • Remapping Column bindings requires code changes.
  • JavaScript library dependencies and load order can be tricky to manage.

The Visual Plugin Pack attempts to address these issues by abstracting away the complexities of the Narrative form and allowing developers to focus on visualisation code, not OBIEE integration code.
If you choose to use VPP for your visualisations then you will never have to touch the narrative form; all visualisation development can take place outside of OBIEE in your favourite code editor and be deployed to WebLogic when you are done.

VPP also allows you to define design-time controls that affect column bindings and visualisation behaviour. The example visualisation below has been written to accept 5 column bindings and 1 configuration component, which controls the visualisation size. You can create as many column bindings and configuration components as you need.

(Screenshot: scatterplot matrix visualisation)

How do I get started?

You can download or fork the repository from here.

Installation and developer guides can be found on the wiki.

There are several visualisations that come bundled with VPP, some more polished than others, but they should serve as good examples that can be enhanced further.

Summary

If you've got some in-house JavaScript skills and are looking to develop and integrate custom visualisations into OBIEE, then VPP can help alleviate a lot of the frustrations associated with the traditional method. Once you're up and running you'll be able to develop faster, integrate quickly and share your visualisations with all OBIEE report writers.

If you'd like to discuss how Rittman Mead can help with deployment or assist with custom visualisation development feel free to contact us.

Categories: BI & Warehousing

Enhanced Usage Tracking for OBIEE - Now Available as Open Source!

Mon, 2016-12-12 04:00
Introduction

OBIEE provides Usage Tracking as part of the core product functionality. It writes directly to a database table every Logical Query that hits the BI Server, including details of who ran it, when, and information about how it executed including for how long, how many rows, and so on. This in itself is a veritable goldmine of information about your OBIEE system. All OBIEE deployments should have Usage Tracking enabled, for supporting performance analysis, capacity planning, catalog rationalisation, and more.

What Usage Tracking doesn't track is interactions between the end user and the Presentation Services component. Presentation Services sits between the end user and the BI Server, from where the actual queries get executed. This means that until a user executes an analysis, there's no record of their actions in Usage Tracking. The audit data is there, but you have to enable, collect, and parse it manually, which can be tricky. This is where Enhanced Usage Tracking comes in. It enables the collection and parsing of every click a user makes in OBIEE. For an overview of the potential of this data, see the articles here and here.

Today we're pleased to announce the release into open-source of Enhanced Usage Tracking! You can find the github repository here: https://github.com/RittmanMead/obi-enhanced-usage-tracking.

Highlights of the data that Enhanced Usage Tracking provides includes:

  • Which web browsers do people use? Who is accessing OBIEE with a mobile device?

  • Who deleted a catalog object? Who moved it?

  • What dashboards get exported to Excel most frequently, and by whom?

The above visualisations are from both Kibana and OBIEE. The data from Enhanced Usage Tracking can be loaded into Elasticsearch, and is also available in Oracle tables, so you can put OBIEE itself - or DV - on top of it.


How to use Enhanced Usage Tracking

See the github repository for full detail on how to install and run the code.

TODO

What's left TODO? Here are a few ideas if you'd like to help build on this tool. I've linked each title to the relevant github issue.

TODO 01

The sawlog is a rich source of lots of data, but the Logstash script has to know how to parse it. It's all down to the grok statement, which identifies the fields to extract and defines their delimiters. Use grokdebug.herokuapp.com to help master your syntax. From there, the data can be emitted to CSV and loaded into Oracle.

Here's an example of something yet to build - when items are moved and deleted in the Catalog, it is all logged. What, who, and when. The Logstash grok currently scrapes this, but the data isn't included in the CSV output, nor loaded into Oracle.


Don't forget to submit a pull request for any changes to the code that would benefit others in the community!

You'll also find loading the data directly into Elasticsearch easier than redefining the Oracle table DDL and load script each time, since in Elasticsearch the 'schema' can evolve based simply on the data that Logstash sends to it.

TODO 02

Version 5 of the Elastic stack was released in late 2016, and it would be good to test this code with it and update the README section above to indicate if it works - or submit the required changes needed for it to do so.

TODO 03

There's lots of possibilities for this data. Auditing who did what, when, is useful (e.g. who deleted a report?). Taking it a step further, are there patterns in user behaviour? Certain patterns of clicks that could be identified to highlight users who are struggling to find the data that they want? For example, opening lots of presentation folders in the Answers editor before adding columns to the analysis? Can we extend that to identify users who are struggling to use the tool and are going to "churn" (stop using it) and thus contact them before they do so to help resolve any issues they have?

TODO 04

At the moment the scripts are manual to invoke and run. It would be neat to package this up into a service (or set of services) that could run automagically at server boot.

Until then, using GNU screen is a handy hack for setting scripts running and being able to disconnect from the server without terminating them. It's like using nohup ... &, except you can reconnect to the session itself as and when you want to.
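
For example (the session name and collection script here are hypothetical):

screen -S eut          # start a named session
./run_logstash.sh      # kick off the collection script
# detach with Ctrl-a d - the script keeps running on the server
screen -r eut          # reattach to the session later on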

TODO 05

Click events have defined 'Request' types, and these I have roughly grouped together into 'Request Groups' to help describe what the user was doing (e.g. Logon / Edit Report / Run Report). Not all requests have been assigned to request groups. It would be useful to identify all request types, and refine further the groupings.

TODO 06

At the moment only clicks in Presentation Services are captured and analysed. I bet the same can be done for Data Visualization/Visual Analyzer too ...

Problems?

Please raise any issues on the github issue tracker. This is open source, so bear in mind that it's no-one's "job" to maintain the code - it's open to the community to use, benefit from, and maintain.

If you'd like specific help with an implementation, Rittman Mead would be delighted to assist - please do get in touch to discuss our rates.

Categories: BI & Warehousing

Insights - An Open-Source Visualisation Platform for OBIEE

Fri, 2016-12-09 04:00

On and off over the last year, I have spent some time developing a customisable framework for building visualisations and dashboards, using OBIEE as the back-end. The result is Insights, a JavaScript web application that offers a modern alternative to OBIEE Answers. As of today, we have officially open sourced the project, so you are free to download, install, hack and contribute as you please.

The primary motive for building this application was to meet some very bespoke reporting requirements for a client, which I mention in my previous blog describing the prototype. During this piece of work I wrote an object orientated interface for the OBIEE web services. The icing on the cake was tying it into Tom Underhill's Visual Plugin Pack.

You can see more information about Insights in a presentation that I did at the recent UKOUG conference here: Bridging the Gap: Enhancing OBIEE with a Custom Visualisation Platform

Since then a lot of work has been put in to make it developer-friendly, visually appealing and hopefully easier to use. I'll be the first to admit that it's far from perfect, but it should be a decent starting point.

Getting Started

In order to use Insights you will need OBIEE 11.1.1.9 or above. Additionally, the application has only been tested using IE11 or Chrome browsers and so compatibility with other browsers cannot be guaranteed.

First, download the application or fork the Git repository.

There is an installation guide in the project at docs/installation.html. Follow this guide to deploy the application on your OBIEE server.

Demo

This is a quick step-by-step demonstration creating a basic dashboard, showing off some of the features in the application (apologies if the GIFs take a while to load).

First you log in, using your usual OBIEE credentials. The homepage shows some pre-configured dashboards here, but we're going to click the pencil to create a new one.

Logging in

Next I've dragged in some columns from my subject area, Sample App, and run the query, displaying the default table plugin.

Add Columns

In this step, I've gone to the configuration tab, and changed the colour of my table.

Configuration

Now I change the plugin type using the drop down menu at the top. Notice that my previous table visualisation gets stored on the right. Clicking the Store button manually adds my new pie chart there too. Then we can flick between them easily.

Store Pie Chart

Filters can be added by clicking the icon next to the column on the subject area panel.

Filters

Adding in a sunburst chart, and playing with some of the colours here.

Sunburst

Now we have our visualisations, we can begin constructing our dashboard. You can freely move around and resize the visualisations as you choose. I recommend hiding the panels for this, as the full screen is much closer to what users will see when viewing the dashboard.

Dashboard

The next GIF shows the interaction framework, which can be used to implement UI features where the user interacts with one visualisation and another visualisation on the page reacts to it. In its most basic form, each plugin type can be filtered - where OBIEE runs the query again. Although more complex reactions that are specific to a certain chart type can also be configured, as seen below with the sunburst chart.

Interactivity

Dashboard prompts can be added by clicking the filter icon next to one of the RPD columns. Any visualisations using this subject area will respond to the prompt. The prompt box can be freely placed on the canvas like any other object.

Dashboard Prompt

Finally, we can save the object to the web catalogue. This saves as a hidden analysis object in the OBIEE web catalogue and contains all of the information to recreate the dashboard when loading. All OBIEE security features are preserved, so users will only be able to access folders and reports they have permissions for.

Save to Web Catalogue

Finished dashboards can be viewed in the application once they have been saved. The dashboard viewer will show all dashboard objects in that folder as different pages, available from the left pane. Images can be exported to PNG and PDF as well as data from the visualisations exporting to Excel and CSV.

Viewing Dashboards

So How Do I Learn More?

The slides that I did at UKOUG describing Insights give a comprehensive overview of the design behind the tool. You can find them here.

Summary

In a nutshell, those are the main features of the application. Feel free to try it out and have a read through the documentation (available through the application itself or offline as HTML files in the docs directory).

As an open source application there is no official support, however if you experience any bugs or have any requests for enhancements, please post them on the issue tracker.

We hope you enjoy using the app and if you would like to enlist our expertise to help you deploy and develop using this platform, feel free to contact us to discuss it further.

Categories: BI & Warehousing

The Rittman Mead Open Source Project

Thu, 2016-12-08 08:00

We have a strong innovation spirit at Rittman Mead, with all staff encouraged to use technology to its best advantage in order to do things with the software that haven't been done before. Some of these projects may be 'scratching the itch' of a repeated manual task that should be automated. Others use technology to extend the capabilities of the tools or write new ones to fill gaps that have been identified.

At Rittman Mead we pride ourselves on sharing knowledge with the BI/DI community, both 'offline' at conferences and online through our blog. Today we are excited to extend this further, with the release into open source, over the next few days and weeks, of some key code projects:

  • insights - a javascript API/framework for building a new frontend for OBIEE, building on the OBIEE web service interface, as described here
  • vpp - "Visual Plugin Pack" - innovative visualisation capabilities to use natively within OBIEE
  • obi-enhanced-usage-tracking - the ability to track and audit user behaviour per-click, as described here

They will be available shortly on the Rittman Mead GitHub repository, released under the MIT licence.

These projects are in addition to existing code that we have shared with the community over the years, including the obi-metrics-agent tool and the popular OBIEE 11g Linux service script.

We're very excited about opening up these projects to the community, and would be delighted to see forks and pull-requests as people build and expand on them. It should go without saying, but these are contributed 'as is'; any bugs and problems you find we will happily receive a pull request for :-)

If you would like help implementing and extending these for your own project, we would be delighted to offer services in doing so - just get in touch to find out more.

Over the next few weeks keep an eye on the blog for more information about each project, and future ones.

Categories: BI & Warehousing

An Oracle DVD story of... DVDs

Thu, 2016-12-08 07:00

Have you ever wondered what the trend in movie releases has been for the past few decades? Comparing the number of Sci-Fi releases vs. Romantic Comedy releases? Me too, which is why I've taken my first look at Oracle Data Visualization Desktop (DVD) to spot trends between these two movie genres - Sci-Fi and the Romantic Comedy.

For this post, I found an interesting dataset from IMDB.com on Kaggle.com, listing a smattering of movies since the early 1900s from which to sort and analyze. For this example, I will contrast the number of releases of the two movie genres, looking for any possible relationship between them.
If you haven't installed the application yet, take a quick look at Matthew Walding's post for a good introduction. Oracle's DVD installer is fairly quick and simple, and you'll be creating visualizations in no time.

So, once the DVD application is running, we can create our first project:

Or, alternatively...


Next, we'll need a data source:

And, we'll import the CSV-formatted file I downloaded from Kaggle.com earlier:


Select the "movie_metadata.csv" file to import:

And, change the Name to "IMDB Movies A" for clarification:

After the file has been imported, we see a problem:

Clicking "More Detail", the following screen displays the detail we can use to troubleshoot the query error:

For troubleshooting, I used the highlighted value and found the problem is with the "budget" column, which requires a datatype change from "Integer" to "Double":

The next task is to create a method of identifying a specific movie genre, however, as you can see, all genre labels for each movie are stored in a pipe-delimited value of the genres column:

So, for this demonstration, I've chosen to add a calculated column for each genre I want to analyze, locating the desired string within the pipe-delimited value under the assumption that the same value, "Sci-Fi" for instance, is recorded with the same characters in every occurrence of each pipe-delimited value.

For the first column (data element), I chose the functions LOCATE and SIGN to provide a simple logical indicator (0 and 1) that can be aggregated (summed) easily.

Click the "Validate" button to verify syntax:

The LOCATE function returns a positive integer where the expression "Sci-Fi" is located in a given string, the genres data element in this case. The SIGN function subsequently returns either a 0, 1, or -1, depending on the sign of the resulting integer from the LOCATE function. 1 (one) indicates yes, this movie release includes a Sci-Fi label for genre. 0 (zero) indicates a missing Sci-Fi label for genre, for example.
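
Putting that together, the calculated column expression is a one-liner along these lines (a sketch using the genres element referenced above; the exact quoting of column names may vary in your DVD version):

SIGN(LOCATE('Sci-Fi', genres))

This returns 1 when 'Sci-Fi' appears anywhere in the genres value, and 0 otherwise.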

Here is the new column, appended to our existing dataset:


Next, I will create another column to identify the Romantic Comedy genre - genre_RomCom_Ind, as follows:

With the two new data columns, our dataset is expanded accordingly:


Now, let's create the visualizations:


Let's create a bar graph for each of our new Indicator columns, starting with the Sci-Fi genre:


We'll create a filter to include data only for title years between 1977 and 2015:


Afterward, our initial graph appears as follows, with a default aggregate summing all genre_SciFi_Ind values (0 or 1) for each title year:


Now, let's add a similar bar graph for all Romantic Comedy (genre_RomCom_Ind) releases. Notice, the same filter for title year will be applied to this new graph:


Next, I'll change the labels for each graph, providing proper context for the visualization:


We can also change the aggregate method used for the graph, when necessary:


For an added touch, let's add a trend line to the bar graph for even easier viewing:


And now, our graph appears as follows:


Applying similar modifications to our Sci-Fi Releases graph and displaying both graphs together on Canvas 1, we have the following:


Next, we'll add this Canvas to an Insight, select Narrate, and add our own description of any interesting comparisons we can identify:


In viewing the two graphs side-by-side, we notice one interesting outlier: in 2010, the number of Romantic Comedy releases outnumbered Sci-Fi releases by 21 movies. On this Insight, I can enter a description (narration) beneath the graphs, highlighting this departure from the plotted trend line, as shown below. Another interesting, and unexpected, trend is the decreasing number of Romantic Comedy releases after 2008. But do these two graphs display an obvious relationship, or correlation, between the two genres, either positive or negative? If we look at each trend line between 1990 and 2008, each movie genre shows an increasing number of releases, generally speaking, which leaves a somewhat inconclusive determination as to correlation - although the period after 2008 seems to indicate an inverse relationship.


At this point, the project can be saved (with a new title), exported, and/or printed:


In this post, I've demonstrated a basic example as an introduction to Oracle Data Visualization Desktop, with IMDB movie data, to visually quantify the number of movie releases in a given timeline, 1977 to 2015, creating custom calculations and dynamic visualizations for our particular measures.

One note I would add is that I did not attempt to eliminate any overlapping indicators, which were minimal, in the newly-added columns, genre_SciFi_Ind and genre_RomCom_Ind. Although it is possible for a movie to be labeled as all three - Sci-Fi, Romance, and Comedy - this did not distort the overall trend. And these graphs now raise a question: can we know what influences the release of Sci-Fi movies and their increasing popularity? Are Romantic Comedy movies truly decreasing in popularity, or do the movie studios choose to decrease the number of RomCom releases because of the surge in Sci-Fi releases? We all understand the limitations of studios' investment capital, but must the RomCom genre suffer because of the Sci-Fi genre? If so, why? I realize this is a simplified view of the trend, but it does lend itself to more scrutiny among other genres as well.

Categories: BI & Warehousing

Introducing On Demand Training from Rittman Mead

Mon, 2016-12-05 08:00

Rittman Mead is happy to announce that its much anticipated On Demand Training (ODT) service is live, giving people the opportunity to receive expertly written & delivered self-paced training courses that can be accessed online anywhere, at anytime.

We have been delivering technical & end-user courses based on Oracle Analytics products all over the world for the past decade.

While our classroom sessions continue to offer an unrivalled experience for our trainees, we understand that in the modern era, flexibility is important.

ODT has been built based on the feedback of our clients and network so that you can:

  • Experience our training regardless of the distance to travel

  • Keep your members of staff on site at crucial periods in your company’s calendar

  • Give participants the ability to reinforce the lessons they’ve learnt afterwards

Learn

Use Rittman Mead's LMS as your virtual classroom to access all course materials, lesson demos and slides

Practice

Get hands on with your very own cloud based training environment

Engage

Submit questions to Rittman Mead's Principal Trainer network on subjects that might be specific to your everyday use of the product

Each course provides 30 days' access to the above, ensuring you have enough time to learn at your own pace and reinforce each lesson.

We’re feeling particularly seasonal down here in Brighton, so to celebrate the launch of our platform we’re offering a 40% discount on our first live course OBIEE 12c Front End Development & Data Visualization between now and January 31st.

Simply use the discount code RMODT12C on checkout to take advantage of this exclusive offer.

For more details and to start your On Demand learning experience with Rittman Mead please check out:

  • Our webpage where you can find out more information about ODT and register an account for our LMS
  • The Rittman Mead LMS where you can view our course catalog and purchase courses

You can also contact training@rittmanmead.com if you have any questions about the service.

Happy Learning!!!

Categories: BI & Warehousing

Work-Life Balance at Rittman Mead

Wed, 2016-11-30 20:00
Work-Life Balance at Rittman Mead



Rittman Mead has always had a long standing commitment to giving back, not only to the technology industry, but to local and global communities as well.  Recently, Rittman Mead employees have been encouraged to take up to 40 paid hours to participate in community service opportunities.  This year, I chose to use my 40 hours to serve at an orphanage in the Dong Nai province of Vietnam.  The Bien Hoa Center for Supporting and Vocational Training is an orphanage that currently serves 53 children, ranging in age from infants to 16 year olds.

Work-Life Balance at Rittman Mead

Additionally, the Bien Hoa Center was home to my recently adopted son for over 7 years, so this was a place near and dear to my heart. The orphanage is run by a very attentive staff, who do a great job caring for the kids despite having very limited resources. Many of the children are learning English and other useful skills that will serve them well once they leave the orphanage, either through aging out of the program or through adoption. While we were there, my family and I were able to play with the older children and comfort the babies. It was a pleasure to see them display such beautiful, wide smiles despite their difficult situations. We were also able to deliver a gift donated by many of my generous Rittman Mead colleagues, which included over 60lbs of art supplies, candy and toys. Despite our consultants being separated across many states, once I posted an opportunity to contribute, gifts just started arriving at our Atlanta office, where I hang my laptop bag.

Work-Life Balance at Rittman Mead

It is truly a pleasure working with such compassionate people and having a management team that values more than just profits.  Caring about the causes that are important to employees is a big part of the Rittman Mead culture. This attitude, coupled with numerous family friendly work events, makes employees feel like more than simply a cog in the wheels of a profit machine. At Rittman Mead, employees are supported in their pursuit of a healthy work-life balance and that is one of the big reasons I am proud to work here.


Categories: BI & Warehousing

Rittman Mead at UKOUG Tech 16

Thu, 2016-11-24 11:52
Rittman Mead at UKOUG Tech 16

This year as always Rittman Mead is coming to UKOUG Tech 16 with a strong presence and a great line up of sessions covering OBIEE, ODI, Kafka, advanced visualisation and more. And yes, there will be Cloud!

Rittman Mead at UKOUG Tech 16

Here are the details of the Rittman Mead sessions:

There is no better way to finish a conference than with two success stories from our recent engagements! If the OBIEE 12c upgrade depicted in Francesco's session is something you are also looking to achieve, we would be pleased to tell you more about it and to see how we can help you.

And of course, we are also happy to answer any questions if you see us in sessions or around the conference. You can find some of us during the Oracle Big Data meetup (Monday evening) or the ODTUG Data and Analytics Switzerland meetup (Tuesday evening).

So see you in two weeks in Birmingham!

Categories: BI & Warehousing

How to Use Versioning in ODI 12c

Wed, 2016-11-23 09:00
How to Use Versioning in ODI 12c

How many times have you been working on a project and something goes wrong, the power shuts off, or you go on vacation and someone has messed with your code, or somehow your work is lost? Well, now you have a way to safeguard your project work.

Versioning in ODI is available at various hierarchy levels within an instance and is stored in the master repository. What this means is that if you have multiple work repositories connected to the same master, you can see all the versions when connected to either work repository.

A version is a backup copy of an object that is saved as a checkpoint in ODI. ODI allows you to version Projects, Folders, Packages, Scenarios, Load Plans, Mappings, Procedures, Knowledge Modules, Models, Model Folders and Solutions. You will need to decide which objects to create and manage versions for, but this tutorial will review the process using Packages.

Fast Review: A package is made up of steps organized into a diagram that is executed. The steps include mappings, variables, procedures, ODI tools (such as OdiXMLConcat, OdiZip, etc), models, sub-models and datastores.

  1. Connect and move to the Designer Navigator and expand the Projects folder.

    How to Use Versioning in ODI 12c

  2. Select and expand Packages. In our example I will open up Target Data Load.

    How to Use Versioning in ODI 12c

  3. As you see below, a well-trained ODI developer has set up the loads to run in a specific execution order and, if any fail, to send an email.

    How to Use Versioning in ODI 12c

    In addition to a fail notification, a new manager also wants to know the package executed successfully. We will create an original version of the package and then add a new email notification. Once we have finished our changes, we will create a new version.
  4. Right-click on the Target Data Load package and select Version > Create Version

    How to Use Versioning in ODI 12c

  5. Type 'Target Data Load v1' as the name and 'Existing Target Data Load package (original)' in the description. You can name the original (or next version, depending on where you are in your versions) whatever you would like. Best practice is to keep the name simple and include a version number. Make sure to put more descriptive details about the purpose of the version in the description box below the version name.

    How to Use Versioning in ODI 12c

    Now that we have our original version safely created, we can make our changes to the package.

  6. If it is not expanded, expand the Internet accordion of the ODI toolbox and locate OdiSendMail. Highlight OdiSendMail and place it on the canvas to the right.

    How to Use Versioning in ODI 12c

  7. Using the toolbar, connect your last mapping to OdiSendMail 2 using the green ok arrow. Remember to click on the mapping and drag your cursor to OdiSendMail 2.

    How to Use Versioning in ODI 12c

  8. Make sure to click on the white cursor in the menu (above the Toolbox, to the left of the green arrows) and click OdiSendMail 2 so the properties window populates with the mail server information. Press Save. Contact your company email or network admin to get the required SMTP information.

    Pro-tip: For our demonstrations and tutorials we use mailtrap.io. It is a working dummy SMTP testing server.

  9. Once completed your screen should look similar to the image below.

    How to Use Versioning in ODI 12c

  10. Before we version the changes, we should test them. Execute the new package by right clicking on the newly modified package, in our tutorial it will be Target Data Load. Click OK and accept the defaults in the Run window.

    WARNING: Make sure you are in the correct environment for testing and development. DO NOT run any package that will change or affect any real data.

    How to Use Versioning in ODI 12c

  11. Now check your execution under the Operator tab to make sure it was successfully executed.

    How to Use Versioning in ODI 12c

  12. Now we will create a new version of the package that will be the latest version. We will repeat earlier steps, so return to the Designer Navigator > Projects window and click on the Target Data Load package. Right-click Target Data Load package and go to Version > Create Version. Note when the window appears you see the original version in the bottom box.

    Keep the default name but change the version to v2. Then for the description put 'Added successful execution for Target Data Load mappings OdiSendMail' and click OK.

    How to Use Versioning in ODI 12c

    You have now created 2 versions of your package: the original version and the modified version with the success email. Versioning is a key feature in ODI that really allows multiple developers to interact and work together to achieve the best results for data integration projects.

Special Note:

In this post, we reviewed how to use internal versioning in ODI. Rittman Mead always recommends using an external configuration management system (e.g. GitHub) in ODI releases earlier than 12.2.1.2, rather than the internal versioning. In the next ODI patchset, there will be integration with Git for better workflow control. Subversion integration is currently available as of ODI 12.1.3.

If you are interested in seeing how to use Git in conjunction with older versions of ODI (prior to 12.2.1.2.6) stay tuned and check out an upcoming video here using Rittman Mead Principal Consultant Pete Tamsin's method for 'Using Git in an ODI Procedure'.

Huge thanks to Pete Tamsin and Michael Rainey for their help editing this post. No man, or woman, is an island!

Categories: BI & Warehousing

Catalog Validation: Why, What, When, Where and How?

Tue, 2016-11-22 02:04

One of the features everybody "loved" about OBIEE 11g were the Global Unique Identifiers (GUIDs), used to recognize users and groups based on an identifier that could be different from the username. The original aim of GUIDs was being able to distinguish different users sharing the same username coming from multiple Authentication Providers.

GUID management could be tricky, especially if GUIDs are not in sync between different environments, and could cause a wide range of errors, from the inability to log in to missing parts of the catalog.

[2016-10-20T09:19:04.000+02:00] [OBIPS] [ERROR:1] [] [saw.security.validate.indexes] [ecid: 0058cGJgGOkBh4sawh3j6G0001QC00000B,0] [tid: 2002437888] XXXX's guid 0A8AC9E0811D11E4AF4FE155B36CBFFD in catalog doesn't match guid 49BB3BB0629311E5BFFE71BB91F31C2B in backend! Aborting! Please UpdateGuids!

After checking the Presentation Services logs (sawlog.log), the solution for most of those errors was simply regenerating GUIDs. The GUIDs regeneration method however isn't something easily doable in a production system since it requires some downtime (a reboot of both the Oracle BI Server and Presentation Services is required).

Why Would you Run Catalog Validation?

You may ask yourself:

Why is he talking about GUIDs when they have been removed in OBIEE 12c?

And you would be perfectly correct. GUIDs misalignment is not a problem anymore in OBIEE 12c but was historically only one of the issues causing catalog corruption and that would require afterwards a catalog validation.

Even without GUIDs, catalog corruption is still something that can happen in OBIEE 12c: objects (e.g. analyses, dashboards, agents) owned by deleted users, broken links, and corrupted files on the server are only some of the issues that could be present in any OBIEE implementation, no matter which version is installed.
Most of the time corrupted catalogs generate errors which are difficult to diagnose, and manual fixing is not always possible and never easy.

The Catalog Validation process, available since OBIEE 11g, is very powerful since it provides a detailed analysis - and an automated fix, if so configured - of all catalog corruptions.

What is Catalog Validation?

As per Oracle's documentation, the Catalog Validation (CV) procedure does the following checks:

  • Ensures that each object in the catalog is larger than zero bytes: any object with zero bytes size is probably due to corruption and should be removed.
  • Ensures that each item in the catalog has a valid corresponding .atr file: the .atr file contains the properties (permissions, ownership, creation date, modification date etc.) of any object in the catalog. An object without related .atr file is not visible in OBIEE's front-end.
  • Ensures that each link in the catalog is valid: links to deleted or renamed dashboards and analysis will cause an error when clicked.
  • Ensures that the files in the account cache are valid: this step checks that all the accounts are valid and the cache entries (storing user related information) are up to date.
  • Ensures that all XML objects in the catalog pass schema validation: every object (dashboard, analysis, prompt etc.) in the catalog is stored as XML file. This step checks that the XML is valid.
  • Attempts to repair object names that were damaged by ftp programs: moving catalog objects using ftp programs could corrupt the object name.

When Should You Run Catalog Validation?

I've seen Catalog Validation used only when problems were raised; however, it is good practice to validate the catalog every time a major change is made that impacts it, or on a schedule in environments where end users can directly create content.

The following is a list of cases when running a Catalog Validation could be useful:

  • Before an upgrade: running CV before an upgrade and ensuring the catalog's consistency helps avoid problems related to possible corruptions
  • After an upgrade: running CV after an upgrade to ensure that content and security migration worked
  • After a major change: when a major change happens in the catalog, CV ensures no missing links or ownership problems are present
  • After a deployment: executing CV after a production deployment to check the content migration and verify the security.
  • On a schedule: execute CV regularly on instances where end-users can create content, to verify their content and accounts.

Please note that a catalog can have corruption even if no front-end enhancements have been made; the following are just some examples:

  • Developer account deletions: all objects owned by that account will be flagged as corrupted
  • Security changes: changing/deleting security roles impact catalog privileges
  • File System corruption: data can be badly written in file system
  • Content deletions: deleting content makes referring objects corrupted

Sometimes the OBIEE environment continues working as expected even if some of the above corruptions are present. Nevertheless, over a long period those may be a cause of errors, especially if upgrades or changes in the security are planned.

Where Do You Run Catalog Validation?

Catalog Validation can be run in every OBIEE instance available; however, the following use cases could be particularly interesting:

  • Validating development catalog: once consistency of development catalog is ensured it can then be migrated forward to production
  • Validating production (or smoke test) catalog: validating production catalog to ensure that code promotions happened consistently, that user homes are valid and that no objects (user created or promoted) are broken.

A particularity to note is that if you run CV against a production catalog in a different environment (e.g. development) with a different security store, then many accounts and their related content could be flagged as non-existent and deleted. As a general rule CV should be run on environments sharing the same security as where the catalog is sourced from, allowing a genuine check of the security settings.

Performing a Catalog Validation in a production environment is not always possible due to the process restarts required; a smoke test environment sharing the same security settings would be the perfect target for the test. When running Catalog Validation on a live catalog, or when taking a catalog backup, ensure that "Maintenance Mode" is activated: setting this flag ON (found under the Administration page in OBIEE's front-end) ensures that no changes can be performed in the catalog during the check or upgrade.

Maintenance Mode

How Do You Run Catalog Validation?

In order to run Catalog Validation you need to:

  • Stop Presentation Service[s] (obips): stopping the component can be achieved either in Enterprise Manager or via the command line. The command line syntax changed between OBIEE 11g and 12c; you can find both statements in the code below
# 11g Syntax
$INSTANCE_HOME/bin/opmnctl stopproc ias-component=obips1
# 12c Syntax
$INSTANCE_HOME/bitools/bin/stop.sh -i OBIPS
  • Create a backup of the catalog: when performing a catalog backup, 7-Zip should be the chosen tool. WinZip has known problems with catalog files (see Oracle's doc, chapter "17.10 Archiving and Unarchiving Using Catalog Manager").
  • Create a backup of instanceconfig.xml file (under $INSTANCE_HOME/config/fmwconfig/biconfig/OBIPS)
  • Change instanceconfig.xml file in order to include the validation tags explained in the following section
  • Start Presentation Service[s]: like the stop operation, this can be performed either via EM or command line. Below the code for 11g and 12c
# 11g Syntax
$INSTANCE_HOME/bin/opmnctl startproc ias-component=obips1
# 12c Syntax
$INSTANCE_HOME/bitools/bin/start.sh -i OBIPS
  • Repeat the steps above until the catalog is fully validated: as explained in the section below, several different assessments and automated fixes can be performed. The sawlog.log files will contain entries when corrupted objects are present in the catalog. A catalog is fully validated when no corrupted objects are found during CV.
  • Stop Presentation Service[s]
  • Restore original instanceconfig.xml file
  • Start Presentation Service[s]
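
Putting the 12c steps together, one validation pass might be scripted along these lines - a sketch only, with paths per your installation, and using the validation tags described in the next section:

# One Catalog Validation pass on OBIEE 12c (sketch)
$INSTANCE_HOME/bitools/bin/stop.sh -i OBIPS

CFG=$INSTANCE_HOME/config/fmwconfig/biconfig/OBIPS/instanceconfig.xml
cp $CFG ${CFG}.bak     # keep the original to restore once fully validated
# ... edit $CFG here to add the <Catalog> validation tags shown below ...

# with Validate set to OnStartupAndExit, obips starts, validates, and stops
$INSTANCE_HOME/bitools/bin/start.sh -i OBIPS

# review what was flagged
grep -i validate $INSTANCE_HOME/servers/obips1/logs/sawlog*.log
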
Catalog Validation configuration

The following tags must be inserted under <ServerInstance><Catalog> tag.

<?xml version="1.0" encoding="UTF-8" standalone="no"?>  
    <!-- Oracle Business Intelligence Presentation Services Configuration File -->
    <WebConfig xmlns="oracle.bi.presentation.services/config/v1.1">
    <ServerInstance>
        [...]
        <Catalog>
              <Validate>OnStartupAndExit</Validate>
              <ValidateAccounts>Clean</ValidateAccounts>
              <ValidateHomes>Report</ValidateHomes>
              <ValidateItems>Clean</ValidateItems>
              <ValidateLinks>Clean</ValidateLinks>
        </Catalog>
        [...]
    </ServerInstance>
</WebConfig>  

The tags do the following. See below for an explanation of the values that can be specified:

  • Validate: Main configuration tag. Possible values are
    • None: No Catalog Validation is going to happen; however, all the privileges and each object's ACLs are cleaned of entries for non-existing accounts
    • OnStartupAndExit: Presentation Service is started, performs the validation based on the following tags and stops. This process can be reiterated multiple times with different options for each element.
  • ValidateAccounts: Verifies the consistency of users, roles and groups.
  • ValidateHomes: Verifies all users' homes; executed only if ValidateAccounts is set to Report or Clean
  • ValidateItems: Verifies that catalog items are consistent - size greater than zero and valid XML.
  • ValidateLinks: Verifies the consistency of all links in the catalog (e.g. all analyses contained in a dashboard).

The accepted values for all settings except Validate are the following:

  • None: no validation will be performed
  • Report: a log is written for every inconsistent item in sawlog.log file under $INSTANCE_HOME/servers/obips1/logs
  • Clean: does the same as Report, plus removes the inconsistent object from the catalog.

As you can see, the "Clean" option isn't appropriate for all tags: you don't want a dashboard to be deleted only because its owner doesn't exist anymore, but it is the desired choice when you need to remove all the old or corrupted user homes. The "Report" option, on the other hand, provides a way of logging all the corrupted items and fixing them manually.

Catalog Validation is an extremely useful tool, allowing an automated check (and fix) of all the corrupted items in the catalog. Using Catalog Validation together with Baseline Validation Tool provides a way of ensuring the correctness of migrations and developments:

  • Running Catalog Validation before the migration to ensure all objects to promote are consistent
  • Running Catalog Validation after the migration to ensure the consistency of all promoted objects and security
  • Running Baseline Validation Tool between source and target environment to ensure the expected outputs are matching.

Summarizing: Catalog Validation and the Baseline Validation Tool can be considered complementary - the first checks catalog object and security consistency, the second analyses and compares the expected results. Running both alongside any code promotion process should be considered good practice.

Categories: BI & Warehousing
