Feed aggregator

dbi Tail ranked as the 4th best alternative !

Yann Neuhaus - Fri, 2017-01-13 08:19

Dear colleagues,

Since the publication of the dbi Tail one year ago, lots of people downloaded, and some reporters wrote articles about it.

We have got many positive feedbacks, and we would like to thank you all for your attention !

 

The best article is from Softpedia website, and the author made an interesting description of the tool as you can see:

http://www.softpedia.com/get/System/File-Management/dbi-Tail.shtml

 

More interesting, in June, the tool was published alongside of the reference on the “Alternative to” website. Since then, the tail from dbi services was moving higher and higher quite without interruption.

At one point in time, it was ranked as the 3rd best alternative of the reference. Today, it is in the 4th position. What a good feeling !

 

During one year the development of the tail continued, and new features were integrated. The biggest one was to allow SSH connection using a public key authentication, enabling the possibility of connecting without any password. In fact just by using a trusted user public key.

The other one is the ability to quickly navigate between the several “tree” files present in your “etc” folder. In fact this will enhance the user experience by grouping the monitor log files for one context, and the switching to another context (another “tree” file) just in one click.

 

In the freshly 1.3 version of dbi tail, you can also benefit from some bug fixes and enhancements especially for the Linux environment.

Enjoy continuing to use dbi tail, and do not hesitate to provide your feedback or to like it as well:

http://alternativeto.net/software/baretail/

 

dbi tail is an alive open source project, and will continue to be in the future !

https://github.com/pschweitz/DBITail/releases

 

Cheers,

Philippe

 

tail1.3

 

 

Cet article dbi Tail ranked as the 4th best alternative ! est apparu en premier sur Blog dbi services.

Oracle TO_MULTI_BYTE Function with Examples

Complete IT Professional - Fri, 2017-01-13 05:00
In this article, I’ll be covering the Oracle TO_MULTI_BYTE function, and look at some examples. Purpose of the Oracle TO_MULTI_BYTE Function The TO_MULTI_BYTE function is used to convert a character string from single-byte characters to multi-byte characters. Your database must contain both single-byte and multi-byte characters for this function to be useful. If there are […]
Categories: Development

use_nl hint

Jonathan Lewis - Fri, 2017-01-13 02:52

In response to a recent lamentation from Richard Foote about the degree of ignorance regarding the clustering_factor of indexes I commented on the similar level of understanding of a specific hint syntax, namely use_nl(a b) pointing out that this does not mean “do a nested loop from a to b”. My comment was underscored by a fairly prompt response asking what the hint did mean.

Surprisingly, although I’ve explained it many times over the last couple of decades (here’s one from 10 years ago), I couldn’t find an explanation on my blog though I did find a blog note where I’d made a passing comment about the equivalent misunderstanding of the use_hash(a b) syntax.

The misunderstanding is not entirely surprising since for many years the Oracle manuals seemed to suggest (in their examples) that the hint did have a multi-table meaning and it wasn’t until 10g that the manual gave an explicit statement of the single-table nature of the hint. The hint /*+ use_nl(a b) */ is a short-hand for the pair of hints /*+ use_nl(a)  use_nl(b) */ it doesn’t say anything about whether a and b should be joined, or in what order. If you want to guarantee that a and b will be joined in that order by a nested loop you will have to work a lot harder with your hints – and almost certainly need to make use of the /+ leading() */ hint.

Consider the following query (I’ll put the table creation code at the end of the article if you want to experiment):

select
	/*+ use_nl(a b) */
	a.v1, b.v1, c.v1, d.v1
from
	a, b, c, d
where
	d.n100 = 0
and	a.n100 = d.id
and	b.n100= a.n2
and	c.id = a.id
;

Only one of the tables a and b can be the first table in the final execution plan so one of them will be “the next table in the join order” at some point, so this hint will guarantee that one of the tables will be the inner table of a nested loop join. Here’s the plan I happened to get with my data, indexing, version (11.2.0.4), etc.:

---------------------------------------------------------------------------------------
| Id  | Operation                      | Name | Rows  | Bytes | Cost (%CPU)| Time     |
---------------------------------------------------------------------------------------
|   0 | SELECT STATEMENT               |      | 20000 |  1347K| 30125   (1)| 00:00:02 |
|   1 |  HASH JOIN                     |      | 20000 |  1347K| 30125   (1)| 00:00:02 |
|   2 |   TABLE ACCESS FULL            | C    | 10000 |   146K|    26   (4)| 00:00:01 |
|   3 |   HASH JOIN                    |      | 20000 |  1054K| 30098   (1)| 00:00:02 |
|   4 |    TABLE ACCESS FULL           | D    |   100 |  1800 |    26   (4)| 00:00:01 |
|   5 |    NESTED LOOPS                |      | 20000 |   703K| 30072   (1)| 00:00:02 |
|   6 |     NESTED LOOPS               |      | 20000 |   703K| 30072   (1)| 00:00:02 |
|   7 |      TABLE ACCESS FULL         | B    | 10000 |   136K|    26   (4)| 00:00:01 |
|   8 |      INDEX RANGE SCAN          | A_I2 |     2 |       |     1   (0)| 00:00:01 |
|   9 |     TABLE ACCESS BY INDEX ROWID| A    |     2 |    44 |     3   (0)| 00:00:01 |
---------------------------------------------------------------------------------------

In this case it’s table a that ends up in a position to be the inner table of a nested loop join.

You may be wondering why there seems to be a hash join into b when we’ve hinted a nested loop join – but the join order that Oracle is using is B -> A -> D -> C with a swap_join_inputs(d) swap_join_inputs(d), so b is never “the next table in the join order”.

If you want an even more confusing (at first sight) plan here’s the plan I got if I changed the one hint to /*+ use_nl(a) */


-----------------------------------------------------------------------------
| Id  | Operation            | Name | Rows  | Bytes | Cost (%CPU)| Time     |
-----------------------------------------------------------------------------
|   0 | SELECT STATEMENT     |      | 20000 |  1347K|   105   (5)| 00:00:01 |
|   1 |  HASH JOIN           |      | 20000 |  1347K|   105   (5)| 00:00:01 |
|   2 |   TABLE ACCESS FULL  | B    | 10000 |   136K|    26   (4)| 00:00:01 |
|   3 |   HASH JOIN          |      | 10000 |   537K|    78   (4)| 00:00:01 |
|   4 |    TABLE ACCESS FULL | C    | 10000 |   146K|    26   (4)| 00:00:01 |
|   5 |    HASH JOIN         |      | 10000 |   390K|    52   (4)| 00:00:01 |
|   6 |     TABLE ACCESS FULL| D    |   100 |  1800 |    26   (4)| 00:00:01 |
|   7 |     TABLE ACCESS FULL| A    | 10000 |   214K|    26   (4)| 00:00:01 |
-----------------------------------------------------------------------------

This plan really looks as if Oracle should have done a nested loop into a but didn’t. Again appearanced are deceptive thanks to the effects of swap_join_inputs(): the join order here is A -> D -> C -> B (note that we don’t have a use_nl(b) hint in this example).

If you want a plan where the optimizer produces a nested loop join between a and b you’ll need to put in a leading() hint which places b immediately after a somewhere in the list of tables with just use_nl(b) being sufficient to enforce the join method. Here, for example, is the plan with hints /*+ leading(d a b c) use_nl(b) */ for my data set:


----------------------------------------------------------------------------------------
| Id  | Operation                     | Name   | Rows  | Bytes | Cost (%CPU)| Time     |
----------------------------------------------------------------------------------------
|   0 | SELECT STATEMENT              |        | 20000 |  1347K| 30164   (1)| 00:00:02 |
|   1 |  HASH JOIN                    |        | 20000 |  1347K| 30164   (1)| 00:00:02 |
|   2 |   TABLE ACCESS FULL           | C      | 10000 |   146K|    26   (4)| 00:00:01 |
|   3 |   NESTED LOOPS                |        | 20000 |  1054K| 30137   (1)| 00:00:02 |
|   4 |    NESTED LOOPS               |        |  1000K|  1054K| 30137   (1)| 00:00:02 |
|   5 |     HASH JOIN                 |        | 10000 |   390K|    52   (4)| 00:00:01 |
|   6 |      TABLE ACCESS FULL        | D      |   100 |  1800 |    26   (4)| 00:00:01 |
|   7 |      TABLE ACCESS FULL        | A      | 10000 |   214K|    26   (4)| 00:00:01 |
|   8 |     INDEX RANGE SCAN          | B_I100 |   100 |       |     1   (0)| 00:00:01 |
|   9 |    TABLE ACCESS BY INDEX ROWID| B      |     2 |    28 |   101   (0)| 00:00:01 |
----------------------------------------------------------------------------------------

Notice, yet again, Oracle has done hash join to c with a swap_join_inputs().

Creation Script:

create table a
nologging
as
with generator as (
        select 
                rownum id
        from dual 
        connect by 
                level <= 1e4
)
select
	rownum				id,
	mod(rownum,5000)		n2,
	mod(rownum,100)			n100,
	lpad(rownum,10,'0')		v1,
	lpad('x',100,'x')		padding
from
        generator       v1
;

create table b nologging as select * from a;
create table c nologging as select * from a;
create table d nologging as select * from a;

alter table a add constraint a_pk primary key(id);
alter table b add constraint b_pk primary key(id);
alter table c add constraint c_pk primary key(id);
alter table d add constraint d_pk primary key(id);

create index a_i2 on a(n2) nologging;
create index b_i2 on b(n2) nologging;
create index c_i2 on c(n2) nologging;
create index d_i2 on d(n2) nologging;

create index a_i100 on a(n100) nologging;
create index b_i100 on b(n100) nologging;
create index c_i100 on c(n100) nologging;
create index d_i100 on d(n100) nologging;
begin
	dbms_stats.gather_table_stats(
		ownname		 => user,
		tabname		 =>'A',
		method_opt	 => 'for all columns size 1'
	);
	dbms_stats.gather_table_stats(
		ownname		 => user,
		tabname		 =>'B',
		method_opt	 => 'for all columns size 1'
	);
	dbms_stats.gather_table_stats(
		ownname		 => user,
		tabname		 =>'C',
		method_opt	 => 'for all columns size 1'
	);
	dbms_stats.gather_table_stats(
		ownname		 => user,
		tabname		 =>'D',
		method_opt	 => 'for all columns size 1'
	);
end;
/

Will EBS Work with Windows 10 Creators Update?

Steven Chan - Fri, 2017-01-13 02:05

Windows 10 logoMicrosoft is releasing a major update to Windows 10 called the Windows 10 Creators Update:

This new update includes a number of features specifically aimed at enterprise and business users.  These include:

  • Windows Security Center
  • Windows Defender Advanced Threat Protection
  • In-place UEFI conversion

Differential downloads for Windows 10

Microsoft is also changing their packaging of major Windows 10 updates from full operating system downloads to smaller, incremental downloads.  This new delivery method is called the Unified Update Platform (UUP).

The new UUP delivery method will provide only the Windows 10 patches that have been released since the last time a desktop was updated.  

Will this work with EBS?

Yes.  Windows 10 is certified with all current EBS releases, including EBS 12.1 and 12.2. This existing certification is expected to apply to the upcoming Windows 10 Creators Update release.

The Unified Update Platform is not expected to require an additional certification.  Microsoft emphasizes that nothing changes with Windows 10 itself -- this only modifies the way that Win10 patches are delivered:

It’s important to note that with UUP, nothing will look or behave differently on the surface, UUP is all underlying platform and service optimization that happens behind the scenes.

Related Articles

The preceding is intended to outline our general product direction.  It is intended for information purposes only, and may not be incorporated into any contract.   It is not a commitment to deliver any material, code, or functionality, and should not be relied upon in making purchasing decision.  The development, release, and timing of any features or functionality described for Oracle’s products remains at the sole discretion of Oracle.

Categories: APPS Blogs

Oracle SQL Repeated words in the String

Tom Kyte - Thu, 2017-01-12 15:26
I need your suggestions/inputs on of the following task. I have the following table ID ID_NAME 1 TOM HANKS TOM JR 2 PETER PATROL PETER JOHN PETER 3 SAM LIVING 4 JOHNSON & JOHNSON INC 5 DUHGT LLC 6 THE POST OF THE OFFICE 7 ...
Categories: DBA Blogs

NO_DATA_FOUND in Functions

Tom Kyte - Thu, 2017-01-12 15:26
Tom, We've just migrated from Oracle 9.0.1 to 9.2 and, coincidence or not, I'm facing a problem I had never faced before. The NO_DATA_FOUND exception is not being raised from my PL/SQL functions anymore!!! If I put a exception block to handle t...
Categories: DBA Blogs

NEXT vs UNIFORM SIZE

Tom Kyte - Thu, 2017-01-12 15:26
Hi All, Please clarify below doubt, If I use NEXT 10M and UNIFIOM SIZE 1M, which size will my next extent will have 1M or 10M? if we use this command: create TABLESPACE ts_cvliste11_20180815 datafile '/ora1/app/oracle/oradata/L11/2018/ts_c...
Categories: DBA Blogs

MIT Prof Claims RDBMS Model is All Wrong

Tom Kyte - Thu, 2017-01-12 15:26
Hi Team, My manager recently sent out this interesting talk by the founder of Postgres and VoltDB, Michael Stonebraker. https://blog.jooq.org/2013/08/24/mit-prof-michael-stonebraker-the-traditional-rdbms-wisdom-is-all-wrong/ I am curious of ...
Categories: DBA Blogs

Historical CPU and memory usage by service

Tom Kyte - Thu, 2017-01-12 15:26
Hello Tom, Is there a database view like DBA_HIST_SYSSTAT grouping historical session statistics by user or service? I was asked by my boss to create a report showing the CPU and memory usage by department but I could not find a view with that kin...
Categories: DBA Blogs

DYNAMIC INSERT SCRIPT WITH TIMESTAMP ISSUE

Tom Kyte - Thu, 2017-01-12 15:26
Hi, I am new to oracle, i have used your create dynamic insert script for generating the insert script. But that query is taking care of only three datatypes like NUMBER, DATE and VARCHAR2(). It is not taking care about the TIMESTAMP data type si...
Categories: DBA Blogs

Use of Wrap utility for 12K LOC packages

Tom Kyte - Thu, 2017-01-12 15:26
I've create a SP to encrypt the source code of existing packages, my problem is that legacy packages contain more than 12K LoC (lines of code), with small packages I have no issue, but with the heavy ones I got : <i>06502. 00000 - "PL/SQL: nu...
Categories: DBA Blogs

Oracle 11g Exadata SQL performance Tuning books?

Tom Kyte - Thu, 2017-01-12 15:26
Hi Tom, Can you please suggest some books for Oracle 11g Exadata SQL performance Tuning books? Regards, Mayank Jain
Categories: DBA Blogs

Where clause with multiple arguments which can be null or populated

Tom Kyte - Thu, 2017-01-12 15:26
I have a search feature in my application where a user can type filter on 3 different columns(can be more), each of these filters can be used or null. What is the best way to select a ref cursor for the data? I can think of 2 different ways. 1st wa...
Categories: DBA Blogs

Two New Oracle Security Public Class Dates

Pete Finnigan - Thu, 2017-01-12 15:26

I will be teaching two of my Oracle Security classes with Oracle University soon. The first is my class "Securing and Locking Down Oracle Databases". This class will be taught on the 24th January on-line via the Oracle LVC platform....[Read More]

Posted by Pete On 12/01/17 At 02:47 PM

Categories: Security Blogs

GoldenGate 12c Certified with E-Business Suite 12.2

Steven Chan - Thu, 2017-01-12 13:37

Oracle GoldenGate provides real-time capture, transformation, routing, and delivery of database transactions across heterogeneous systems.  The GoldenGate Administrator's Guide shows six supported topologies for generic databases.  Oracle GoldenGate 12c is now certified for a subset of three of those methods in Oracle E-Business Suite Release 12.2 environments:

  1. Unidirectional (Reporting Instance):  From the E-Business Suite to a reporting instance
  2. Broadcast (Data Distribution): From the E-Business Suite to multiple external databases
  3. Consolidation (Data Warehouse/Mart/Store): From multiple sources, including the E-Business Suite, into a single external database
Three supported GoldenGate topologies in E-Business Suite environments

Procedures for replicating information from E-Business Suite 12.2 using GoldenGate to a secondary system are documented in:

Unidirectional  reporting only

Certified Combinations

  • EBS 12.2.5 and later 12.2.x releases
  • Database 12.1.0.2 or 11.2.0.4
  • GoldenGate 12.1.2 and later 12.x releases

Replication to a Non-Editioned Target Database

EBS 12.2's Online Patching architecture has significant implications for data replication solutions implemented using Oracle GoldenGate. The Note above describes how to set up a unidirectional configuration of Oracle GoldenGate replication from an Oracle E-Business Suite Release 12.2 source database to a non-editioned target database.

Using GoldenGate with EBS 12.2 environments is significantly different from previous EBS releases.  If you are upgrading your GoldenGate + EBS 12.1 environment to EBS 12.2, and you must plan carefully for the new integration steps detailed in the documentation above.

Only Unidirectional replication for EBS

GoldenGate is an extremely powerful solution, and one of its most-interesting features is the ability to provide data replication in both directions.  In some generic scenarios, you can choose to make alterations to replicated data on the external instance and have those changes updated back in the source database.

Bidirectional replication is not permitted for E-Business Suite environments.  You can use GoldenGate to replicate EBS data to an external database, but you cannot move that altered data back into the EBS database.  GoldenGate's ability to read data in EBS databases is certified, but it must not be used to write to EBS databases.

"How" But Not "What"

GoldenGate is akin to a scalpel.  You're free to use it, but you must learn how to perform open heart surgery yourself.

The document above describes how to use GoldenGate to replicate data from the E-Business Suite to another system.  It doesn't provide any guidance about which database objects to replicate.

The E-Business Suite data model is vast, as you'd expect of something that has supports the integrated functioning of over 200 functional product modules.  GoldenGate is a development tool, and like all development tools, requires an expert-level understanding of the E-Business Suite data model for successful use.  Here's a good EBS data model resource:

Not Certified for EBS Migrations and Upgrades

GoldenGate is not certified for:

  • Upgrading E-Business Suite databases from one version to another
  • Migrating E-Business Suite databases from one operating system platform to another

Why not?  Oracle GoldenGate does not currently support the replication of all of the datatypes that are used by the E-Business Suite.  In addition, GoldenGate relies upon tables with primary keys for replication. EBS has over 10,000 tables without primary keys.  That means that attempting to use GoldenGate to migrate or upgrade an entire E-Business Suite database requires a combination of GoldenGate and other manual steps to preserve referential integrity.

Oracle does not currently have any plans to produce a certified or automated method of using GoldenGate for E-Business Suite upgrades or platform migrations.

EBS 12.2 customers should use our documented procedures for upgrades and migrations.  

Related Articles
Categories: APPS Blogs

Partner Webcast – Oracle Identity Cloud Service: Introducing Secure, On-Demand Identity Management

As corporate computing services continue to evolve and many aspects of the IT infrastructure are on the journey to the cloud, authorizing people to use enterprise information systems becomes more and...

We share our skills to maximize your revenue!
Categories: DBA Blogs

Maximizing the Value of Oracle Support - Part 5

Chris Warticki - Thu, 2017-01-12 10:10

The following is Part 4 of a 5-part series on Maximizing the Value of Oracle Support.Click here for Part 4 Click here for Part 3 Click here for Part 2 Click here for Part 1

I am FOR our customers.

In this final piece I want to leave you with the following image and just ask yourself "Are we taking full advantage of everything from Oracle Support?  Or, are we just creating and managing Service Requests?"


Oracle Support is here to help you maximize your investment and leverage all the available resources
and best practices.

How to add an NCSA style Access Log to ORDS Standalone

Kris Rice - Thu, 2017-01-12 09:32
What ORDS Standalone is      ORDS Standalone webserver which is Eclipse Jetty, https://eclipse.org/jetty/ .  For the standalone, ORDS sends output to STDOUT, it runs on the command line.  That means there's nothing like a control commands like startup, shutdown,status nor log files, access logs.  It's bare bones intentionally to get up and running fast.  Then it's recommended for anything with

Getting Started with Spark Streaming, Python, and Kafka

Rittman Mead Consulting - Thu, 2017-01-12 07:42

Last month I wrote a series of articles in which I looked at the use of Spark for performing data transformation and manipulation. This was in the context of replatforming an existing Oracle-based ETL and datawarehouse solution onto cheaper and more elastic alternatives. The processing that I wrote was very much batch-focussed; read a set of files from block storage ('disk'), process and enrich the data, and write it back to block storage.

In this article I am going to look at Spark Streaming. This is one of several libraries that the Spark platform provides (others include Spark SQL, Spark MLlib, and Spark GraphX). Spark Streaming provides a way of processing "unbounded" data - commonly referred to as "streaming" data. It does this by breaking it up into microbatches, and supporting windowing capabilities for processing across multiple batches. You can read more in the excellent Streaming Programming Guide.

(image src)

Why Stream Processing?

Processing unbounded data sets, or "stream processing", is a new way of looking at what has always been done as batch in the past. Whilst intra-day ETL and frequent batch executions have brought latencies down, they are still independent executions with optional bespoke code in place to handle intra-batch accumulations. With a platform such as Spark Streaming we have a framework that natively supports processing both within-batch and across-batch (windowing).

By taking a stream processing approach we can benefit in several ways. The most obvious is reducing latency between an event occurring and taking an action driven by it, whether automatic or via analytics presented to a human. Other benefits include a more smoothed out resource consumption profile. We can avoid the very 'spiky' demands on CPU/memory/etc every time a batch runs by instead processing the same volume of data processed but in smaller intervals. Finally, given that most data we process is actually unbounded ("life doesn't happen in batches"), designing new systems to be batch driven - with streaming seen as an exception - is actually an anachronism with roots in technology limitations that are rapidly becoming moot. Stream processing doesn't have to imply, or require, "fast data" or "big data". It can just mean processing data continually as it arrives, and not artificially splitting it into batches.

For more details and discussion of streaming in depth and some of its challenges, I would recommend:

Use-Case and Development Environment

So with that case made above for stream processing, I'm actually going to go back to a very modest example. The use-case I'm going to put together is - almost inevitably for a generic unbounded data example - using Twitter, read from an Apache Kafka topic. We'll start simply, counting the number of tweets per user within each batch and doing some very simple string manipulations. After that we'll see how to do the same but over a period of time (windowing). In the next blog we'll extend this further into a more useful example, still based on Twitter but demonstrating how to satisfy some real-world requirements in the processing.

I developed all of this code using Jupyter Notebooks. I've written before about how awesome notebooks are (along with Jupyter, there's Apache Zeppelin). As well as providing a superb development environment in which both the code and the generated results can be seen, Jupyter gives the option to download a Notebook to Markdown. This blog runs on Ghost, which uses Markdown as its native syntax for composing posts - so in fact what you're reading here comes directly from the notebook in which I developed the code. Pretty cool.

If you want can view the notebook online here, and from there download it and run it live on your own Jupyter instance.

I used the docker image all-spark-notebook to provide both Jupyter and the Spark runtime environment. By using Docker I don't have to really worry about provisioning the platform on which I want to develop the code - I can just dive straight in and start coding. As and when I'm ready to deploy the code to a 'real' execution environment (for example EMR), then I can start to worry about that. The only external aspect was an Apache Kafka cluster that I had already, with tweets from the live Twitter feed on an Apache Kafka topic imaginatively called twitter.

To run the code in Jupyter, you can put the cursor in each cell and press Shift-Enter to run it each cell at a time -- or you can use menu option Kernel -> Restart & Run All. When a cell is executing you'll see a [*] next to it, and once the execution is complete this changes to [y] where y is execution step number. Any output from that step will be shown immediately below it.

To run the code standalone, you would download the .py from Jupyter, and execute it from the commandline using:

/usr/local/spark-2.0.2-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 spark_code.py
Preparing the Environment

We need to make sure that the packages we're going to use are available to Spark. Instead of downloading jar files and worrying about paths, we can instead use the --packages option and specify the group/artifact/version based on what's available on Maven and Spark will handle the downloading. We specify PYSPARK_SUBMIT_ARGS for this to get passed correctly when executing from within Jupyter.

import os  
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'  
Import dependencies

We need to import the necessary pySpark modules for Spark, Spark Streaming, and Spark Streaming with Kafka. We also need the python json module for parsing the inbound twitter data

#    Spark
from pyspark import SparkContext  
#    Spark Streaming
from pyspark.streaming import StreamingContext  
#    Kafka
from pyspark.streaming.kafka import KafkaUtils  
#    json parsing
import json  
Create Spark context

The Spark context is the primary object under which everything else is called. The setLogLevel call is optional, but saves a lot of noise on stdout that otherwise can swamp the actual outputs from the job.

sc = SparkContext(appName="PythonSparkStreamingKafka_RM_01")  
sc.setLogLevel("WARN")  
Create Streaming Context

We pass the Spark context (from above) along with the batch duration which here is set to 60 seconds.

See the API reference and programming guide for more details.

ssc = StreamingContext(sc, 60)  
Connect to Kafka

Using the native Spark Streaming Kafka capabilities, we use the streaming context from above to connect to our Kafka cluster. The topic connected to is twitter, from consumer group spark-streaming. The latter is an arbitrary name that can be changed as required.

For more information see the documentation.

kafkaStream = KafkaUtils.createStream(ssc, 'cdh57-01-node-01.moffatt.me:2181', 'spark-streaming', {'twitter':1})  
Message Processing Parse the inbound message as json

The inbound stream is a DStream, which supports various built-in transformations such as map which is used here to parse the inbound messages from their native JSON format.

Note that this will fail horribly if the inbound message isn't valid JSON.

parsed = kafkaStream.map(lambda v: json.loads(v[1]))  
Count number of tweets in the batch

The DStream object provides native functions to count the number of messages in the batch, and to print them to the output:

We use the map function to add in some text explaining the value printed.

Note that nothing gets written to output from the Spark Streaming context and descendent objects until the Spark Streaming Context is started, which happens later in the code. Also note that pprint by default only prints the first 10 values.

parsed.count().map(lambda x:'Tweets in this batch: %s' % x).pprint()  

If you jump ahead and try to use Windowing at this point, for example to count the number of tweets in the last hour using the countByWindow function, it'll fail. This is because we've not set up the streaming context with a checkpoint directory yet. You'll get the error: java.lang.IllegalArgumentException: requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint().. See later on in the blog for details about how to do this.

Extract Author name from each tweet

Tweets come through in a JSON structure, of which you can see an example here. We're going to analyse tweets by author, which is accessible in the JSON structure at user.screen_name.

The lambda anonymous function is used to apply the map to each RDD within the DStream. The result is a DStream holding just the author's screenname for each tweet in the original DStream.

authors_dstream = parsed.map(lambda tweet: tweet['user']['screen_name'])  
Count the number of tweets per author

With our authors DStream, we can now count them using the countByValue function. This is conceptually the same as this quasi-SQL statement:

SELECT   AUTHOR, COUNT(*)
FROM     DSTREAM
GROUP BY AUTHOR

Using countByValue is a more legible way of doing the same thing that you'll see done in tutorials elsewhere with a map / reduceBy.

author_counts = authors_dstream.countByValue()  
author_counts.pprint()  
Sort the author count

If you try and use the sortBy function directly against the DStream you get an error:

'TransformedDStream' object has no attribute 'sortBy'

This is because sort is not a built-in DStream function. Instad we use the transform function to access sortBy from pySpark.

To use sortBy you specify a lambda function to define the sort order. Here we're going to do it based on the number of tweets (index 1 of the RDD) per author. You'll note this index references being used in the sortBy lambda function x[1], negated to reverse the sort order.

Here I'm using \ as line continuation characters to make the code more legible.

author_counts_sorted_dstream = author_counts.transform(\  
  (lambda foo:foo\
   .sortBy(lambda x:( -x[1]))))
author_counts_sorted_dstream.pprint()  
Get top 5 authors by tweet count

To display just the top five authors, based on number of tweets in the batch period, we'll using the take function. My first attempt at this failed with:

AttributeError: 'list' object has no attribute '_jrdd'

Per my woes on StackOverflow a parallelize is necessary to return the values into a DStream form.

top_five_authors = author_counts_sorted_dstream.transform\  
  (lambda rdd:sc.parallelize(rdd.take(5)))
top_five_authors.pprint()  
Get authors with more than one tweet, or whose username starts with 'rm'

Let's get a bit more fancy now - filtering the resulting list of authors to only show the ones who have tweeted more than once in our batch window, or -arbitrarily- whose screenname begins with rm...

filtered_authors = author_counts.filter(lambda x:\  
                                                x[1]>1 \
                                                or \
                                                x[0].lower().startswith('rm'))

We'll print this list of authors matching the criteria, sorted by the number of tweets. Note how the sort is being done inline to the calling of the pprint function. Assigning variables and then pprinting them as I've done above is only done for clarity. It also makes sense if you're going to subsequently reuse the derived stream variable (such as with the author_counts in this code).

filtered_authors.transform\  
  (lambda rdd:rdd\
  .sortBy(lambda x:-x[1]))\
  .pprint()
List the most common words in the tweets

Every example has to have a version of wordcount, right? Here's an all-in-one with line continuations to make it clearer what's going on. Note that whilst it makes for tidier code, it also makes it harder to debug...

parsed.\  
    flatMap(lambda tweet:tweet['text'].split(" "))\
    .countByValue()\
    .transform\
      (lambda rdd:rdd.sortBy(lambda x:-x[1]))\
    .pprint()
Start the streaming context

Having defined the streaming context, now we're ready to actually start it! When you run this cell, the program will start, and you'll see the result of all the pprint functions above appear in the output to this cell below. If you're running it outside of Jupyter (via spark-submit) then you'll see the output on stdout.

ssc.start()  
ssc.awaitTermination()  
-------------------------------------------
Time: 2017-01-11 15:34:00
-------------------------------------------
Tweets in this batch: 188

-------------------------------------------
Time: 2017-01-11 15:34:00
-------------------------------------------
(u'jenniekmz', 1)
(u'SpamNewton', 1)
(u'ShawtieMac', 1)
(u'niggajorge_2', 1)
(u'agathatochetti', 1)
(u'Tommyguns&#95;&#95;&#95;&#95;&#95;', 1)
(u'zwonderwomanzzz', 1)
(u'Blesschubstin', 1)
(u'Prikes5', 1)
(u'MayaParms', 1)
...

-------------------------------------------
Time: 2017-01-11 15:34:00
-------------------------------------------
(u'RitaBezerra12', 3)
(u'xKYLN', 2)
(u'yourmydw', 2)
(u'wintersheat', 2)
(u'biebercuzou', 2)
(u'pchrin_', 2)
(u'uslaybieber', 2)
(u'rowblanchsrd', 2)
(u'__Creammy__', 2)
(u'jenniekmz', 1)
...

-------------------------------------------
Time: 2017-01-11 15:34:00
-------------------------------------------
(u'RitaBezerra12', 3)
(u'xKYLN', 2)
(u'yourmydw', 2)
(u'wintersheat', 2)
(u'biebercuzou', 2)

-------------------------------------------
Time: 2017-01-11 15:34:00
-------------------------------------------
(u'RitaBezerra12', 3)
(u'xKYLN', 2)
(u'yourmydw', 2)
(u'wintersheat', 2)
(u'biebercuzou', 2)
(u'pchrin_', 2)
(u'uslaybieber', 2)
(u'rowblanchsrd', 2)
(u'__Creammy__', 2)

-------------------------------------------
Time: 2017-01-11 15:34:00
-------------------------------------------
(u'RT', 135)
(u'Justin', 61)
(u'Bieber', 59)
(u'on', 41)
(u'a', 32)
(u'&amp;', 32)
(u'Ros\xe9', 31)
(u'Drake', 31)
(u'the', 29)
(u'Love', 28)
...
[...]

You can see the full output from the job in the notebook here.

So there we have it, a very simple Spark Streaming application doing some basic processing against an inbound data stream from Kafka.

Windowed Stream Processing

Now let's have a look at how we can do windowed processing. This is where data is processed based on a 'window' which is a multiple of the batch duration that we worked with above. So instead of counting how many tweets there are every batch (say, 5 seconds), we could instead count how many there are per minute. Here, a minutes (60 seconds) is the window interval. We can perform this count potentially every time the batch runs; how frequently we do the count is known as the slide interval.


Image credit, and more details about window processing, here.

The first thing to do to enable windowed processing in Spark Streaming is to launch the Streaming Context with a checkpoint directory configured. This is used to store information between batches if necessary, and also to recover from failures. You need to rework your code into the pattern shown here. All the code to be executed by the streaming context goes in a function - which makes it less easy to present in a step-by-step form in a notebook as I have above.

Reset the Environment

If you're running this code in the same session as above, first go to the Jupyter Kernel menu and select Restart.

Prepare the environment

These are the same steps as above.

import os  
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'  
from pyspark import SparkContext  
from pyspark.streaming import StreamingContext  
from pyspark.streaming.kafka import KafkaUtils  
import json  
Define the stream processing code
def createContext():  
    sc = SparkContext(appName="PythonSparkStreamingKafka_RM_02")
    sc.setLogLevel("WARN")
    ssc = StreamingContext(sc, 5)

    # Define Kafka Consumer
    kafkaStream = KafkaUtils.createStream(ssc, 'cdh57-01-node-01.moffatt.me:2181', 'spark-streaming2', {'twitter':1})

    ## --- Processing
    # Extract tweets
    parsed = kafkaStream.map(lambda v: json.loads(v[1]))

    # Count number of tweets in the batch
    count_this_batch = kafkaStream.count().map(lambda x:('Tweets this batch: %s' % x))

    # Count by windowed time period
    count_windowed = kafkaStream.countByWindow(60,5).map(lambda x:('Tweets total (One minute rolling count): %s' % x))

    # Get authors
    authors_dstream = parsed.map(lambda tweet: tweet['user']['screen_name'])

    # Count each value and number of occurences 
    count_values_this_batch = authors_dstream.countByValue()\
                                .transform(lambda rdd:rdd\
                                  .sortBy(lambda x:-x[1]))\
                              .map(lambda x:"Author counts this batch:\tValue %s\tCount %s" % (x[0],x[1]))

    # Count each value and number of occurences in the batch windowed
    count_values_windowed = authors_dstream.countByValueAndWindow(60,5)\
                                .transform(lambda rdd:rdd\
                                  .sortBy(lambda x:-x[1]))\
                            .map(lambda x:"Author counts (One minute rolling):\tValue %s\tCount %s" % (x[0],x[1]))

    # Write total tweet counts to stdout
    # Done with a union here instead of two separate pprint statements just to make it cleaner to display
    count_this_batch.union(count_windowed).pprint()

    # Write tweet author counts to stdout
    count_values_this_batch.pprint(5)
    count_values_windowed.pprint(5)

    return ssc
Launch the stream processing

This uses local disk to store the checkpoint data. In a Production deployment this would be on resilient storage such as HDFS.

Note that, by design, if you restart this code using the same checkpoint folder, it will execute the previous code - so if you need to amend the code being executed, specify a different checkpoint folder.

ssc = StreamingContext.getOrCreate('/tmp/checkpoint_v01',lambda: createContext())  
ssc.start()  
ssc.awaitTermination()  
-------------------------------------------
Time: 2017-01-11 17:08:55
-------------------------------------------
Tweets this batch: 782
Tweets total (One minute rolling count): 782

-------------------------------------------
Time: 2017-01-11 17:08:55
-------------------------------------------
Author counts this batch:    Value AnnaSabryan   Count 8
Author counts this batch:    Value KHALILSAFADO  Count 7
Author counts this batch:    Value socialvidpress    Count 6
Author counts this batch:    Value SabSad_   Count 5
Author counts this batch:    Value CooleeBravo   Count 5
...

-------------------------------------------
Time: 2017-01-11 17:08:55
-------------------------------------------
Author counts (One minute rolling):    Value AnnaSabryan   Count 8
Author counts (One minute rolling):    Value KHALILSAFADO  Count 7
Author counts (One minute rolling):    Value socialvidpress    Count 6
Author counts (One minute rolling):    Value SabSad_   Count 5
Author counts (One minute rolling):    Value CooleeBravo   Count 5
...

[...]

-------------------------------------------
Time: 2017-01-11 17:10:10
-------------------------------------------
Tweets this batch: 5
Tweets total (One minute rolling count): 245

-------------------------------------------
Time: 2017-01-11 17:10:10
-------------------------------------------
Author counts this batch:    Value NowOnFR   Count 1
Author counts this batch:    Value IKeepIt2000   Count 1
Author counts this batch:    Value PCH_Intl  Count 1
Author counts this batch:    Value ___GlBBS  Count 1
Author counts this batch:    Value lauracoutinho24   Count 1

-------------------------------------------
Time: 2017-01-11 17:10:10
-------------------------------------------
Author counts (One minute rolling):    Value OdaSethre Count 3
Author counts (One minute rolling):    Value CooleeBravo   Count 2
Author counts (One minute rolling):    Value ArrezinaR Count 2
Author counts (One minute rolling):    Value blackpinkkot4 Count 2
Author counts (One minute rolling):    Value mat_lucidream Count 1
...

You can see the full output from the job in the notebook here. Let's take some extracts and walk through them.

Total tweet counts

First, the total tweet counts. In the first slide window, they're the same, since we only have one batch of data so far:

-------------------------------------------
Time: 2017-01-11 17:08:55
-------------------------------------------
Tweets this batch: 782
Tweets total (One minute rolling count): 782 

Five seconds later, we have 25 tweets in the current batch - giving us a total of 807 (782 + 25):

-------------------------------------------
Time: 2017-01-11 17:09:00
-------------------------------------------
Tweets this batch: 25
Tweets total (One minute rolling count): 807 

Fast forward just over a minute and we see that the windowed count for a minute is not just going up - in some cases it goes down - since our window is now not simply the full duration of the inbound data stream, but is shifting along and giving a total count for the last 60 seconds only.

-------------------------------------------
Time: 2017-01-11 17:09:50
-------------------------------------------
Tweets this batch: 28
Tweets total (One minute rolling count): 1012

-------------------------------------------
Time: 2017-01-11 17:09:55
-------------------------------------------
Tweets this batch: 24
Tweets total (One minute rolling count): 254
Count by Author

In the first batch, as with the total tweets, the batch tally is the same as the windowed one:

-------------------------------------------
Time: 2017-01-11 17:08:55
-------------------------------------------
Author counts this batch:    Value AnnaSabryan   Count 8
Author counts this batch:    Value KHALILSAFADO  Count 7
Author counts this batch:    Value socialvidpress    Count 6
Author counts this batch:    Value SabSad_   Count 5
Author counts this batch:    Value CooleeBravo   Count 5
...

-------------------------------------------
Time: 2017-01-11 17:08:55
-------------------------------------------
Author counts (One minute rolling):    Value AnnaSabryan   Count 8
Author counts (One minute rolling):    Value KHALILSAFADO  Count 7
Author counts (One minute rolling):    Value socialvidpress    Count 6
Author counts (One minute rolling):    Value SabSad_   Count 5
Author counts (One minute rolling):    Value CooleeBravo   Count 5    

But notice in subsequent batches the rolling totals are accumulating for each author. Here we can see KHALILSAFADO (with a previous rolling total of 7, as above) has another tweet in this batch, giving a rolling total of 8:

-------------------------------------------
Time: 2017-01-11 17:09:00
-------------------------------------------
Author counts this batch:    Value DawnExperience    Count 1
Author counts this batch:    Value KHALILSAFADO  Count 1
Author counts this batch:    Value Alchemister5  Count 1
Author counts this batch:    Value uused2callme  Count 1
Author counts this batch:    Value comfyjongin   Count 1
...

-------------------------------------------
Time: 2017-01-11 17:09:00
-------------------------------------------
Author counts (One minute rolling):    Value AnnaSabryan   Count 9
Author counts (One minute rolling):    Value KHALILSAFADO  Count 8
Author counts (One minute rolling):    Value socialvidpress    Count 6
Author counts (One minute rolling):    Value SabSad_   Count 5
Author counts (One minute rolling):    Value CooleeBravo   Count 5
Summary

What I've put together is a very rudimentary example, simply to get started with the concepts. In the examples in this article I used Spark Streaming because of its native support for Python, and the previous work I'd done with Spark. Jupyter Notebooks are a fantastic environment in which to prototype code, and for a local environment providing both Jupyter and Spark it all you can't beat the Docker image all-spark-notebook.

There are other stream processing frameworks and languages out there, including Apache Flink, Kafka Streams, and Apache Beam, to name but three. Apache Storm and Apache Samza are also relevant, but whilst were early to the party seem to crop up less frequently in stream processing discussions and literature nowadays.

In the next blog we'll see how to extend this Spark Streaming further with processing that includes:

  • Matching tweet contents to predefined list of filter terms, and filtering out retweets
  • Including only tweets that include URLs, and comparing those URLs to a whitelist of domains
  • Sending tweets matching a given condition to a Kafka topic
  • Keeping a tally of tweet counts per batch and over a longer period of time, as well as counts for terms matched within the tweets
Categories: BI & Warehousing

Pages

Subscribe to Oracle FAQ aggregator