Feed aggregator

Data Processing and Enrichment in Spark Streaming with Python and Kafka

Rittman Mead Consulting - Fri, 2017-01-13 10:00

In my previous blog post I introduced Spark Streaming and how it can be used to process 'unbounded' datasets. The example I did was a very basic one - simple counts of inbound tweets and grouping by user. All very good for understanding the framework and not getting bogged down in detail, but ultimately not so useful.

We're going to stay with Twitter as our data source in this post, but we're going to consider a real-world requirement for processing Twitter data with low latency. Spark Streaming will again be our processing engine, with future posts looking at other possibilities in this area.

Twitter has come a long way from its early days as an SMS-driven "microblogging" site. Nowadays it's used by millions of people to discuss technology, share food tips, and, of course, track the progress of tea-making. But it's also used for more nefarious purposes, including spam and the sharing of links to pirated material. The requirement we had for this proof of concept was to filter tweets for suspected copyright-infringing links so that further action could be taken.

The environment I'm using is the same as before - Spark 2.0.2 running on Docker with Jupyter Notebooks to develop the code (and this article!). You can download the full notebook here.

The inbound tweets are coming from an Apache Kafka topic. Any matched tweets will be sent to another Kafka topic. The match criteria are:

  • Not a retweet
  • Contains at least one URL
  • URL(s) are not on a whitelist (for example, we're not interested in links to spotify.com, or back to twitter.com)
  • The Tweet text must match at least two from a predefined list of artists, albums, and tracks. This is necessary to avoid lots of false positives - think of how many music tracks there are out there, with names that are common in English usage ("yesterday" for example). So we must match at least two ("Yesterday" and "Beatles", or "Yesterday" and "Help!").
    • Match terms will take into account common misspellings (Little Mix -> Litle Mix), hashtags (Little Mix -> #LittleMix), etc. (see the sketch below)
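
The hashtag and concatenated forms can be derived mechanically from the term itself; misspellings still need curating by hand. Here's a minimal, purely illustrative sketch of a variant generator (the helper name make_variants is hypothetical - in this post the variants are simply listed out explicitly, as you'll see shortly):

def make_variants(term):
    # Generate the obvious mechanical variants of a match term
    variants = [term]
    variants.append(term.replace(' ',''))        # "Little Mix" -> "LittleMix"
    variants.append('#' + term.replace(' ',''))  # "Little Mix" -> "#LittleMix"
    return variants

print make_variants('Little Mix')
# ['Little Mix', 'LittleMix', '#LittleMix']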

We'll also use a separate Kafka topic for audit/debug purposes to inspect any non-matched tweets.

As well as matching the tweet against the above conditions, we will enrich the tweet message body to store the identified artist/album/track to support subsequent downstream processing.

The final part of the requirement is to keep track of the number of inbound tweets, the number of matched vs unmatched, and for those matched, which artists they were for. These counts need to be per batch and over a window of time too.

Getting Started - Prototyping the Processing Code

Before we get into the meat of the streaming code, let's take a step back and look at what we want the code to achieve. From the previous examples we know we can connect to a Kafka topic, pull in tweets, parse them for given fields, and do windowed counts. So far, so easy (or at least, already figured out!). Let's take a look at the nub of the requirement here - the text matching.

If we peruse the BBC Radio 1 Charts we can see the popular albums and artists of the moment (Grant me a little nostalgia here; in my day people 'pirated' music from the Radio 1 chart show onto C90 cassettes, trying to get it without the DJ talking over the start and end. Nowadays it's done on a somewhat more technologically advanced basis). Currently it's "Little Mix" with the album "Glory Days". A quick Wikipedia or Amazon search gives us the track listing too:

  1. Shout Out to My Ex
  2. Touch
  3. F.U.
  4. Oops - Little Mix feat. Charlie Puth
  5. You Gotta Not
  6. Down & Dirty
  7. Power
  8. Your Love
  9. Nobody Like You
  10. No More Sad Songs
  11. Private Show
  12. Nothing Else Matters
  13. Beep Beep
  14. Freak
  15. Touch

A quick Twitter search for the first track title gives us this tweet - I have no idea if it's legit or not, but it serves as an example for our matching code requirements:

DOWNLOAD MP3: Little Mix – Shout Out To My Ex (CDQ) Track https://t.co/C30c4Fel4u pic.twitter.com/wJjyG4cdjE

— Ngvibes Media (@ngvibes_com) November 3, 2016

Using the Twitter developer API I can retrieve the JSON for this tweet directly. I'm using the excellent Paw tool to do this.
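
If you'd rather script the lookup than use a GUI client, a minimal sketch against the statuses/show endpoint of the Twitter REST API v1.1 could look like the following. The credentials and tweet id are placeholders, and it assumes the requests and requests_oauthlib packages are installed (the post itself just used Paw):

# Hedged sketch only - substitute your own credentials and the tweet id you want
import requests
from requests_oauthlib import OAuth1

TWEET_ID = 123456789012345678   # placeholder

auth = OAuth1('CONSUMER_KEY', 'CONSUMER_SECRET', 'ACCESS_TOKEN', 'ACCESS_TOKEN_SECRET')
r = requests.get('https://api.twitter.com/1.1/statuses/show.json',
                 params={'id': TWEET_ID}, auth=auth)
tweet = r.json()
print tweet['text']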

From this we can get the text element:

"text": "DOWNLOAD MP3: Little Mix \u2013 Shout Out To My Ex (CDQ) Track https:\/\/t.co\/C30c4Fel4u https:\/\/t.co\/wJjyG4cdjE",

The obvious approach would be to have a list of match terms, something like:

match_text=("Little Mix","Glory Days","Shout Out to My Ex","Touch","F.U.")

But - we need to make sure we've matched two of the three types of metadata (artist/album/track), so we need to know which it is that we've matched in the text. We also need to handle variations in text for a given match (such as misspellings etc).

What I came up with was this:

filters=[]  
filters.append({"tag":"album","value": "Glory Days","match":["Glory Days","GloryDays"]})  
filters.append({"tag":"artist","value": "Little Mix","match":["Little Mix","LittleMix","Litel Mixx"]})  
filters.append({"tag":"track","value": "Shout Out To My Ex","match":["Shout Out To My Ex","Shout Out To My X","ShoutOutToMyEx","ShoutOutToMyX"]})  
filters.append({"tag":"track","value": "F.U.","match":["F.U","F.U.","FU","F U"]})  
filters.append({"tag":"track","value": "Touch","match":["Touch"]})  
filters.append({"tag":"track","value": "Oops","match":["Oops"]})

def test_matching(test_string):  
    # Print the tag/value of any filter entry whose match terms appear (case-insensitively) in the test string
    print 'Input: %s' % test_string
    for f in filters:
        for a in f['match']:
            if a.lower() in test_string.lower():
                print '\tTag: %s / Value: %s\n\t\t(Match string %s)' % (f['tag'],f['value'],a)

We could then take the test string from above and test it:

test_matching('DOWNLOAD MP3: Little Mix \u2013 Shout Out To My Ex (CDQ) Track https:\/\/t.co\/C30c4Fel4u https:\/\/t.co\/wJjyG4cdjE')  
Input: DOWNLOAD MP3: Little Mix \u2013 Shout Out To My Ex (CDQ) Track https:\/\/t.co\/C30c4Fel4u https:\/\/t.co\/wJjyG4cdjE
    Tag: artist / Value: Little Mix
        (Match string Little Mix)
    Tag: track / Value: Shout Out To My Ex
        (Match string Shout Out To My Ex)

as well as making sure that variations in naming were also correctly picked up and tagged:

test_matching('DOWNLOAD MP3: Litel Mixx #GloryDays https:\/\/t.co\/wJjyG4cdjE')  
Input: DOWNLOAD MP3: Litel Mixx #GloryDays https:\/\/t.co\/wJjyG4cdjE
    Tag: album / Value: Glory Days
        (Match string GloryDays)
    Tag: artist / Value: Little Mix
        (Match string Litel Mixx)
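
The streaming job below wraps this matching up inside a process_tweet function and applies the 'at least two' rule to the result. As a standalone sketch of that rule (the helper name get_matches is mine, not part of the final job):

def get_matches(tweet_text):
    # Collect the distinct (tag, value) pairs whose match terms appear in the text
    matched = set()
    for f in filters:
        for a in f['match']:
            if a.lower() in tweet_text.lower():
                matched.add((f['tag'], f['value']))
    return matched

matches = get_matches('DOWNLOAD MP3: Litel Mixx #GloryDays https://t.co/wJjyG4cdjE')
if len(matches) >= 2:
    print 'Matched: %s' % list(matches)
else:
    print 'Only %d match(es) - not enough' % len(matches)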
Additional Processing

With the text matching figured out, we also needed to address the other requirements:

  • Not a retweet
  • Contains at least one URL
    • URL(s) are not on a whitelist (for example, we're not interested in links to spotify.com, or back to twitter.com)
Not a Retweet

In the old days a retweet was simply a repost of the same tweet with an RT prefix; now retweeting is part of the Twitter model and Twitter clients display the original tweet with the retweeter shown. In the background though, the JSON of a retweet is different from that of an original tweet.

Original tweet:

Because we all know how "careful" Trump is about not being recorded when he's not aware of it. #BillyBush #GoldenGate

— George Takei (@GeorgeTakei) January 12, 2017
{
  "created_at": "Thu Jan 12 00:36:22 +0000 2017",
  "id": 819342218611728384,
  "id_str": "819342218611728384",
  "text": "Because we all know how \"careful\" Trump is about not being recorded when he's not aware of it. #BillyBush #GoldenGate",

[...]

Retweet:

{
  "created_at": "Thu Jan 12 14:40:44 +0000 2017",
  "id": 819554713083461632,
  "id_str": "819554713083461632",
  "text": "RT @GeorgeTakei: Because we all know how \"careful\" Trump is about not being recorded when he's not aware of it. #BillyBush #GoldenGate",

[...]

  "retweeted_status": {
    "created_at": "Thu Jan 12 00:36:22 +0000 2017",
    "id": 819342218611728384,
    "id_str": "819342218611728384",
    "text": "Because we all know how \"careful\" Trump is about not being recorded when he's not aware of it. #BillyBush #GoldenGate",
[...]

So retweets have an additional set of elements in the JSON body, under the retweeted_status element. We can pick this out using the get method, as seen in this code snippet, where tweet is a Python object created by json.loads from the tweet's JSON:

if tweet.get('retweeted_status'):  
    print 'Tweet is a retweet'
else:  
    print 'Tweet is original'
Contains a URL, and URL is not on Whitelist

Twitter are very good to us in the JSON they supply for each tweet. Every possible attribute of the tweet is encoded as elements in the JSON, meaning that we don't have to do any nasty parsing of the tweet text itself. To find out if there are URLs in the tweet, we just check the entities.urls element, and iterate through the array if present.

if not tweet.get('entities'):  
    print 'no entities element'
else:  
    if not tweet.get('entities').get('urls'):
        print 'no entities.urls element'

The URL itself is again provided to us by Twitter, as the expanded_url within the urls array, and using the urlsplit function (from Python's urlparse module) as I did previously we can extract the domain:

for url in tweet['entities']['urls']:

    expanded_url = url['expanded_url']
    domain = urlsplit(expanded_url).netloc

With the domain extracted, we can then compare it to a predefined whitelist so that we don't pick up tweets that are just linking back to sites such as Spotify, iTunes, etc. Here I'm using the Python set type and its issubset method to compare the list of domains extracted from the tweet (held in the url_info dict) against the whitelist:

if set(url_info['domain']).issubset(domain_whitelist):  
    print 'All domains whitelisted'
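
To make the set logic concrete, here's a quick worked example with made-up domain values. Note that netloc includes any subdomain, which is why the full streaming code later reduces each domain to a 'primary' domain before doing this check:

# Worked example with hypothetical values
domain_whitelist = ['spotify.com', 'twitter.com', 'itun.es']

print set(['twitter.com']).issubset(domain_whitelist)                   # True  - all links whitelisted
print set(['twitter.com', 'dodgysite.xyz']).issubset(domain_whitelist)  # False - worth a closer look
print set(['open.spotify.com']).issubset(domain_whitelist)              # False - subdomain doesn't match,
                                                                        #         hence the primary-domain step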
The Stream Processing Bit

With me so far? We've looked at the requirements for what our stream processing needs to do, and worked out the prototype code that will do this. Now we can jump into the actual streaming code. You can see the actual notebook here if you want to try this yourself.

Job control variables
batchIntervalSec=30  
windowIntervalSec=21600 # Six hours  
app_name = 'spark_twitter_enrich_and_count_rm_01e'  
Make Kafka available to Jupyter
import os  
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'  
os.environ['PYSPARK_PYTHON'] = '/opt/conda/envs/python2/bin/python'  
Import dependencies

As well as Spark libraries, we're also bringing in the KafkaProducer library which will enable us to send messages to Kafka. This is in the kafka-python package. You can install this standalone on your system, or inline as done below.

# Necessary to make Kafka library available to pyspark
os.system("pip install kafka-python")

#    Spark
from pyspark import SparkContext  
#    Spark Streaming
from pyspark.streaming import StreamingContext  
from pyspark.streaming.kafka import KafkaUtils  
#    Kafka
from kafka import SimpleProducer, KafkaClient  
from kafka import KafkaProducer

#    json parsing
import json  
#    url deconstruction
from urlparse import urlsplit  
#    regex domain handling
import re  
Define values to match
filters=[]  
filters.append({"tag":"album","value": "Glory Days","match":["Glory Days","GloryDays"]})  
filters.append({"tag":"artist","value": "Little Mix","match":["Little Mix","LittleMix","Litel Mixx"]})  
filters.append({"tag":"track","value": "Shout Out To My Ex","match":["Shout Out To My Ex","Shout Out To My X","ShoutOutToMyEx","ShoutOutToMyX"]})  
filters.append({"tag":"track","value": "F.U.","match":["F.U","F.U.","FU","F U"]})  
filters.append({"tag":"track","value": "Touch","match":["Touch"]})  
filters.append({"tag":"track","value": "Oops","match":["Oops"]})  
Define whitelisted domains
domain_whitelist=[]  
domain_whitelist.append("itun.es")  
domain_whitelist.append("wikipedia.org")  
domain_whitelist.append("twitter.com")  
domain_whitelist.append("instagram.com")  
domain_whitelist.append("medium.com")  
domain_whitelist.append("spotify.com")  
Function: Unshorten shortened URLs (bit.ly etc)
# Source: http://stackoverflow.com/a/4201180/350613
import httplib  
import urlparse

def unshorten_url(url):  
    parsed = urlparse.urlparse(url)
    h = httplib.HTTPConnection(parsed.netloc)
    h.request('HEAD', parsed.path)
    response = h.getresponse()
    if response.status/100 == 3 and response.getheader('Location'):
        return response.getheader('Location')
    else:
        return url
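
A quick sanity check of the function - the shortened URL here is purely illustrative. Note that it follows only a single redirect, so a chain of shorteners would need repeated calls:

# Illustrative only - substitute a real shortened URL
print unshorten_url('http://bit.ly/2hXXXXX')
# A 3xx response returns the Location header; anything else returns the original URL unchanged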
Function: Send messages to Kafka

To inspect the Kafka topics as messages are sent use:

kafka-console-consumer --zookeeper cdh57-01-node-01.moffatt.me:2181 --topic twitter_matched2  

N.B. following the Design Patterns for using foreachRDD guide here.

# http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd

def send_to_kafka_matched(partition):  
    from kafka import SimpleProducer, KafkaClient
    from kafka import KafkaProducer

    kafka_prod = KafkaProducer(bootstrap_servers='cdh57-01-node-01.moffatt.me:9092')
    for record in partition:
        kafka_prod.send('twitter_matched2', str(json.dumps(record)))

def send_to_kafka_notmatched(partition):  
    from kafka import SimpleProducer, KafkaClient
    from kafka import KafkaProducer

    kafka_prod = KafkaProducer(bootstrap_servers='cdh57-01-node-01.moffatt.me:9092')
    for record in partition:
        kafka_prod.send('twitter_notmatched2', str(record))

def send_to_kafka_err(partition):  
    from kafka import SimpleProducer, KafkaClient
    from kafka import KafkaProducer

    kafka_prod = KafkaProducer(bootstrap_servers='cdh57-01-node-01.moffatt.me:9092')
    for record in partition:
        kafka_prod.send('twitter_err2', str(record))
Function: Process each tweet

This is the main processing code. It implements all of the logic described in the requirements above. If a processing condition is not met, the function returns a negative code and description of the condition that was not met. Errors are also caught and returned.

You can see a syntax-highlighted version of the code in the notebook here.

def process_tweet(tweet):  
    # Check that the tweet is not a retweet, and that it has URLs, before going any further
    if tweet.get('retweeted_status'):
        return (-1,'retweet - ignored',tweet)

    if not tweet.get('entities'):
        return (-2,'no entities element',tweet)

    if not tweet.get('entities').get('urls'):
        return (-3,'no entities.urls element',tweet)

    # Collect all the domains linked to in the tweet
    url_info={}
    url_info['domain']=[]
    url_info['primary_domain']=[]
    url_info['full_url']=[]
    try:
        for url in tweet['entities']['urls']:
            try:
                expanded_url = url['expanded_url']
            except Exception, err:
                return (-104,err,tweet)

            # Try to resolve the URL (assumes it's shortened - bit.ly etc)
            try:
                expanded_url = unshorten_url(expanded_url)
            except Exception, err:
                return (-108,err,tweet)

            # Determine the domain
            try:
                domain = urlsplit(expanded_url).netloc
            except Exception, err:
                return (-107,err,tweet)
            try:
                # Extract the 'primary' domain, e.g. www36.foobar.com -> foobar.com
                #
                # This logic is dodgy for UK domains (foo.co.uk, foo.org.uk, etc) 
                # since it truncates to the last two parts of the domain only (co.uk)
                #
                re_result = re.search('(\w+\.\w+)$',domain)
                if re_result:
                    primary_domain = re_result.group(0)
                else:
                    primary_domain = domain
            except Exception, err:
                return (-105,err,tweet)

            try:
                url_info['domain'].append(domain)
                url_info['primary_domain'].append(primary_domain)
                url_info['full_url'].append(expanded_url)
            except Exception, err:
                return (-106,err,tweet)


        # Check domains against the whitelist
        # If every domain found is in the whitelist, we can ignore them
        try:
            if set(url_info['primary_domain']).issubset(domain_whitelist):
                return (-8,'All domains whitelisted',tweet)
        except Exception, err:
            return (-103,err,tweet)

        # Check domains against the blacklist
        # Unless a domain is found, we ignore it
        #Only use this if you have first defined the blacklist!
        #if not set(domain_blacklist).intersection(url_info['primary_domain']):
        #    return (-9,'No blacklisted domains found',tweet)


    except Exception, err:
        return (-102,err,tweet)

    # Parse the tweet text against list of trigger terms
    # --------------------
    # This is rather messy iterative code that maybe can be optimised
    #
    # Because match terms are not just words, it's not enough to break
    #  up the tweet text into words and match against the filter list.
    #  Instead we have to take each filter term and see if it exists
    #  within the tweet text as a whole
    #
    # Using a set instead of list so that duplicates aren't added
    #
    matched=set()
    try:
        for f in filters:
            for a in f['match']:
                tweet_text = tweet['text']
                match_text = a.decode('utf-8')
                if match_text in tweet_text:
                    matched.add((f['tag'],f['value']))
    except Exception, err:
        return (-101,err,tweet)

    #-----
    # Add the discovered metadata into the tweet object that this function will return
    try:
        tweet['enriched']={}
        tweet['enriched']['media_details']={}
        tweet['enriched']['url_details']=url_info
        tweet['enriched']['match_count']=len(matched)
        for match in matched:
            tweet['enriched']['media_details'][match[0]]=match[1]

    except Exception, err:
        return (-100,err,tweet)

    return (len(matched),tweet)
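
Before wiring this into the stream it's worth exercising it standalone - a minimal sketch, assuming sample_tweet_json holds the raw JSON of the earlier example tweet (a placeholder variable, not defined in the notebook):

# sample_tweet_json is a placeholder for the raw JSON string of a single tweet
result = process_tweet(json.loads(sample_tweet_json))
if result[0] > 1:
    print 'Matched %d terms: %s' % (result[0], result[1]['enriched']['media_details'])
else:
    print 'Not matched / error, code %s: %s' % (result[0], result[1])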
Function: Streaming context definition

This is the function that defines the streaming context. It needs to be a function because we're using windowing, which requires checkpointing, and StreamingContext.getOrCreate (used further down to start the job) needs a creation function to call if no checkpoint data exists.

As well as processing inbound tweets, it performs counts of:

  • Inbound
  • Outbound, by type (match/no match/error)
  • For matched tweets, top domains and artists

The code is commented inline to explain how it works. You can see a syntax-highlighted version of the code in the notebook here.

def createContext():  
    sc = SparkContext(appName="spark_twitter_enrich_and_count_01")
    sc.setLogLevel("WARN")
    ssc = StreamingContext(sc, batchIntervalSec)
    # Windowed operations (countByWindow etc) need checkpointing enabled;
    # use the same path that getOrCreate is passed when the job is started
    ssc.checkpoint('/tmp/%s' % app_name)

    # Define Kafka Consumer and Producer
    kafkaStream = KafkaUtils.createStream(ssc, 'cdh57-01-node-01.moffatt.me:2181', app_name, {'twitter':1})

    ## Get the JSON tweets payload
    ## >>TODO<<  This is very brittle - if the Kafka message retrieved is not valid JSON the whole thing falls over
    tweets_dstream = kafkaStream.map(lambda v: json.loads(v[1]))

    ## -- Inbound Tweet counts
    inbound_batch_cnt = tweets_dstream.count()
    inbound_window_cnt = tweets_dstream.countByWindow(windowIntervalSec,batchIntervalSec)

    ## -- Process
    ## Match tweet to trigger criteria
    processed_tweets = tweets_dstream.\
        map(lambda tweet:process_tweet(tweet))

    ## Send the matched data to Kafka topic
    ## Only treat it as a match if we hit at least two of the three possible matches (artist/track/album)
    ## 
    ## _The first element of the returned object is a count of the number of matches, or a negative 
    ##  to indicate an error or no URL content in the tweet._
    ## 
    ## _We only want to send the actual JSON as the output, so use a `map` to get just this element_

    matched_tweets = processed_tweets.\
                        filter(lambda processed_tweet:processed_tweet[0]>1).\
                        map(lambda processed_tweet:processed_tweet[1])

    matched_tweets.foreachRDD(lambda rdd: rdd.foreachPartition(send_to_kafka_matched))
    matched_batch_cnt = matched_tweets.count()
    matched_window_cnt = matched_tweets.countByWindow(windowIntervalSec,batchIntervalSec)

    ## Set up counts for matched metadata
    ##-- Artists
    ## NB this assumes 'artist' was one of the matched tags; a tweet matched only
    ##    on album+track would have no 'artist' key here
    matched_artists = matched_tweets.map(lambda tweet:(tweet['enriched']['media_details']['artist']))

    matched_artists_batch_cnt = matched_artists.countByValue()\
                                    .transform((lambda foo:foo.sortBy(lambda x:-x[1])))\
                                    .map(lambda x:"Batch/Artist: %s\tCount: %s" % (x[0],x[1]))

    matched_artists_window_cnt = matched_artists.countByValueAndWindow(windowIntervalSec,batchIntervalSec)\
                                    .transform((lambda foo:foo.sortBy(lambda x:-x[1])))\
                                    .map(lambda x:"Window/Artist: %s\tCount: %s" % (x[0],x[1]))

    ##-- Domains
    ## Since url_details.primary_domain is an array, need to flatMap here
    matched_domains = matched_tweets.flatMap(lambda tweet:(tweet['enriched']['url_details']['primary_domain']))

    matched_domains_batch_cnt = matched_domains.countByValue()\
                                    .transform((lambda foo:foo.sortBy(lambda x:-x[1])))\
                                    .map(lambda x:"Batch/Domain: %s\tCount: %s" % (x[0],x[1]))
    matched_domains_window_cnt = matched_domains.countByValueAndWindow(windowIntervalSec,batchIntervalSec)\
                                    .transform((lambda foo:foo.sortBy(lambda x:-x[1])))\
                                    .map(lambda x:"Window/Domain: %s\tCount: %s" % (x[0],x[1]))

    ## Display non-matches for inspection
    ## 
    ## Codes less than zero but greater than -100 indicate a non-match (e.g. whitelist hit), but not an error

    nonmatched_tweets = processed_tweets.\
        filter(lambda processed_tweet:(-99<=processed_tweet[0]<=1))

    nonmatched_tweets.foreachRDD(lambda rdd: rdd.foreachPartition(send_to_kafka_notmatched))

    nonmatched_batch_cnt = nonmatched_tweets.count()
    nonmatched_window_cnt = nonmatched_tweets.countByWindow(windowIntervalSec,batchIntervalSec)

    ##  Print any erroring tweets
    ## 
    ## Codes less than -100 indicate an error (try...except caught)

    errored_tweets = processed_tweets.\
                        filter(lambda processed_tweet:(processed_tweet[0]<=-100))
    errored_tweets.foreachRDD(lambda rdd: rdd.foreachPartition(send_to_kafka_err))

    errored_batch_cnt = errored_tweets.count()
    errored_window_cnt = errored_tweets.countByWindow(windowIntervalSec,batchIntervalSec)

    ## Print counts
    inbound_batch_cnt.map(lambda x:('Batch/Inbound: %s' % x))\
        .union(matched_batch_cnt.map(lambda x:('Batch/Matched: %s' % x))\
            .union(nonmatched_batch_cnt.map(lambda x:('Batch/Non-Matched: %s' % x))\
                .union(errored_batch_cnt.map(lambda x:('Batch/Errored: %s' % x)))))\
        .pprint()

    inbound_window_cnt.map(lambda x:('Window/Inbound: %s' % x))\
        .union(matched_window_cnt.map(lambda x:('Window/Matched: %s' % x))\
            .union(nonmatched_window_cnt.map(lambda x:('Window/Non-Matched: %s' % x))\
                .union(errored_window_cnt.map(lambda x:('Window/Errored: %s' % x)))))\
        .pprint()

    matched_artists_batch_cnt.pprint()
    matched_artists_window_cnt.pprint()
    matched_domains_batch_cnt.pprint()
    matched_domains_window_cnt.pprint()

    return ssc
Start the streaming context
ssc = StreamingContext.getOrCreate('/tmp/%s' % app_name,lambda: createContext())  
ssc.start()  
ssc.awaitTermination()  
Stream Output Counters

From the stdout of the job we can see the simple counts of inbound tweets and the output splits, both per batch and over the accumulating window:

-------------------------------------------
Time: 2017-01-13 11:50:30
-------------------------------------------
Batch/Inbound: 9
Batch/Matched: 0
Batch/Non-Matched: 9
Batch/Errored: 0

-------------------------------------------
Time: 2017-01-13 11:50:30
-------------------------------------------
Window/Inbound: 9
Window/Non-Matched: 9

-------------------------------------------
Time: 2017-01-13 11:51:00
-------------------------------------------
Batch/Inbound: 21
Batch/Matched: 0
Batch/Non-Matched: 21
Batch/Errored: 0

-------------------------------------------
Time: 2017-01-13 11:51:00
-------------------------------------------
Window/Inbound: 30
Window/Non-Matched: 30

The details of identified artists within tweets are also tracked, per batch and accumulated over the window period (six hours, in this example):

-------------------------------------------
Time: 2017-01-12 12:45:30
-------------------------------------------
Batch/Artist: Major Lazer       Count: 4
Batch/Artist: David Bowie       Count: 1

-------------------------------------------
Time: 2017-01-12 12:45:30
-------------------------------------------
Window/Artist: Major Lazer      Count: 1320
Window/Artist: Drake    Count: 379
Window/Artist: Taylor Swift     Count: 160
Window/Artist: Metallica        Count: 94
Window/Artist: David Bowie      Count: 84
Window/Artist: Lady Gaga        Count: 37
Window/Artist: Pink Floyd       Count: 11
Window/Artist: Kate Bush        Count: 10
Window/Artist: Justice  Count: 9
Window/Artist: The Weeknd       Count: 8
Kafka Output Matched

The matched Kafka topic holds a stream of tweets in JSON format, with the discovered metadata (artist/album/track) added. I'm using the Kafka console consumer to view the contents, piped through jq to show just the tweet text and the metadata that has been added.

kafka-console-consumer --zookeeper cdh57-01-node-01.moffatt.me:2181 \
--topic twitter_matched1 \
--from-beginning|jq -r "[.text,.enriched.url_details.primary_domain[0],.enriched.url_details.full_url[0],.enriched.media_details.artist,.enriched.media_details.album,.enriched.media_details.track,.enriched.match_count] "
[
  "Million Reasons by Lady Gaga - this is horrendous sorry @ladygaga https://t.co/rEtePIy3OT",
  "youtube.com",
  "https://www.youtube.com/watch?v=NvMoctjjdhA&feature=youtu.be",
  "Lady Gaga",
  null,
  "Million Reasons",
  2
]
Non-Matched

On the outbound Kafka topics we can see the non-matched messages. You'd probably disable this stream once the processing logic was finalised, but it's useful to be able to audit and validate the reasons for non-matches. Here a retweet has been ignored, and we can see it's a retweet from the RT prefix of the text field. The -1 is the return code from the process_tweet function denoting a non-match:

(-1, 'retweet - ignored', {u'contributors': None, u'truncated': False, u'text': u'RT @ChartLittleMix: Little Mix adicionou datas para a Summer Shout Out 2017 no Reino Unido https://t.co/G4H6hPwkFm', u'@timestamp': u'2017-01-13T11:45:41.000Z', u'is_quote_status': False, u'in_reply_to_status_id': None, u'id': 819873048132288513, u'favorite_count': 0, u'source':
Summary

In this article we've built on the foundations of the initial exploration of Spark Streaming on Python, expanding it out to address a real-world processing requirement. Processing unbounded streams of data this way is not as complex as you may think, particularly given the benefits it can yield in reducing the latency between an event occurring and action being taken on it.

We've not touched on some of the more complex areas, such as scaling this up to multiple Spark nodes, partitioned Kafka topics, and so on - that's another [kafka] topic (sorry...) for another day. The code itself is also rudimentary - before moving it into Production there'd be some serious refactoring and optimisation review to be performed on it.

You can find the notebook for this article here, and the previous article's here.

If you'd like more information on how Rittman Mead can help your business get the most out of its data, please do get in touch!

Categories: BI & Warehousing

Oracle Sort Order Issue

Tom Kyte - Fri, 2017-01-13 09:46
Hi Tom, According to Oracle document, the sort order depends on database option NLS_SORT, and there are only 2 kinds of sort: Binary Sort and Linguistic Sort. Is there any other options to change the sort order? Currently, the following sql wi...
Categories: DBA Blogs

Converting mixed Latin + Japanese text to Shift-JIS

Tom Kyte - Fri, 2017-01-13 09:46
Hi Tom, I have a string of mixed Latin and Japanese characters that must be converted to Shift-JIS when output to a particular report (not always). The DB character set is AL32UTF8. When I execute this statement: <code> convert('H. pylori?????...
Categories: DBA Blogs

Insert with select subquery hangs for a long time

Tom Kyte - Fri, 2017-01-13 09:46
<b>Oracle database version:</b> Oracle Database 11g Enterprise Edition Release 11.2.0.4.0 - 64bit Production PL/SQL Release 11.2.0.4.0 - Production "CORE 11.2.0.4.0 Production" TNS for 64-bit Windows: Version 11.2.0.4.0 - Production NLSRTL Versi...
Categories: DBA Blogs

Calculating last 5 business working days

Tom Kyte - Fri, 2017-01-13 09:46
Hi Tom, I want to get last 5 business days. If its friday today, I need from passed monday to this friday and if its monday today...last tuesday to this monday is required. Saturday and sunday need to skipped in calculating last 5 business days. ...
Categories: DBA Blogs

Frename function not working consistently in PL/SQL

Tom Kyte - Fri, 2017-01-13 09:46
Hi, I have a PL/SQL proc to rename files under directory. I am using the below function to rename the files :- UTL_FILE.FRENAME('ATTDIR', FILE_NAME2, 'ATTDIR', V_ID1, TRUE); <b>The files are existing on the directory</b> even though the P...
Categories: DBA Blogs

Imported Wrong data

Tom Kyte - Fri, 2017-01-13 09:46
Hi Team, Today by mistake I imported Wrong data from production to test server Db. I want to restore test tables to their original state .I don't have backup of test server data.I tried flash but it is not helpful here. What should I do to get ...
Categories: DBA Blogs

Is it possible to install 11G and 12C in Windows 7 64 bit.

Tom Kyte - Fri, 2017-01-13 09:46
Hi, Can you help me to resolve the issue. Scenario 1: I have 11G 64 bits running fine in Windows 64 bits but When I was trying to install 12C 64 bits in same system, Its throwing an error while creating database through DBCA. Error is: Error e...
Categories: DBA Blogs

Performance - Number vs. VARCHAR2 datatypes

Tom Kyte - Fri, 2017-01-13 09:46
Hello, I am designing a system that will store and query billions of IP addresses, both IPv4 and IPv6. I was wondering - (a) Will storing the IP as an integer (e.g. Number datatype) instead of a string (e.g. VARCHAR2) give better performance in ...
Categories: DBA Blogs

Multiple DB triggers on a table for a given ledger or OU or Inventory Org

Tom Kyte - Fri, 2017-01-13 09:46
Hi, We have a requirement where the plan is to track any inserts/updates/deletes on some EBS tables. A company has operations across countries & continents. Can database triggers be created on the same table to track changes based on the Ledger (S...
Categories: DBA Blogs

Lost all controlfiles and not having database tablespace and datafiles information to create a new control file

Tom Kyte - Fri, 2017-01-13 09:46
Hello Tom/Maria/Team We ran a bad task in a cron in linux which remove all files modified 1 day ago, the database was up&running but idle=>without use bc it is test db, then the task removed control files and spfile bc it was down but I started it...
Categories: DBA Blogs

OPN Oracle Cloud Subscription, Pricing and Metering

Exclusive Live Webcast ...

We share our skills to maximize your revenue!
Categories: DBA Blogs

dbi Tail ranked as the 4th best alternative !

Yann Neuhaus - Fri, 2017-01-13 08:19

Dear colleagues,

Since the publication of dbi Tail one year ago, lots of people have downloaded it, and some reporters have written articles about it.

We have received a lot of positive feedback, and we would like to thank you all for your attention!

 

The best article is from the Softpedia website, where the author gives an interesting description of the tool, as you can see:

http://www.softpedia.com/get/System/File-Management/dbi-Tail.shtml

 

More interestingly, in June the tool was listed alongside the reference tool on the "AlternativeTo" website. Since then, the tail from dbi services has been climbing the rankings almost without interruption.

At one point it was ranked as the 3rd best alternative to the reference tool; today it is in 4th position. What a good feeling!

 

During the past year development of the tail has continued, and new features have been integrated. The biggest one is support for SSH connections using public key authentication, which makes it possible to connect without any password, simply by using a trusted user's public key.

The other is the ability to quickly navigate between the several "tree" files present in your "etc" folder. This enhances the user experience by grouping the monitored log files for one context, with switching to another context (another "tree" file) just one click away.

 

In the fresh 1.3 version of dbi Tail you can also benefit from some bug fixes and enhancements, especially for the Linux environment.

Enjoy continuing to use dbi tail, and do not hesitate to provide your feedback or to like it as well:

http://alternativeto.net/software/baretail/

 

dbi Tail is an active open source project, and will continue to be in the future!

https://github.com/pschweitz/DBITail/releases

 

Cheers,

Philippe

 


The post dbi Tail ranked as the 4th best alternative ! appeared first on Blog dbi services.

Oracle TO_MULTI_BYTE Function with Examples

Complete IT Professional - Fri, 2017-01-13 05:00
In this article, I’ll be covering the Oracle TO_MULTI_BYTE function, and look at some examples. Purpose of the Oracle TO_MULTI_BYTE Function The TO_MULTI_BYTE function is used to convert a character string from single-byte characters to multi-byte characters. Your database must contain both single-byte and multi-byte characters for this function to be useful. If there are […]
Categories: Development

use_nl hint

Jonathan Lewis - Fri, 2017-01-13 02:52

In response to a recent lamentation from Richard Foote about the degree of ignorance regarding the clustering_factor of indexes I commented on the similar level of understanding of a specific hint syntax, namely use_nl(a b) pointing out that this does not mean “do a nested loop from a to b”. My comment was underscored by a fairly prompt response asking what the hint did mean.

Surprisingly, although I’ve explained it many times over the last couple of decades (here’s one from 10 years ago), I couldn’t find an explanation on my blog though I did find a blog note where I’d made a passing comment about the equivalent misunderstanding of the use_hash(a b) syntax.

The misunderstanding is not entirely surprising since for many years the Oracle manuals seemed to suggest (in their examples) that the hint did have a multi-table meaning, and it wasn't until 10g that the manual gave an explicit statement of the single-table nature of the hint. The hint /*+ use_nl(a b) */ is a short-hand for the pair of hints /*+ use_nl(a) use_nl(b) */; it doesn't say anything about whether a and b should be joined, or in what order. If you want to guarantee that a and b will be joined in that order by a nested loop you will have to work a lot harder with your hints – and almost certainly need to make use of the /*+ leading() */ hint.

Consider the following query (I’ll put the table creation code at the end of the article if you want to experiment):

select
	/*+ use_nl(a b) */
	a.v1, b.v1, c.v1, d.v1
from
	a, b, c, d
where
	d.n100 = 0
and	a.n100 = d.id
and	b.n100= a.n2
and	c.id = a.id
;

Only one of the tables a and b can be the first table in the final execution plan, so at some point one of them will be "the next table in the join order", which means this hint guarantees that at least one of them will be the inner table of a nested loop join. Here's the plan I happened to get with my data, indexing, version (11.2.0.4), etc.:

---------------------------------------------------------------------------------------
| Id  | Operation                      | Name | Rows  | Bytes | Cost (%CPU)| Time     |
---------------------------------------------------------------------------------------
|   0 | SELECT STATEMENT               |      | 20000 |  1347K| 30125   (1)| 00:00:02 |
|   1 |  HASH JOIN                     |      | 20000 |  1347K| 30125   (1)| 00:00:02 |
|   2 |   TABLE ACCESS FULL            | C    | 10000 |   146K|    26   (4)| 00:00:01 |
|   3 |   HASH JOIN                    |      | 20000 |  1054K| 30098   (1)| 00:00:02 |
|   4 |    TABLE ACCESS FULL           | D    |   100 |  1800 |    26   (4)| 00:00:01 |
|   5 |    NESTED LOOPS                |      | 20000 |   703K| 30072   (1)| 00:00:02 |
|   6 |     NESTED LOOPS               |      | 20000 |   703K| 30072   (1)| 00:00:02 |
|   7 |      TABLE ACCESS FULL         | B    | 10000 |   136K|    26   (4)| 00:00:01 |
|   8 |      INDEX RANGE SCAN          | A_I2 |     2 |       |     1   (0)| 00:00:01 |
|   9 |     TABLE ACCESS BY INDEX ROWID| A    |     2 |    44 |     3   (0)| 00:00:01 |
---------------------------------------------------------------------------------------

In this case it’s table a that ends up in a position to be the inner table of a nested loop join.

You may be wondering why there seems to be a hash join into b when we've hinted a nested loop join – but the join order that Oracle is using is B -> A -> D -> C with swap_join_inputs(d) and swap_join_inputs(c) applied, so b is never "the next table in the join order".

If you want an even more confusing (at first sight) plan here’s the plan I got if I changed the one hint to /*+ use_nl(a) */


-----------------------------------------------------------------------------
| Id  | Operation            | Name | Rows  | Bytes | Cost (%CPU)| Time     |
-----------------------------------------------------------------------------
|   0 | SELECT STATEMENT     |      | 20000 |  1347K|   105   (5)| 00:00:01 |
|   1 |  HASH JOIN           |      | 20000 |  1347K|   105   (5)| 00:00:01 |
|   2 |   TABLE ACCESS FULL  | B    | 10000 |   136K|    26   (4)| 00:00:01 |
|   3 |   HASH JOIN          |      | 10000 |   537K|    78   (4)| 00:00:01 |
|   4 |    TABLE ACCESS FULL | C    | 10000 |   146K|    26   (4)| 00:00:01 |
|   5 |    HASH JOIN         |      | 10000 |   390K|    52   (4)| 00:00:01 |
|   6 |     TABLE ACCESS FULL| D    |   100 |  1800 |    26   (4)| 00:00:01 |
|   7 |     TABLE ACCESS FULL| A    | 10000 |   214K|    26   (4)| 00:00:01 |
-----------------------------------------------------------------------------

This plan really looks as if Oracle should have done a nested loop into a but didn't. Again appearances are deceptive thanks to the effects of swap_join_inputs(): the join order here is A -> D -> C -> B (note that we don't have a use_nl(b) hint in this example).

If you want a plan where the optimizer produces a nested loop join between a and b you’ll need to put in a leading() hint which places b immediately after a somewhere in the list of tables with just use_nl(b) being sufficient to enforce the join method. Here, for example, is the plan with hints /*+ leading(d a b c) use_nl(b) */ for my data set:


----------------------------------------------------------------------------------------
| Id  | Operation                     | Name   | Rows  | Bytes | Cost (%CPU)| Time     |
----------------------------------------------------------------------------------------
|   0 | SELECT STATEMENT              |        | 20000 |  1347K| 30164   (1)| 00:00:02 |
|   1 |  HASH JOIN                    |        | 20000 |  1347K| 30164   (1)| 00:00:02 |
|   2 |   TABLE ACCESS FULL           | C      | 10000 |   146K|    26   (4)| 00:00:01 |
|   3 |   NESTED LOOPS                |        | 20000 |  1054K| 30137   (1)| 00:00:02 |
|   4 |    NESTED LOOPS               |        |  1000K|  1054K| 30137   (1)| 00:00:02 |
|   5 |     HASH JOIN                 |        | 10000 |   390K|    52   (4)| 00:00:01 |
|   6 |      TABLE ACCESS FULL        | D      |   100 |  1800 |    26   (4)| 00:00:01 |
|   7 |      TABLE ACCESS FULL        | A      | 10000 |   214K|    26   (4)| 00:00:01 |
|   8 |     INDEX RANGE SCAN          | B_I100 |   100 |       |     1   (0)| 00:00:01 |
|   9 |    TABLE ACCESS BY INDEX ROWID| B      |     2 |    28 |   101   (0)| 00:00:01 |
----------------------------------------------------------------------------------------

Notice, yet again, that Oracle has done a hash join to c with a swap_join_inputs().

Creation Script:

create table a
nologging
as
with generator as (
        select 
                rownum id
        from dual 
        connect by 
                level <= 1e4
)
select
	rownum				id,
	mod(rownum,5000)		n2,
	mod(rownum,100)			n100,
	lpad(rownum,10,'0')		v1,
	lpad('x',100,'x')		padding
from
        generator       v1
;

create table b nologging as select * from a;
create table c nologging as select * from a;
create table d nologging as select * from a;

alter table a add constraint a_pk primary key(id);
alter table b add constraint b_pk primary key(id);
alter table c add constraint c_pk primary key(id);
alter table d add constraint d_pk primary key(id);

create index a_i2 on a(n2) nologging;
create index b_i2 on b(n2) nologging;
create index c_i2 on c(n2) nologging;
create index d_i2 on d(n2) nologging;

create index a_i100 on a(n100) nologging;
create index b_i100 on b(n100) nologging;
create index c_i100 on c(n100) nologging;
create index d_i100 on d(n100) nologging;
begin
	dbms_stats.gather_table_stats(
		ownname		 => user,
		tabname		 =>'A',
		method_opt	 => 'for all columns size 1'
	);
	dbms_stats.gather_table_stats(
		ownname		 => user,
		tabname		 =>'B',
		method_opt	 => 'for all columns size 1'
	);
	dbms_stats.gather_table_stats(
		ownname		 => user,
		tabname		 =>'C',
		method_opt	 => 'for all columns size 1'
	);
	dbms_stats.gather_table_stats(
		ownname		 => user,
		tabname		 =>'D',
		method_opt	 => 'for all columns size 1'
	);
end;
/

Will EBS Work with Windows 10 Creators Update?

Steven Chan - Fri, 2017-01-13 02:05

Microsoft is releasing a major update to Windows 10 called the Windows 10 Creators Update:

This new update includes a number of features specifically aimed at enterprise and business users.  These include:

  • Windows Security Center
  • Windows Defender Advanced Threat Protection
  • In-place UEFI conversion

Differential downloads for Windows 10

Microsoft is also changing their packaging of major Windows 10 updates from full operating system downloads to smaller, incremental downloads.  This new delivery method is called the Unified Update Platform (UUP).

The new UUP delivery method will provide only the Windows 10 patches that have been released since the last time a desktop was updated.  

Will this work with EBS?

Yes.  Windows 10 is certified with all current EBS releases, including EBS 12.1 and 12.2. This existing certification is expected to apply to the upcoming Windows 10 Creators Update release.

The Unified Update Platform is not expected to require an additional certification.  Microsoft emphasizes that nothing changes with Windows 10 itself -- this only modifies the way that Win10 patches are delivered:

It’s important to note that with UUP, nothing will look or behave differently on the surface, UUP is all underlying platform and service optimization that happens behind the scenes.

Related Articles

The preceding is intended to outline our general product direction.  It is intended for information purposes only, and may not be incorporated into any contract.  It is not a commitment to deliver any material, code, or functionality, and should not be relied upon in making purchasing decisions.  The development, release, and timing of any features or functionality described for Oracle’s products remains at the sole discretion of Oracle.

Categories: APPS Blogs

Oracle SQL Repeated words in the String

Tom Kyte - Thu, 2017-01-12 15:26
I need your suggestions/inputs on of the following task. I have the following table ID ID_NAME 1 TOM HANKS TOM JR 2 PETER PATROL PETER JOHN PETER 3 SAM LIVING 4 JOHNSON & JOHNSON INC 5 DUHGT LLC 6 THE POST OF THE OFFICE 7 ...
Categories: DBA Blogs

NO_DATA_FOUND in Functions

Tom Kyte - Thu, 2017-01-12 15:26
Tom, We've just migrated from Oracle 9.0.1 to 9.2 and, coincidence or not, I'm facing a problem I had never faced before. The NO_DATA_FOUND exception is not being raised from my PL/SQL functions anymore!!! If I put a exception block to handle t...
Categories: DBA Blogs

NEXT vs UNIFORM SIZE

Tom Kyte - Thu, 2017-01-12 15:26
Hi All, Please clarify below doubt, If I use NEXT 10M and UNIFORM SIZE 1M, which size will my next extent have, 1M or 10M? if we use this command: create TABLESPACE ts_cvliste11_20180815 datafile '/ora1/app/oracle/oradata/L11/2018/ts_c...
Categories: DBA Blogs
