
Antony Reynolds

Musings on Fusion Middleware and SOA

One Queue to Rule them All

Fri, 2014-03-28 15:16
Using a Single Queue for Multiple Message Types with SOA Suite

Problem Statement

You use a single JMS queue for sending multiple message types / service requests.  You use a single JMS queue for receiving multiple message types / service requests.  You have multiple SOA JMS Adapter interfaces for reading and writing these queues.  In a composite it is random which interface gets a message from the JMS queue.  It is not a problem having multiple adapter instances writing to a single queue; the problem is only with having multiple readers, because each reader gets the first message on the queue.

Background

The JMS Adapter is unaware of who receives the messages.  Each adapter instance just takes the message from the queue and delivers it to its own configured interface, one interface per adapter instance.  The SOA infrastructure is then responsible for routing that message, usually via a database table and an in memory notification message, to a component within a composite.  Each message will create a new composite but the BPEL engine and Mediator engine will attempt to match callback messages to the appropriate Mediator or BPEL instance.
Note that message type, including XML document type, has nothing to do with the preceding statements.

The net result is that if you have a sequence of two receives from the same queue using different adapters then the messages will be split equally between the two adapters, meaning that half the time the wrong adapter will receive the message.  This blog entry looks at how to resolve this issue.

Note that the same problem occurs whenever you have more than one adapter listening to the same queue, whether they are in the same composite or different composites.  The solution in this blog entry is also relevant to this use case.

Solutions

In order to deliver the messages to the correct interface we need to identify the interface they should be delivered to.  This can be done by using JMS properties.  For example the JMSType property can be used to identify the type of the message.  A message selector can be added to the JMS inbound adapter that will cause the adapter to filter out messages intended for other interfaces.  For example if we need to call three services that are implemented in a single application:
  • Service 1 receives messages on the single outbound queue from SOA and sends responses back on the single inbound queue.
  • Similarly Service 2 and Service 3 also receive messages on the single outbound queue from SOA and send responses back on the single inbound queue.
First we need to ensure the messages are delivered to the correct adapter instance.  This is achieved as follows:
  • The inbound JMS adapter is configured with a JMS message selector.  The message selector might be "JMSType='Service1'" for responses from Service 1.  Similarly the selector would be "JMSType='Service2'" for the adapter waiting on a response from Service 2.  The message selector ensures that each adapter instance will retrieve the first message from the queue that matches its selector.
  • The sending service needs to set the JMS property (JMSType in our example) that is used in the message selector.
Now that our messages are being delivered to the correct interface, we need to make sure that they get delivered to the correct Mediator or BPEL instance.  We do this with correlation.  There are several correlation options:
  1. We can do manual correlation with a correlation set, identifying parts of the outbound message that uniquely identify our instance and matching them with parts of the inbound message to make the correlation.
  2. We can use a Request-Reply JMS adapter which by default expects the response to contain a JMSCorrelationID equal to the outgoing JMSMessageID.  Although no configuration is required for this on the SOA client side, the service needs to copy the incoming JMSMessageID to the outgoing JMSCorrelationID.
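
The exact mechanics on the service side depend on the JMS client being used; the sketch below uses the standard JMS 1.1 API to show the two obligations described above, namely setting the JMSType property used by the adapter's message selector and copying the request's JMSMessageID into the reply's JMSCorrelationID.  The class name, the payload and the assumption that the connection factory and queues are looked up elsewhere are all illustrative.

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageProducer;
    import javax.jms.Queue;
    import javax.jms.Session;

    // Hypothetical service that reads a request from the shared outbound queue
    // and replies on the shared inbound queue used by the SOA composite.
    public class Service1Responder {

        public void respondOnce(ConnectionFactory cf, Queue requestQueue, Queue replyQueue) throws Exception {
            Connection connection = cf.createConnection();
            try {
                connection.start();
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                MessageConsumer consumer = session.createConsumer(requestQueue);
                Message request = consumer.receive();   // no selector needed on the service side

                Message reply = session.createTextMessage("<Service1Response/>");  // illustrative payload
                reply.setJMSType("Service1");                          // matches the adapter's message selector
                reply.setJMSCorrelationID(request.getJMSMessageID());  // enables request-reply correlation

                MessageProducer producer = session.createProducer(replyQueue);
                producer.send(reply);
            } finally {
                connection.close();
            }
        }
    }
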
Special Case - Request-Reply Synchronous JMS Adapter

When using a synchronous Request-Reply JMS adapter we can omit the message selector, because the Request-Reply JMS adapter will immediately listen with a message selector for the correlation ID rather than processing the incoming message asynchronously.
The synchronous request-reply will block the BPEL process thread and hold open the BPEL transaction until a response is received, so this should only be used when you expect the request to be completed in a few seconds.

The JCA Connection Factory used must point to a non-XA JMS Connection Factory and must have the isTransacted property set to “false”.  See the documentation for more details.

Sample

I developed a JDeveloper SOA project that demonstrates using a single queue for multiple incoming adapters.  The overall process flow is shown in the picture below.  The BPEL process on the left receives messages from the jms/TestQueue2 and sends messages to the jms/Test Queue1.  A Mediator is used to simulate multiple services and also provide a web interface to initiate the process.  The correct adapter is identified by using JMS message properties and a selector.

 

The flow above shows that the process is initiated from EM using a web service binding on the Mediator.  The Mediator, acting as a client, posts the request to the inbound queue with a JMSType property set to "Initiate".  The flow then proceeds as follows.

Inbound Request

The client receives the web service request and posts the request to the inbound queue with JMSType='Initiate'.  The JMS adapter with a message selector "JMSType='Initiate'" receives the message and causes a composite to be created.  The composite in turn causes the BPEL process to start executing.  The BPEL process then sends a request to Service 1 on the outbound queue.

Key Points

  • The initiate message can be used to initiate a correlation set if necessary.
  • A selector is required to distinguish initiate messages from other messages on the queue.

Separate Request and Reply Adapters

Service 1 receives the request and sends a response on the inbound queue with JMSType='Service1' and JMSCorrelationID set to the incoming JMS message ID.  The JMS adapter with a message selector "JMSType='Service1'" receives the message and causes a composite to be created.  The composite uses a correlation set to deliver the message to BPEL, which correlates it with the existing BPEL process.  The BPEL process then sends a request to Service 2 on the outbound queue.

Key Points

  • Separate request and reply adapters require a correlation set to ensure that the reply goes to the correct BPEL process instance.
  • A selector is required to distinguish Service 1 response messages from other messages on the queue.

Asynchronous Request-Reply Adapter

Service 2 receives the request and sends a response on the inbound queue with JMSType='Service2' and JMSCorrelationID set to the incoming JMS message ID.  The JMS adapter with a message selector "JMSType='Service2'" receives the message and causes a composite to be created.  The composite in turn delivers the message to the existing BPEL process using native JMS correlation.  The BPEL process then sends a request to Service 3 on the outbound queue using a synchronous request-reply.

Key Points

  • The asynchronous request-reply adapter does not require a correlation set; the JMS adapter auto-correlates using the correlation ID to ensure that the reply goes to the correct BPEL process instance.
  • A selector is still required to distinguish Service 2 response messages from other messages on the queue.

Synchronous Request-Reply Adapter

Service 3 receives the request and sends a response on the inbound queue with JMSType='Service3' and JMSCorrelationID set to the incoming JMS message ID.  The synchronous JMS adapter receives the response without a message selector and correlates it to the BPEL process using native JMS correlation, and the BPEL process then sends the overall response to the outbound queue.

Key Points

  • The synchronous request-reply adapter does not require a correlation set; the JMS adapter auto-correlates using the correlation ID to ensure that the reply goes to the correct BPEL process instance.
  • A selector is also not required to distinguish Service 3 response messages from other messages on the queue because the synchronous adapter selects on the expected correlation ID.

Outbound Response

The client receives the response on the outbound queue.

Summary

When using a single JMS queue for multiple purposes bear in mind the following:

  • If multiple receives use the same queue then you need to have a message selector.  The corollary to this is that the message sender must add a JMS property to the message that can be used in the message selector.
  • When using a request-reply JMS adapter then there is no need for a correlation set, correlation is done in the adapter by matching the outbound JMS message ID to the inbound JMS correlation ID.  The corollary to this is that the message sender must copy the JMS request message ID to the JMS response correlation ID.
  • When using a synchronous request-reply JMS adapter then there is no need for the message selector because the message selection is done based on the JMS correlation ID.
  • Synchronous request-reply adapter requires a non-XA connection factory to be used so that the request part of the interaction can be committed separately to the receive part of the interaction.
  • Synchronous request-reply JMS adapter should only be used when the reply is expected to take just a few seconds.  If the reply is expected to take longer then the asynchronous request-reply JMS adapter should be used.
Deploying the Sample

The sample is available to download here and makes use of the following JMS resources:

  • jms/TestQueue (Queue): Outbound queue from the BPEL process.
  • jms/TestQueue2 (Queue): Inbound queue to the BPEL process.
  • eis/wls/TestQueue (JMS Adapter Connection Factory): This can point to an XA or non-XA JMS Connection Factory such as weblogic.jms.XAConnectionFactory.
  • eis/wls/TestQueue None-XA (JMS Adapter Connection Factory): This must point to a non-XA JMS Connection Factory such as weblogic.jms.ConnectionFactory and must have isTransacted set to “false”.

To run the sample, just use the test facility in the EM console or the soa-infra application.

Not Just a Cache

Thu, 2014-03-27 21:23
Coherence as a Compute Grid

Coherence is best known as a data grid, providing distributed caching with an ability to move processing to the data in the grid.  Less well known is the fact that Coherence also has the ability to function as a compute grid, distributing work across multiple servers in a cluster.  In this entry, which was co-written with my colleague Utkarsh Nadkarni, we will look at using Coherence as a compute grid through the use of the Work Manager API and compare it to manipulating data directly in the grid using Entry Processors.

Coherence Distributed Computing Options

The Coherence documentation identifies several methods for distributing work across the cluster, see Processing Data in a Cache.  They can be summarized as:

  • Entry Processors
    • An InvocableMap interface, inherited by the NamedCache interface, provides support for executing an agent (EntryProcessor or EntryAggregator) on individual entries within the cache.
    • The entries may or may not exist; either way the agent is executed once for each key provided, or, if no key is provided, once for each object in the cache.
    • In Enterprise and Grid editions of Coherence the entry processors are executed on the primary cache nodes holding the cached entries.
    • Agents can return results.
    • One agent executes multiple times per cache node, once for each key targeted on the node.
  • Invocation Service
    • An InvocationService provides support for executing an agent on one or more nodes within the grid.
    • Execution may be targeted at specific nodes or at all nodes running the Invocation Service.
    • Agents can return results.
    • One agent executes once per node.
  • Work Managers
    • A WorkManager class provides a grid aware implementation of the commonJ WorkManager which can be used to run tasks across multiple threads on multiple nodes within the grid.
    • WorkManagers run on multiple nodes.
    • Each WorkManager may have multiple threads.
    • Tasks implement the Work interface and are assigned to specific WorkManager threads to execute.
    • Each task is executed once.
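
To make the two programming models concrete, here is a minimal sketch of an entry processor and an equivalent commonJ Work task; the cache name and the counter logic are illustrative rather than taken from the sample project.

    import java.io.Serializable;

    import com.tangosol.net.CacheFactory;
    import com.tangosol.net.NamedCache;
    import com.tangosol.util.InvocableMap;
    import com.tangosol.util.processor.AbstractProcessor;

    import commonj.work.Work;

    // Entry processor: executed on the cache node that owns the entry.
    class IncrementProcessor extends AbstractProcessor {
        public Object process(InvocableMap.Entry entry) {
            Integer count = (Integer) entry.getValue();
            int next = (count == null) ? 1 : count + 1;
            entry.setValue(next);   // mutate the entry in place on the owning node
            return next;            // returned to the caller of NamedCache.invoke
        }
    }

    // Work task: executed on whichever Work Manager thread it is assigned to,
    // accessing the cache through the normal cache API.
    class IncrementWork implements Work, Serializable {
        private final String key;

        IncrementWork(String key) { this.key = key; }

        public void run() {
            NamedCache cache = CacheFactory.getCache("demo-cache");  // hypothetical cache name
            Integer count = (Integer) cache.get(key);
            cache.put(key, (count == null) ? 1 : count + 1);
        }

        public boolean isDaemon() { return false; }
        public void release() { }
    }

An entry processor like this would typically be run with NamedCache.invoke(key, new IncrementProcessor()), while the Work task would be passed to WorkManager.schedule; the comparison further down looks at the consequences of that difference.
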
Three Models of Distributed Computation

The previous section listing the distributed computing options in Coherence shows that there are 3 distinct execution models:

  • Per Cache Entry Execution (Entry Processor)
    • Execute the agent on the entry corresponding to a cache key.
    • Entries processed on a single thread per node.
    • Parallelism across nodes.
  • Per Node Execution (Invocation Service)
    • Execute the same agent once per node.
    • Agent processed on a single thread per node.
    • Parallelism across nodes.
  • Per Task Execution (Work Manager)
    • Each task executed once.
    • Parallelism across nodes and across threads within a node.

The entry processor is good for operating on individual cache entries.  It is not so good for working on groups of cache entries.

The invocation service is good for performing checks on a node, but is limited in its parallelism.

The work manager is good for operating on groups of related entries in the cache or performing non-cache related work in parallel.  It has a high degree of parallelism.

As you can see the primary choice for distributed computing comes down to the Work Manager and the Entry Processor.

Differences between using Entry Processors and Work Managers in Coherence

Degree of parallelization

  • Entry Processors: A function of the number of Coherence nodes.  EntryProcessors are run concurrently across all nodes in a cluster.  However, within each node only one instance of the entry processor executes at a time.
  • Work Managers: A function of the number of Work Manager threads.  The Work is run concurrently across all threads in all Work Manager instances.

Transactionality

  • Entry Processors: Transactional.  If an EntryProcessor running on one node does not complete (say, due to that node crashing), the entries targeted will be executed by an EntryProcessor on another node.
  • Work Managers: Not transactional.  The specification does not explicitly specify what the response should be if a remote server crashes during an execution.  The current implementation uses WORK_COMPLETED with a WorkCompletedException as the result.  In case a Work does not run to completion, it is the responsibility of the client to resubmit the Work to the Work Manager.

How is the Cache accessed or mutated?

  • Entry Processors: Operations against the cache contents are executed by (and thus within the localized context of) a cache.
  • Work Managers: Accesses and changes to the cache are done directly through the cache API.

Where is the processing performed?

  • Entry Processors: In the same JVM where the entries-to-be-processed reside.
  • Work Managers: In the Work Manager server.  This may not be the same JVM where the entries-to-be-processed reside.

Network Traffic

  • Entry Processors: A function of the size of the EntryProcessor.  Typically, the size of an EntryProcessor is much smaller than the size of the data transferred across nodes in the case of a Work Manager approach.  This makes the EntryProcessor approach more network-efficient and hence more scalable.  One EntryProcessor is transmitted to each cache node.
  • Work Managers: A function of the number of Work objects, of which multiple may be sent to each server, and the size of the data set transferred from the Backing Map to the Work Manager server.

Distribution of “Tasks”

  • Entry Processors: Tasks are moved to the location at which the entries-to-be-processed are being managed.  This may result in a random distribution of tasks.  The distribution tends to get equitable as the number of entries increases.
  • Work Managers: Tasks are distributed equally across the threads in the Work Manager instances.

Implementation of the EntryProcessor or Work class

  • Entry Processors: Create a class that extends AbstractProcessor and implement the process method.
  • Work Managers: Create a class that is serializable and implements commonj.work.Work, and implement the run method.

Implementation of “Task”

  • Entry Processors: In the process method, update the cache item based on the key passed into the process method.
  • Work Managers: In the run method, get a reference to the named cache, get a reference to the cache item, change the cache item, and put the cache item back into the named cache.

Completion Notification

  • Entry Processors: When the NamedCache.invoke method completes then all the entry processors have completed executing.
  • Work Managers: When a task is submitted for execution it executes asynchronously on the work manager threads in the cluster.  Status may be obtained by registering a commonj.work.WorkListener class when calling the WorkManager.schedule method.  This will provide updates when the Work is accepted, started and completed or rejected.  Alternatively the WorkManager.waitForAll and WorkManager.waitForAny methods allow blocking waits for either all or one result respectively.

Returned Results

  • Entry Processors: java.lang.Object when executed on one cache item (the result of the invocation as returned from the EntryProcessor), or java.util.Map when executed on a collection of keys (a Map containing the results of invoking the EntryProcessor against each of the specified keys).
  • Work Managers: commonj.work.WorkItem.  There are three possible outcomes:
    • The Work is not yet complete.  In this case, a null is returned by WorkItem.getResult.
    • The Work started but completed with an exception.  This may have happened due to a Work Manager instance terminating abruptly.  This is indicated by an exception thrown by WorkItem.getResult.
    • The Work Manager instance indicated that the Work is complete and the Work ran to completion.  In this case, WorkItem.getResult returns a non-null value and no exception is thrown by WorkItem.getResult.

Error Handling

  • Entry Processors: Failure of a node results in all the work assigned to that node being executed on the new primary.  This may result in some work being executed twice, but Coherence ensures that the cache is only updated once per item.
  • Work Managers: Failure of a node results in the loss of scheduled tasks assigned to that node.  Completed tasks are sent back to the client as they complete.

Fault Handling Extension

Entry processors have excellent error handling within Coherence.  Work Managers less so.  In order to provide resiliency on node failure I implemented a “RetryWorkManager” class that detects tasks that have failed to complete successfully and resubmits them to the grid for another attempt.

A JDeveloper project with the RetryWorkManager is available for download here.  It includes sample code to run a simple task across multiple work manager threads.

To create a new RetryWorkManager that will retry failed work twice you would use this:

WorkManager workManager = new RetryWorkManager("WorkManagerName", 2); // Change for number of retries; if no retry count is provided then the default is 0.

You can control the number of retries at the individual work level as shown below:

WorkItem workItem = schedule(work); // Use number of retries set at WorkManager creation
WorkItem workItem = schedule(work, workListener); // Use number of retries set at WorkManager creation
WorkItem workItem = schedule(work, 4); // Change number of retries
WorkItem workItem = schedule(work, workListener, 4); // Change number of retries

Currently the RetryWorkManager defaults to having 0 threads.  To change this use the following call:

WorkItem workItem = schedule(work, workListener, 3, 4); // Change number of threads (3) and retries (4)

Note that none of this sample code is supported by Oracle in any way, and is provided purely as a sample of what can be done with Coherence.

How the RetryWorkManager Works

The RetryWorkManager delegates most operations to a Coherence WorkManager instance.  It creates a WorkManagerListener to intercept status updates.  On receiving a WORK_COMPLETED callback the listener checks the result to see if the completion is due to an error.  If an error occurred and there are retries left then the work is resubmitted.  The WorkItem returned by scheduling an event is wrapped in a RetryWorkItem.  This RetryWorkItem is updated with a new Coherence WorkItem when the task is retried.  If the client registers a WorkManagerListener then the RetryWorkManagerListener delegates non-retriable events to the client listener.  Finally the waitForAll and waitForAny methods are modified to deal with work items being resubmitted in the event of failure.
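
The actual RetryWorkManager source is in the downloadable project; the fragment below is just a simplified sketch of the completion-handling logic described above, with the Resubmitter callback and all class names being illustrative inventions rather than the real API.

    import commonj.work.WorkEvent;
    import commonj.work.WorkListener;

    // Simplified sketch of the retry decision made in the WORK_COMPLETED callback.
    class RetryingListener implements WorkListener {

        // Hypothetical callback implemented by the retrying work manager.
        interface Resubmitter {
            void resubmit(WorkEvent failedEvent, int retriesLeft);  // schedule the work again
            void passThrough(WorkEvent event);                      // delegate to the client's listener
        }

        private final Resubmitter owner;
        private final int retriesLeft;

        RetryingListener(Resubmitter owner, int retriesLeft) {
            this.owner = owner;
            this.retriesLeft = retriesLeft;
        }

        public void workAccepted(WorkEvent we) { owner.passThrough(we); }
        public void workRejected(WorkEvent we) { owner.passThrough(we); }
        public void workStarted(WorkEvent we)  { owner.passThrough(we); }

        public void workCompleted(WorkEvent we) {
            if (we.getException() != null && retriesLeft > 0) {
                // Completion was caused by an error (for example the executing node died),
                // so resubmit the work instead of reporting the failure to the client.
                owner.resubmit(we, retriesLeft - 1);
            } else {
                // Success, or no retries left: let the client's listener see the event.
                owner.passThrough(we);
            }
        }
    }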

Sample Code for EntryProcessor and RetryWorkManager

The downloadable project contains sample code for running the work manager and an entry processor.

The demo implements a 3-tier architecture

  1. Coherence Cache Servers
    • Can be started by running RunCacheServer.cmd
    • Runs a distributed cache used by the Task to be executed in the grid
  2. Coherence Work Manager Servers
    • Can be started by running RunWorkManagerServer.cmd
    • Takes no parameters
    • Runs two threads for executing tasks
  3. Coherence Work Manager Clients
    • Can be started by running RunWorkManagerClient.cmd
    • Takes three parameters currently
      • Work Manager name - should be "AntonyWork" - default is "AntonyWork"
      • Number of tasks to schedule - default is 10
      • Time to wait for tasks to complete in seconds - default is 60

The task stores the number of times it has been executed in the cache, so multiple runs will see the counter incrementing.  The choice between EntryProcessor and WorkManager is controlled by changing the value of USE_ENTRY_PROCESSOR between false and true in the RunWorkManagerClient.cmd script.

The SetWorkManagerEnv.cmd script should be edited to point to the Coherence home directory and the Java home directory.

Summary

If you need to perform operations on cache entries and don’t need to have cross-checks between the entries then the best solution is to use an entry processor.  The entry processor is fault tolerant and updates to the cached entity will be performed once only.

If you need to perform generic work that may need to touch multiple related cache entries then the work manager may be a better solution.  The extensions I created in the RetryWorkManager provide a degree of resiliency to deal with node failure without impacting the client.

The RetryWorkManager can be downloaded here.

Packt Publishing Buy One Get One Free Offer

Thu, 2014-03-20 13:35
Packt Publishing celebrates their 2000th title with a Buy One Get One Free Offer

Great time to get those Packt books you’ve been thinking of buying, like the SOA Suite 11g Developers Guide or the SOA Suite 11g Developers Cookbook.

The Impact of Change

Sun, 2014-03-09 15:01
Measuring Impact of Change in SOA Suite

Mormon prophet Thomas S. Monson once said:

When performance is measured, performance improves. When performance is measured and reported, the rate of performance accelerates.

(LDS Conference Report, October 1970, p107)

Like everything in life, a SOA Suite installation that is monitored and tracked has a much better chance of performing well than one that is not measured.  With that in mind I came up with a tool to allow the measurement of the impact of configuration changes on database usage in SOA Suite.  This tool can be used to assess the impact of different configurations on both database growth and database performance, helping to decide which optimizations offer real benefit to the composite under test.

Basic Approach

The basic approach of the tool is to take a snapshot of the number of rows in the SOA tables before executing a composite.  The composite is then executed.  After the composite has completed another snapshot is taken of the SOA tables.  This is illustrated in the diagram below:

An example of the data collected by the tool is shown below:

Test Name    Total Tables Changed  Total Rows Added  Notes
AsyncTest1   13                    15                Async interaction with simple SOA composite, one retry to send response.
AsyncTest2   12                    13                Async interaction with simple SOA composite, no retries on sending response.
AsyncTest3   12                    13                Async interaction with simple SOA composite, no callback address provided.
OneWayTest1  12                    13                One-Way interaction with simple SOA composite.
SyncTest1    7                     7                 Sync interaction with simple SOA composite.

Note that the first three columns are provided by the tool; the fourth column is just an aide-memoire to identify what the test actually did.  The tool also allows us to drill into the data to get a better look at what is actually changing, as shown in the table below:

Test Name   Table Name            Rows Added
AsyncTest1  AUDIT_COUNTER         1
AsyncTest1  AUDIT_DETAILS         1
AsyncTest1  AUDIT_TRAIL           2
AsyncTest1  COMPOSITE_INSTANCE    1
AsyncTest1  CUBE_INSTANCE         1
AsyncTest1  CUBE_SCOPE            1
AsyncTest1  DLV_MESSAGE           1
AsyncTest1  DOCUMENT_CI_REF       1
AsyncTest1  DOCUMENT_DLV_MSG_REF  1
AsyncTest1  HEADERS_PROPERTIES    1
AsyncTest1  INSTANCE_PAYLOAD      1
AsyncTest1  WORK_ITEM             1
AsyncTest1  XML_DOCUMENT          2

Here we have drilled into the test case with the retry of the callback to see what tables are actually being written to.

Finally we can compare two tests to see difference in the number of rows written and the tables updated as shown below:

Test Name   Base Test Name  Table Name   Row Difference
AsyncTest1  AsyncTest2      AUDIT_TRAIL  1

Here are the additional tables referenced by this test

Test Name   Base Test Name  Additional Table Name  Rows Added
AsyncTest1  AsyncTest2      WORK_ROWS              1

How it Works

I created a database stored procedure, soa_snapshot.take_soa_snapshot(test_name, phase), that queries all the SOA tables and records the number of rows in each table.  By running the stored procedure before and after the execution of a composite we can capture the number of rows in the SOA database before and after a composite executes.  I then created a view that shows the difference in the number of rows before and after composite execution.  This view has a number of sub-views that allow us to query specific items.  The schema is shown below:

The different tables and views are:

  • CHANGE_TABLE
    • Used to track number of rows in SOA schema, each test case has two or more phases.  Usually phase 1 is before execution and phase 2 is after execution.
    • This is only used by the stored procedure and the views.
  • DELTA_VIEW
    • Used to track changes in number of rows in SOA database between phases of a test case.  This is a view on CHANGE_TABLE.  All other views are based off this view.
  • SIMPLE_DELTA_VIEW
    • Provides number of rows changed in each table.
  • SUMMARY_DELTA_VIEW
    • Provides a summary of total rows and tables changed.
  • DIFFERENT_ROWS_VIEW
    • Provides a summary of differences in rows updated between test cases
  • EXTRA_TABLES_VIEW
    • Provides a summary of the extra tables and rows used by a test case.
    • This view makes use of a session context, soa_ctx, which holds the test case name and the baseline test case name.  This context is initialized by calling the stored procedure soa_ctx_pkg.set(testCase, baseTestCase).

I created a web service wrapper to the take_soa_snapshot procedure so that I could use SoapUI to perform the tests.
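
If you prefer to drive the procedure without the web service wrapper, it can also be called directly over JDBC.  The following is a minimal sketch with placeholder connection details; it assumes the phase parameter is numeric.

    import java.sql.CallableStatement;
    import java.sql.Connection;
    import java.sql.DriverManager;

    // Take a snapshot before and after running a composite test.
    public class SnapshotRunner {

        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection(
                    "jdbc:oracle:thin:@//dbhost:1521/orcl", "soa_snapshot", "password")) {  // placeholders
                takeSnapshot(conn, "AsyncTest1", 1);   // phase 1: before running the composite
                // ... invoke the composite under test here, e.g. from SoapUI ...
                takeSnapshot(conn, "AsyncTest1", 2);   // phase 2: after the composite completes
            }
        }

        private static void takeSnapshot(Connection conn, String testName, int phase) throws Exception {
            try (CallableStatement call = conn.prepareCall("{call soa_snapshot.take_soa_snapshot(?, ?)}")) {
                call.setString(1, testName);
                call.setInt(2, phase);
                call.execute();
            }
        }
    }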

Sample Output

How many rows and tables did a particular test use?

Here we can see how many rows in how many tables changed as a result of running a test:

-- Display the total number of rows and tables changed for each test
select * from summary_delta_view
order by test_name;

TEST_NAME            TOTALDELTAROWS TOTALDELTASIZE TOTALTABLES
-------------------- -------------- -------------- -----------
AsyncTest1                   15              0          13
AsyncTest1noCCIS             15              0          13
AsyncTest1off                 8              0           8
AsyncTest1prod               13              0          12
AsyncTest2                   13              0          12
AsyncTest2noCCIS             13              0          12
AsyncTest2off                 7              0           7
AsyncTest2prod               11              0          11
AsyncTest3                   13              0          12
AsyncTest3noCCIS             13          65536          12
AsyncTest3off                 7              0           7
AsyncTest3prod               11              0          11
OneWayTest1                  13              0          12
OneWayTest1noCCI             13          65536          12
OneWayTest1off                7              0           7
OneWayTest1prod              11              0          11
SyncTest1                     7              0           7
SyncTest1noCCIS               7              0           7
SyncTest1off                  2              0           2
SyncTest1prod                 5              0           5

20 rows selected

Which tables grew during a test?

Here for a given test we can see which tables had rows inserted.

-- Display the tables which grew and show the number of rows they grew by
select * from simple_delta_view
where test_name='AsyncTest1'
order by table_name;
TEST_NAME            TABLE_NAME                      DELTAROWS  DELTASIZE
-------------------- ------------------------------ ---------- ----------
AsyncTest1       AUDIT_COUNTER                           1          0
AsyncTest1       AUDIT_DETAILS                           1          0
AsyncTest1       AUDIT_TRAIL                             2          0
AsyncTest1       COMPOSITE_INSTANCE                      1          0
AsyncTest1       CUBE_INSTANCE                           1          0
AsyncTest1       CUBE_SCOPE                              1          0
AsyncTest1       DLV_MESSAGE                             1          0
AsyncTest1       DOCUMENT_CI_REF                         1          0
AsyncTest1       DOCUMENT_DLV_MSG_REF                    1          0
AsyncTest1       HEADERS_PROPERTIES                      1          0
AsyncTest1       INSTANCE_PAYLOAD                        1          0
AsyncTest1       WORK_ITEM                               1          0
AsyncTest1       XML_DOCUMENT                            2          0
13 rows selected

Which tables grew more in test1 than in test2?

Here we can see the differences in rows for two tests.

-- Return difference in rows updated (test1)
select * from different_rows_view
where test1='AsyncTest1' and test2='AsyncTest2';

TEST1                TEST2                TABLE_NAME                          DELTA
-------------------- -------------------- ------------------------------ ----------
AsyncTest1       AsyncTest2       AUDIT_TRAIL                             1

Which tables were used by test1 but not by test2?

Here we can see tables that were used by one test but not by the other test.

-- Register base test case for use in extra_tables_view
-- First parameter (test1) is test we expect to have extra rows/tables
begin soa_ctx_pkg.set('AsyncTest1', 'AsyncTest2'); end;
/
anonymous block completed
-- Return additional tables used by test1
column TEST2 FORMAT A20
select * from extra_tables_view;
TEST1                TEST2                TABLE_NAME                      DELTAROWS
-------------------- -------------------- ------------------------------ ----------
AsyncTest1       AsyncTest2       WORK_ITEM                               1

 

Results

I used the tool to find out the following.  All tests were run using SOA Suite 11.1.1.7.

The following is based on a very simple composite as shown below:

Each BPEL process is basically the same as the one shown below:

Impact of Fault Policy Retry Being Executed Once

Setting    Total Rows Written  Total Tables Updated
No Retry   13                  12
One Retry  15                  13

When a fault policy causes a retry then the following additional database rows are written:

Table Name   Number of Rows
AUDIT_TRAIL  1
WORK_ITEM    1

Impact of Setting Audit Level = Development Instead of Production

Setting      Total Rows Written  Total Tables Updated
Development  13                  12
Production   11                  11

When the audit level is set at development instead of production then the following additional database rows are written:

Table Name   Number of Rows
AUDIT_TRAIL  1
WORK_ITEM    1

Impact of Setting Audit Level = Production Instead of Off

Setting     Total Rows Written  Total Tables Updated
Production  11                  11
Off         7                   7

When the audit level is set at production rather than off then the following additional database rows are written:

Table Name          Number of Rows
AUDIT_COUNTER       1
AUDIT_DETAILS       1
AUDIT_TRAIL         1
COMPOSITE_INSTANCE  1

Impact of Setting Capture Composite Instance State

Setting  Total Rows Written  Total Tables Updated
On       13                  12
Off      13                  12

When capture composite instance state is on rather than off, no additional database rows are written.  Note, however, that other activities occur when composite instance state is captured.

Impact of Setting oneWayDeliveryPolicy = async.cache or sync

Setting        Total Rows Written  Total Tables Updated
async.persist  13                  12
async.cache    7                   7
sync           7                   7

When choosing async.persist (the default) instead of sync or async.cache then the following additional database rows are written:

Table Name            Number of Rows
AUDIT_DETAILS         1
DLV_MESSAGE           1
DOCUMENT_CI_REF       1
DOCUMENT_DLV_MSG_REF  1
HEADERS_PROPERTIES    1
XML_DOCUMENT          1

As you would expect the sync mode behaves just as a regular synchronous (request/reply) interaction and creates the same number of rows in the database.  The async.cache also creates the same number of rows as a sync interaction because it stores state in memory and provides no restart guarantee.

Caveats & Warnings

The results above are based on a trivial test case.  The numbers will be different for bigger and more complex composites.  However by taking snapshots of different configurations you can produce the numbers that apply to your composites.

The capture procedure supports multiple steps in a test case, but the views only support two snapshots per test case.

Code Download

The sample project I used is available here.

The scripts used to create the user (createUser.sql), create the schema (createSchema.sql) and sample queries (TableCardinality.sql) are available here.

The Web Service wrapper to the capture state stored procedure is available here.

The sample SoapUI project that I used to take a snapshot, perform the test and take a second snapshot is available here.

Clustering Events

Wed, 2014-02-26 13:25
Setting up an Oracle Event Processing Cluster

Recently I was working with Oracle Event Processing (OEP) and needed to set it up as part  of a high availability cluster.  OEP uses Coherence for quorum membership in an OEP cluster.  Because the solution used caching it was also necessary to include access to external Coherence nodes.  Input messages need to be duplicated across multiple OEP streams and so a JMS Topic adapter needed to be configured.  Finally only one copy of each output event was desired, requiring the use of an HA adapter.  In this blog post I will go through the steps required to implement a true HA OEP cluster.

OEP High Availability Review

The diagram below shows a very simple non-HA OEP configuration:

Events are received from a source (JMS in this blog).  The events are processed by an event processing network which makes use of a cache (Coherence in this blog).  Finally any output events are emitted.  The output events could go to any destination but in this blog we will emit them to a JMS queue.

OEP provides high availability by having multiple event processing instances processing the same event stream in an OEP cluster.  One instance acts as the primary and the other instances act as secondary processors.  Usually only the primary will output events as shown in the diagram below (top stream is the primary):

The actual event processing is the same as in the previous non-HA example.  What is different is how input and output events are handled.  Because we want to minimize or avoid duplicate events we have added an HA output adapter to the event processing network.  This adapter acts as a filter, so that only the primary stream will emit events to the output queue.  If the processing of events within the network depends on the time at which events are received, then it is necessary to synchronize the arrival timestamps of events across the cluster by using an HA input adapter.

OEP Cluster Creation

Let's begin by setting up the base OEP cluster.  To do this we create new OEP configurations on each machine in the cluster.  The steps are outlined below.  Note that the same steps are performed on each machine for each server which will run on that machine:

  • Run ${MW_HOME}/ocep_11.1/common/bin/config.sh.
    • MW_HOME is the installation directory, note that multiple Fusion Middleware products may be installed in this directory.
  • When prompted, choose “Create a new OEP domain”.
  • Provide administrator credentials.
    • Make sure you provide the same credentials on all machines in the cluster.
  • Specify a  “Server name” and “Server listen port”.
    • Each OEP server must have a unique name.
    • Different servers can share the same “Server listen port” unless they are running on the same host.
  • Provide keystore credentials.
    • Make sure you provide the same credentials on all machines in the cluster.
  • Configure any required JDBC data source.
  • Provide the “Domain Name” and “Domain location”.
    • All servers must have the same “Domain name”.
    • The “Domain location” may be different on each server, but I would keep it the same to simplify administration.
    • Multiple servers on the same machine can share the “Domain location” because their configuration will be placed in the directory corresponding to their server name.
  • Create domain!
Configuring an OEP Cluster

Now that we have created our servers we need to configure them so that they can find each other.  OEP uses Oracle Coherence to determine cluster membership.  Coherence clusters can use either multicast or unicast to discover already running members of a cluster.  Multicast has the advantage that it is easy to set up and scales better (see http://www.ateam-oracle.com/using-wka-in-large-coherence-clusters-disabling-multicast/) but has a number of challenges, including failure to propagate by default through routers and accidentally joining the wrong cluster because someone else chose the same multicast settings.  We will show how to use both unicast and multicast to discover the cluster.

Multicast Discovery

Coherence multicast uses a class D multicast address that is shared by all servers in the cluster.  On startup a Coherence node broadcasts a message to the multicast address looking for an existing cluster.  If no-one responds then the node will start the cluster.

Unicast Discovery

Coherence unicast uses Well Known Addresses (WKAs).  Each server in the cluster needs a dedicated listen address/port combination.  A subset of these addresses are configured as WKAs and shared between all members of the cluster.  As long as at least one of the WKAs is up and running then servers can join the cluster.  If a server does not find any cluster members then it checks to see if its listen address and port are in the WKA list.  If it is then that server will start the cluster, otherwise it will wait for a WKA server to become available.

To configure a cluster the same steps need to be followed for each server in the cluster:
  • Set an event server address in the config.xml file.
    • Add the following to the <cluster> element:
      <cluster>
          <server-name>server1</server-name>
          <server-host-name>oep1.oracle.com</server-host-name>
      </cluster>
    • The “server-name” is displayed in the visualizer and should be unique to the server.

    • The “server-host-name” is used by the visualizer to access remote servers.

    • The “server-host-name” must be an IP address or it must resolve to an IP address that is accessible from all other servers in the cluster.

    • The listening port is configured in the <netio> section of the config.xml.

    • The server-host-name/listening port combination should be unique to each server.

 
  • Set a common cluster multicast listen address shared by all servers in the config.xml file.
    • Add the following to the <cluster> element:
      <cluster>
          …
          <!-- For use in Coherence multicast only! -->
          <multicast-address>239.255.200.200</multicast-address>
          <multicast-port>9200</multicast-port>
      </cluster>
    • The “multicast-address” must be able to be routed through any routers between servers in the cluster.

  • Optionally you can specify the bind address of the server; this allows you to control port usage and determine which network is used by Coherence.

    • Create a “tangosol-coherence-override.xml” file in the ${DOMAIN}/{SERVERNAME}/config directory for each server in the cluster.
      <?xml version='1.0'?>
      <coherence>
          <cluster-config>
              <unicast-listener>
                  <!-- This server Coherence address and port number -->
                  <address>192.168.56.91</address>
                  <port>9200</port>
              </unicast-listener>
          </cluster-config>
      </coherence>
  • Configure the Coherence WKA cluster discovery.

    • Create a “tangosol-coherence-override.xml” file in the ${DOMAIN}/{SERVERNAME}/config directory for each server in the cluster.
      <?xml version='1.0'?>
      <coherence>
          <cluster-config>
              <unicast-listener>
                  <!-- WKA Configuration -->
                  <well-known-addresses>
                      <socket-address id="1">
                          <address>192.168.56.91</address>
                          <port>9200</port>
                      </socket-address>
                      <socket-address id="2">
                          <address>192.168.56.92</address>
                          <port>9200</port>
                      </socket-address>
                  </well-known-addresses>
                  <!-- This server Coherence address and port number -->
                  <address>192.168.56.91</address>
                  <port>9200</port>
              </unicast-listener>
          </cluster-config>
      </coherence>

    • List at least two servers in the <socket-address> elements.

    • For each <socket-address> element there should be a server that has corresponding <address> and <port> elements directly under <well-known-addresses>.

    • One of the servers listed in the <well-known-addresses> element must be the first server started.

    • Not all servers need to be listed in <well-known-addresses>, but see previous point.

 
  • Enable clustering using a Coherence cluster.
    • Add the following to the <cluster> element in config.xml.
      <cluster>
          …
          <enabled>true</enabled>
      </cluster>
    • The “enabled” element tells OEP that it will be using Coherence to establish cluster membership; this can also be achieved by setting the value to “coherence”.

 
  • The following shows the <cluster> config (using multicast discovery) for another server in the cluster with differences highlighted:
    <cluster>
        <server-name>server2</server-name>
        <server-host-name>oep2.oracle.com</server-host-name>
        <!-- For use in Coherence multicast only! -->
        <multicast-address>239.255.200.200</multicast-address>
        <multicast-port>9200</multicast-port>
        <enabled>true</enabled>
    </cluster>

  • The following shows the <cluster> config (using unicast discovery) for another server in the cluster with differences highlighted:
    <cluster>
        <server-name>server2</server-name>
        <server-host-name>oep2.oracle.com</server-host-name>
        <enabled>true</enabled>
    </cluster>

 
  • The following shows the “tangosol-coherence-override.xml” file for another server in the cluster with differences highlighted:
    <?xml version='1.0'?>
    <coherence>
        <cluster-config>
            <unicast-listener>
                <!-- WKA Configuration -->
                <well-known-addresses>
                    <socket-address id="1">
                        <address>192.168.56.91</address>
                        <port>9200</port>
                    </socket-address>
                    <socket-address id="2">
                        <address>192.168.56.92</address>
                        <port>9200</port>
                    </socket-address>
                </well-known-addresses>
                <!-- This server Coherence address and port number -->
                <address>192.168.56.92</address>
                <port>9200</port>
            </unicast-listener>
        </cluster-config>
    </coherence>

You should now have a working OEP cluster.  Check the cluster by starting all the servers.

Look for a message like the following on the first server to start to indicate that another server has joined the cluster:

<Coherence> <BEA-2049108> <The domain membership has changed to [server2, server1], the new domain primary is "server1">

Log on to the Event Processing Visualizer of one of the servers – http://<hostname>:<port>/wlevs.  Select the cluster name on the left and then select group “AllDomainMembers”.  You should see a list of all the running servers in the “Servers of Group – AllDomainMembers” section.

Sample Application

Now that we have a working OEP cluster let us look at a simple application that can be used as an example of how to cluster enable an application.  This application models service request tracking for hardware products.  The application we will use performs the following checks:

  1. If a new service request (identified by SRID) arrives (indicated by status=RAISE) then we expect some sort of follow up in the next 10 seconds (seconds because I want to test this quickly).  If no follow up is seen then an alert should be raised.
    • For example if I receive an event (SRID=1, status=RAISE) and after 10 seconds I have not received a follow up message (SRID=1, status<>RAISE) then I need to raise an alert.
  2. If a service request (identified by SRID) arrives and there has been another service request (identified by a different SRID) for the same physical hardware (identified by TAG) then an alert should be raised.
    • For example if I receive an event (SRID=2, TAG=M1) and later I receive another event for the same hardware (SRID=3, TAG=M1) then an alert should be raised.

Note use case 1 is nicely time bounded – in this case the time window is 10 seconds.  Hence this is an ideal candidate to be implemented entirely in CQL.

Use case 2 has no time constraints, hence over time there could be a very large number of CQL queries running looking for a matching TAG but a different SRID.  In this case it is better to put the TAGs into a cache and search the cache for duplicate tags.  This reduces the amount of state information held in the OEP engine.
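
The check itself amounts to a lookup and conditional insert against the tag cache.  The sketch below is purely illustrative (the cache name and the convention of mapping TAG to the SRID that first reported it are assumptions), since in the deployed application this logic lives inside the event processing network described below.

    import com.tangosol.net.CacheFactory;
    import com.tangosol.net.NamedCache;

    // Illustrative duplicate-tag check: returns true when another service request
    // has already been seen for the same hardware tag.
    public class DuplicateTagCheck {

        public static boolean isDuplicate(String tag, String srid) {
            NamedCache tags = CacheFactory.getCache("ServiceTagCache");  // hypothetical cache name
            String existingSrid = (String) tags.get(tag);
            if (existingSrid != null && !existingSrid.equals(srid)) {
                return true;              // a different SR already exists for this hardware: raise an alert
            }
            tags.put(tag, srid);          // remember this SR against the hardware tag
            return false;
        }
    }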

The sample application to implement this is shown below:

Messages are received from a JMS Topic (InboundTopicAdapter).  Test messages can be injected via a CSV adapter (RequestEventCSVAdapter).  Alerts are sent to a JMS Queue (OutboundQueueAdapter), and also printed to the server standard output (PrintBean).  Use case 1 is implemented by the MissingEventProcessor.  Use case 2 is implemented by inserting the TAG into a cache (InsertServiceTagCacheBean) using a Coherence event processor and then querying the cache for each new service request (DuplicateTagProcessor), if the same tag is already associated with an SR in the cache then an alert is raised.  The RaiseEventFilter is used to filter out existing service requests from the use case 2 stream.

The non-HA version of the application is available to download here.

We will use this application to demonstrate how to HA enable an application for deployment on our cluster.

A CSV file (TestData.csv) and a load generator properties file (HADemoTest.prop) are provided to test the application by injecting events using the CSV Adapter.

Note that the application reads a configuration file (System.properties) which should be placed in the domain directory of each event server.

Deploying an Application

Before deploying an application to a cluster it is a good idea to create a group in the cluster.  Multiple servers can be members of this group.  To add a group to an event server just add an entry to the <cluster> element in config.xml as shown below:

<cluster>
    …
    <groups>HAGroup</groups>
</cluster>

Multiple servers can be members of a group and a server can be a member of multiple groups.  This allows you to have different levels of high availability in the same event processing cluster.

Deploy the application using the Visualizer.  Target the application at the group you created, or the AllDomainMembers group.

Test the application, typically using a CSV Adapter.  Note that using a CSV adapter sends all the events to a single event server.  To fix this we need to add a JMS output adapter (OutboundTopicAdapter) to our application and then send events from the CSV adapter to the outbound JMS adapter as shown below:

So now we are able to send events via CSV to an event processor that in turn sends the events to a JMS topic.  But we still have a few challenges.

Managing Input

The first challenge is managing input.  Because OEP relies on the same event stream being processed by multiple servers we need to make sure that all our servers get the same message from the JMS Topic.  To do this we configure the JMS connection factory to have an Unrestricted Client ID.  This allows multiple clients (OEP servers in our case) to use the same connection factory.  Client IDs are mandatory when using durable topic subscriptions.  We also need each event server to have its own subscriber ID for the JMS Topic; this ensures that each server will get a copy of all the messages posted to the topic.  If we use the same subscriber ID for all the servers then the messages will be distributed across the servers, with each server seeing a completely disjoint set of messages from the other servers in the cluster.  This is not what we want because each server should see the same event stream.  We can use the server name as the subscriber ID as shown in the excerpt below from our application:

<wlevs:adapter id="InboundTopicAdapter" provider="jms-inbound">
    …
    <wlevs:instance-property name="durableSubscriptionName"
            value="${com_bea_wlevs_configuration_server_ClusterType.serverName}" />
</wlevs:adapter>

This works because I have placed a ConfigurationPropertyPlaceholderConfigurer bean in my application as shown below; this same bean is also used to access properties from a configuration file:

<bean id="ConfigBean"
      class="com.bea.wlevs.spring.support.ConfigurationPropertyPlaceholderConfigurer">
    <property name="location" value="file:../Server.properties"/>
</bean>

With this configuration each server will now get a copy of all the events.

As our application relies on elapsed time we should make sure that the timestamps of the received messages are the same on all servers.  We do this by adding an HA Input adapter to our application.

<wlevs:adapter id="HAInputAdapter" provider="ha-inbound">
    <wlevs:listener ref="RequestChannel" />
    <wlevs:instance-property name="keyProperties"
            value="EVID" />
    <wlevs:instance-property name="timeProperty" value="arrivalTime"/>
</wlevs:adapter>

The HA Adapter sets the given “timeProperty” in the input message to be the current system time.  This time is then communicated to other HAInputAdapters deployed to the same group.  This allows all servers in the group to have the same timestamp in their event.  The event is identified by the “keyProperties” key field.

To allow the downstream processing to treat the timestamp as an arrival time, the downstream channel is configured with an “application-timestamped” element to set the arrival time of the event.  This is shown below:

<wlevs:channel id="RequestChannel" event-type="ServiceRequestEvent">
    <wlevs:listener ref="MissingEventProcessor" />
    <wlevs:listener ref="RaiseEventFilterProcessor" />
    <wlevs:application-timestamped>
        <wlevs:expression>arrivalTime</wlevs:expression>
    </wlevs:application-timestamped>
</wlevs:channel>

Note the property set in the HAInputAdapter is used to set the arrival time of the event.

So now all servers in our cluster have the same events arriving from a topic, and each event arrival time is synchronized across the servers in the cluster.

Managing Output

Note that an OEP cluster has multiple servers processing the same input stream.  Obviously if we have the same inputs, synchronized to appear to arrive at the same time, then we will get the same outputs, which is central to OEP's promise of high availability.  So when an alert is raised by our application it will be raised by every server in the cluster.  If we have 3 servers in the cluster then we will get 3 copies of the same alert appearing on our alert queue.  This is probably not what we want.  To fix this we take advantage of an HA Output Adapter.  Unlike input, where there is a single HA Input Adapter, there are multiple HA Output Adapters, each with distinct performance and behavioral characteristics.  The table below is taken from the Oracle® Fusion Middleware Developer's Guide for Oracle Event Processing and shows the different levels of service and performance impact:

Table 24-1 Oracle Event Processing High Availability Quality of Service

High Availability Option                            Missed Events?  Duplicate Events?  Performance Overhead
Section 24.1.2.1, "Simple Failover"                 Yes (many)      Yes (few)          Negligible
Section 24.1.2.2, "Simple Failover with Buffering"  Yes (few)       Yes (many)         Low
Section 24.1.2.3, "Light-Weight Queue Trimming"     No              Yes (few)          Low-Medium
Section 24.1.2.4, "Precise Recovery with JMS"       No              No                 High

I decided to go for the lightweight queue trimming option.  This means I won’t lose any events, but I may emit a few duplicate events in the event of primary failure.  This setting causes all output events to be buffered by the secondaries until they are told by the primary that a particular event has been emitted.  To configure this option I add the following adapter to my EPN:

    <wlevs:adapter id="HAOutputAdapter" provider="ha-broadcast">
        <wlevs:listener ref="OutboundQueueAdapter" />
        <wlevs:listener ref="PrintBean" />
        <wlevs:instance-property name="keyProperties" value="timestamp"/>
        <wlevs:instance-property name="monotonic" value="true"/>
        <wlevs:instance-property name="totalOrder" value="false"/>
    </wlevs:adapter>

This uses the time of the alert (timestamp property) as the key used to identify events which have been trimmed.  This works in this application because the alert time is the time of the source event, and the times of the source events are synchronized using the HA Input Adapter.  Because this is a time value it will increase, and so I set monotonic=”true”.  However I may get two alerts raised at the same timestamp, and in that case I set totalOrder=”false”.

I also added the additional configuration to config.xml for the application:

<ha:ha-broadcast-adapter>
    <name>HAOutputAdapter</name>
    <warm-up-window-length units="seconds">15</warm-up-window-length>
    <trimming-interval units="millis">1000</trimming-interval>
</ha:ha-broadcast-adapter>

This causes the primary to tell the secondaries which is its latest emitted alert every 1 second.  This will cause the secondaries to trim from their buffers all alerts prior to and including the latest emitted alerts.  So in the worst case I will get one second of duplicated alerts.  It is also possible to set a number of events rather than a time period.  The trade-off here is that I can reduce synchronization overhead by having longer time intervals or more events, causing more memory to be used by the secondaries; or I can synchronize more frequently, using less memory in the secondaries and generating fewer duplicate alerts, but with more communication between the primary and the secondaries to trim the buffers.

The warm-up window is used to stop a secondary joining the cluster before it has been running for that time period.  The window is based on the time that the EPN needs to be running to have the same state as the other servers.  In our example application we have a CQL query that runs over a period of 10 seconds, so I set the warm-up window to be 15 seconds to ensure that a newly started server had the same state as all the other servers in the cluster.  The warm-up window should be greater than the longest query window.

Adding an External Coherence Cluster

When we run OEP as a cluster there is additional overhead in the servers.  The HA Input Adapter is synchronizing event time across the servers, and the HA Output Adapter is synchronizing output events across the servers as well as buffering output events in the secondaries.  We can’t do anything about this overhead, but we can move the Coherence cache we are using outside of the OEP servers, reducing the memory pressure on those servers and moving some of the processing outside of them.  Making our Coherence caches external to our OEP cluster is a good idea for the following reasons:

  • Allows moving storage of cache entries outside of the OEP server JVMs hence freeing more memory for storing CQL state.
  • Allows storage of more entries in the cache by scaling cache independently of the OEP cluster.
  • Moves cache processing outside OEP servers.

To create the external Coherence cache do the following:

  • Create a new directory for our standalone Coherence servers, perhaps at the same level as the OEP domain directory.
  • Copy the tangosol-coherence-override.xml file previously created for the OEP cluster into a config directory under the Coherence directory created in the previous step.
  • Copy the coherence-cache-config.xml file from the application into a config directory under the Coherence directory created in the previous step.
  • Add the following to the tangosol-coherence-override.xml file in the Coherence config directory:
    • <coherence>
          <cluster-config>
              <member-identity>
                  <cluster-name>oep_cluster</cluster-name>
                  <member-name>Grid1</member-name>
              </member-identity>
              …
          </cluster-config>
      </coherence>
    • Important Note: The <cluster-name> must match the name of the OEP cluster as defined in the <domain><name> element in the event server's config.xml.
    • The member name is used to help identify the server.
  • Disable storage for our caches in the event servers by editing the coherence-cache-config.xml file in the application and adding the following element to the caches:
    • <distributed-scheme>
          <scheme-name>DistributedCacheType</scheme-name>
          <service-name>DistributedCache</service-name>
          <local-storage>false</local-storage>
          <backing-map-scheme>
              <local-scheme/>
          </backing-map-scheme>
      </distributed-scheme>
    • The local-storage flag stops the OEP server from storing entries for caches using this cache schema.
    • Do not disable storage at the global level (-Dtangosol.coherence.distributed.localstorage=false) because this would also disable storage on some OEP-specific cache schemes as well as our application cache.  We don’t want to put those schemes into our cache servers: they are used by OEP to maintain cluster integrity, and they hold only one entry per application per server, so they are very small.  Putting them into our Coherence cache servers would mean adding OEP-specific libraries to the cache servers and enabling the schemes in our coherence-cache-config.xml, all of which is too much trouble for little or no benefit.
  • If using Unicast Discovery (this section is not required if using Multicast) then we want the Coherence grid servers to be the Well Known Address (WKA) servers, because we are disabling storage of entries on our OEP servers and Coherence nodes with storage disabled cannot initialize a cluster.  To enable the Coherence servers to act as the WKA members of the Coherence grid do the following:
    • Change the unicast-listener addresses in the Coherence servers tangosol-coherence-override.xml file to be suitable values for the machine they are running on – typically change the listen address.
    • Modify the WKA addresses in the OEP servers' and the Coherence servers' tangosol-coherence-override.xml files to match at least two of the Coherence servers' listen addresses.
    • The following shows how this might be configured for 2 OEP servers and 2 cache servers.

      OEP Server 1:

      <?xml version='1.0'?>
      <coherence>
        <cluster-config>
          <unicast-listener>
            <well-known-addresses>
              <socket-address id="1">
                <address>192.168.56.91</address>
                <port>9300</port>
              </socket-address>
              <socket-address id="2">
                <address>192.168.56.92</address>
                <port>9300</port>
              </socket-address>
            </well-known-addresses>
            <address>192.168.56.91</address>
            <port>9200</port>
          </unicast-listener>
        </cluster-config>
      </coherence>

      OEP Server 2:

      <?xml version='1.0'?>
      <coherence>
        <cluster-config>
          <unicast-listener>
            <well-known-addresses>
              <socket-address id="1">
                <address>192.168.56.91</address>
                <port>9300</port>
              </socket-address>
              <socket-address id="2">
                <address>192.168.56.92</address>
                <port>9300</port>
              </socket-address>
            </well-known-addresses>
            <address>192.168.56.92</address>
            <port>9200</port>
          </unicast-listener>
        </cluster-config>
      </coherence>

      Cache Server 1:

      <?xml version='1.0'?>
      <coherence>
        <cluster-config>
          <member-identity>
            <cluster-name>oep_cluster</cluster-name>
            <member-name>Grid1</member-name>
          </member-identity>
          <unicast-listener>
            <well-known-addresses>
              <socket-address id="1">
                <address>192.168.56.91</address>
                <port>9300</port>
              </socket-address>
              <socket-address id="2">
                <address>192.168.56.92</address>
                <port>9300</port>
              </socket-address>
            </well-known-addresses>
            <address>192.168.56.91</address>
            <port>9300</port>
          </unicast-listener>
        </cluster-config>
      </coherence>

      Cache Server 2:

      <?xml version='1.0'?>
      <coherence>
        <cluster-config>
          <member-identity>
            <cluster-name>oep_cluster</cluster-name>
            <member-name>Grid2</member-name>
          </member-identity>
          <unicast-listener>
            <well-known-addresses>
              <socket-address id="1">
                <address>192.168.56.91</address>
                <port>9300</port>
              </socket-address>
              <socket-address id="2">
                <address>192.168.56.92</address>
                <port>9300</port>
              </socket-address>
            </well-known-addresses>
            <address>192.168.56.92</address>
            <port>9300</port>
          </unicast-listener>
        </cluster-config>
      </coherence>

    • Note that the OEP servers do not listen on the WKA addresses; they use a different port number (9200) even though they run on the same machines as the cache servers.
    • Also note that it is the Coherence servers that listen on the WKA addresses.
  • Now that the configuration is complete we can create a start script for the Coherence grid servers as follows:
    • #!/bin/sh
      MW_HOME=/home/oracle/fmw
      OEP_HOME=${MW_HOME}/ocep_11.1
      JAVA_HOME=${MW_HOME}/jrockit_160_33
      CACHE_SERVER_HOME=${MW_HOME}/user_projects/domains/oep_coherence
      CACHE_SERVER_CLASSPATH=${CACHE_SERVER_HOME}/HADemoCoherence.jar:${CACHE_SERVER_HOME}/config
      COHERENCE_JAR=${OEP_HOME}/modules/com.tangosol.coherence_3.7.1.6.jar
      JAVAEXEC=$JAVA_HOME/bin/java
      # specify the JVM heap size
      MEMORY=512m
      if [ "$1" = '-jmx' ]; then
          JMXPROPERTIES="-Dcom.sun.management.jmxremote -Dtangosol.coherence.management=all -Dtangosol.coherence.management.remote=true"
          shift
      fi
      JAVA_OPTS="-Xms$MEMORY -Xmx$MEMORY $JMXPROPERTIES"
      $JAVAEXEC -server -showversion $JAVA_OPTS -cp "${CACHE_SERVER_CLASSPATH}:${COHERENCE_JAR}" com.tangosol.net.DefaultCacheServer $1
    • Note that I put the tangosol-coherence-override.xml and the coherence-cache-config.xml files in a config directory and added that directory to the classpath (CACHE_SERVER_CLASSPATH=${CACHE_SERVER_HOME}/HADemoCoherence.jar:${CACHE_SERVER_HOME}/config) so that Coherence would find the override file.
    • Because my application uses in-cache processing (entry processors) I had to add a jar file containing the required classes for the entry processor to the classpath (CACHE_SERVER_CLASSPATH=${CACHE_SERVER_HOME}/HADemoCoherence.jar:${CACHE_SERVER_HOME}/config).
    • The classpath references the Coherence jar shipped with OEP to avoid version mismatches (COHERENCE_JAR=${OEP_HOME}/modules/com.tangosol.coherence_3.7.1.6.jar).
    • This script is based on the standard cache-server.sh script that ships with standalone Coherence.
    • The -jmx flag can be passed to the script to enable Coherence JMX management beans.

We have now configured OEP to use an external Coherence data grid for its application caches.  When starting up we should always start at least one of the grid servers before starting the OEP servers; this allows the OEP servers to find the grid.  If we do start things in the wrong order then the OEP servers will block waiting for a storage enabled node to start (one of the WKA servers if using Unicast).
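
For reference, a minimal start sequence might look like the sketch below.  The domain and server directory names are assumptions for this demo environment and will differ in your installation:

#!/bin/sh
# Illustrative start order only - the directory names below are assumptions.
MW_HOME=/home/oracle/fmw

# 1. Start at least one storage-enabled Coherence grid server (a WKA member).
cd ${MW_HOME}/user_projects/domains/oep_coherence
./cache-server.sh &

# 2. Once the grid is up, start the OEP servers; they join as storage-disabled members.
cd ${MW_HOME}/user_projects/domains/oep_domain/oep_server1
./startwlevs.sh &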

Summary

We have now created an OEP cluster that makes use of an external Coherence grid for its application caches.  The application has been modified to ensure that the timestamps of arriving events are synchronized and that output events are only emitted by one of the servers in the cluster.  In the event of failure we may get some duplicate events with our configuration (there are configurations that avoid duplicate events entirely) but we will not lose any events.  The final version of the application with full HA capability is shown below:

Files

The following files are available for download:

  • Oracle Event Processing
    • Includes Coherence
  • Non-HA version of the application
    • Includes test file TestData.csv and Load Test property file HADemoTest.prop
    • Includes Server.properties.Antony file to customize to point to your WLS installation
  • HA version of application
    • Includes test file TestData.csv and Load Test property file HADemoTest.prop
    • Includes Server.properties.Antony file to customize to point to your WLS installation
  • OEP Cluster Files
    • Includes config.xml
    • Includes tangosol-coherence-override.xml
    • Includes Server.properties that will need customizing for your WLS environment
  • Coherence Cluster Files
    • Includes tangosol-coherence-override.xml and coherence-cache-config.xml
    • Includes the cache-server.sh start script
    • Includes HADemoCoherence.jar with required classes for entry processor

Clear Day for Cloud Adapters

Fri, 2014-01-17 16:07
salesforce.com Adapter Released

Yesterday Oracle released their cloud adapter for salesforce.com (SFDC) so I thought I would talk a little about why you might want it.  I had previously integrated with SFDC using BPEL and the SFDC web interface, so in this post I will explore why the adapter might be a better approach.

Why?

So if I can interface to SFDC without the adapter, why would I spend money on the adapter?  There are a number of reasons, but in this post I will just explain the following 3 benefits:

  • Auto-Login
  • Non-Polymorphic Operations
  • Operation Wizards

Let's take each one in turn.

Auto-Login

The first obvious benefit is how you connect and make calls to SFDC.  To perform an operation such as query an account or update an address the SFDC interface requires you to do the following:

  • Invoke a login method, which returns a session ID to be placed in the header on all future calls, and the actual endpoint to call.
  • Invoke the actual operation using the provided endpoint and passing the session ID provided.
  • When finished with calls invoke the logout operation.

Now these are not unreasonable demands.  The problem comes when you try to implement this interface.

Before calling the login method you need the credentials.  These need to be obtained from somewhere; I set them as BPEL preferences, but there are other ways to store them.  Calling the login method is not a problem, but you need to be careful in how you make subsequent calls.

First, all subsequent calls must override the endpoint address with the one returned from the login operation.  Secondly, the provided session ID must be placed into a custom SOAP header.  So you have to copy the session ID into a custom SOAP header and provide that header to the invoke operation, and you also have to override the endpointURI property on the invoke with the provided endpoint.

Finally when you have finished performing operations you have to logout.

In addition to the number of steps you have to code, there is the problem of knowing when to logout.  The simplest thing to do is, for each operation you wish to perform, execute the login operation followed by the actual working operation and then a logout operation.  The trouble with this is that you are now making 3 calls every time you want to perform an operation against SFDC, which adds latency to every request.

The adapter hides all of this from you: it takes care of the login/logout operations and allows connections to be re-used, reducing the number of logins required.  The adapter makes the SFDC call look like a call to any other web service, while using a session cache to avoid repeated logins.
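
To make the benefit concrete, the sketch below shows the kind of session re-use the adapter performs on your behalf.  This is purely illustrative and is not the adapter's implementation or the real SFDC client API; the Session holder and the LoginOperation interface are hypothetical stand-ins for the SFDC login operation:

public class SessionCache {

    // Hypothetical stand-in for the real SFDC login call.
    public interface LoginOperation {
        Session login();
    }

    // Hypothetical holder for what the login operation returns.
    public static final class Session {
        public final String sessionId;  // placed in the custom SOAP header on each call
        public final String serverUrl;  // overrides the endpoint of subsequent calls
        public Session(String sessionId, String serverUrl) {
            this.sessionId = sessionId;
            this.serverUrl = serverUrl;
        }
    }

    private final LoginOperation loginOperation;
    private Session cached;

    public SessionCache(LoginOperation loginOperation) {
        this.loginOperation = loginOperation;
    }

    // Re-use the cached session; only log in when we do not already have one.
    public synchronized Session get() {
        if (cached == null) {
            cached = loginOperation.login();
        }
        return cached;
    }

    // Call this when SFDC reports an invalid/expired session to force a re-login.
    public synchronized void invalidate() {
        cached = null;
    }
}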

Non-Polymorphic Operations

The standard operations in the SFDC interface provide a base object return type, the sObject.  This could be an Account or a Campaign for example, but the operations always return the base sObject type, leaving it to the client to make sure it processes the correct return type.  Similarly, requests also use polymorphic data types.  In BPEL this often means that the returned sObject must be copied to a variable of a more specific type to simplify processing of the data.  If you don’t do this then you can still query fields within the specific object, but the SOA tooling cannot check it for you.

The adapter identifies the type of request and response and also helps build the request for you with bind parameters.  This means that you are able to build your process to actually work with the real data structures, not the abstract ones.  This is another big benefit in my view!

Operation Wizards

The SFDC API is very powerful.  Translation: the SFDC API is very complex.  With great power comes complexity, to paraphrase Uncle Ben (amongst others).  The adapter groups the operations into logical collections and then provides additional help in selecting from within those collections of operations and providing the correct parameters for them.

Installing

Installation takes place in two parts.  The Design time is deployed into JDeveloper and the run time is deployed into the SOA Suite Oracle Home.  The adapter is available for download here and the installation instructions and documentation are here.  Note that you will need OPatch to install the adapter.  OPatch can be downloaded from Oracle Support Patch 6880880.  Don’t use the OPatch that ships with JDeveloper and SOA Suite.  If you do you may see an error like:

Uncaught exception
oracle.classloader.util.AnnotatedNoClassDefFoundError:
      Missing class: oracle.tip.tools.ide.adapters.cloud.wizard.CloudAdapterWizard

You will want OPatch 11.1.0.x.x.  Make sure you download the correct 6880880: it must be for 11.1.x, as that is the version of JDeveloper and SOA Suite, and it must be for the platform you are running on.

Apart from getting the right OPatch the installation is very straight forward.
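
For reference, applying the patch looks something like the sketch below.  The directory names are illustrative only; substitute your own Oracle Home, the location you unzipped the 11.1 OPatch into, and the directory the adapter patch was unzipped to:

#!/bin/sh
# Illustrative only - paths and the patch directory name are examples.
export ORACLE_HOME=/u01/app/fmw/Oracle_SOA1     # SOA Suite Oracle Home
export PATH=/u01/app/opatch111/OPatch:$PATH     # OPatch downloaded from Patch 6880880
cd /tmp/sfdc_adapter_patch                      # unzipped adapter patch directory
opatch apply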

So don’t be afraid of SFDC integration any more, cloud integration is clear with the SFDC adapter.

Going Native with JCA Adapters

Mon, 2013-12-30 15:55
Formatting JCA Adapter Binary Contents

Sometimes you just need to go native and play with binary data rather than XML.  This occurs commonly when using JCA adapters: the file to be written is in binary format, or the TCP messages written by the Socket Adapter are in binary format.  Although the adapter has no problem converting Base64 data into raw binary, it is a little tricky to get that data into Base64 format in the first place, so this blog entry will explain how.

Adapter Creation

When creating most adapters (application & DB being the exceptions) you have the option of choosing the message format.  By making the message format “opaque” you are telling the adapter wizard that the message data will be provided as a base-64 encoded string and the adapter will convert this to binary and deliver it.

This results in a WSDL message defined as shown below:

<wsdl:types>
<schema targetNamespace="http://xmlns.oracle.com/pcbpel/adapter/opaque/"
        xmlns="http://www.w3.org/2001/XMLSchema" >
  <element name="opaqueElement" type="base64Binary" />
</schema>
</wsdl:types>
<wsdl:message name="Write_msg">
    <wsdl:part name="opaque" element="opaque:opaqueElement"/>
</wsdl:message>

The Challenge

The challenge now is to convert our data into a base-64 encoded string.  For this we have to turn to the service bus and MFL.

Within the service bus we use the MFL editor to define the format of the binary data.  In our example we will have variable length strings that start with a 1 byte length field as well as 32-bit integers and 64-bit floating point numbers.

The example below shows a sample MFL file to describe the above data structure:

<?xml version='1.0' encoding='windows-1252'?>
<!DOCTYPE MessageFormat SYSTEM 'mfl.dtd'>
<!--   Enter description of the message format here.   -->
<MessageFormat name='BinaryMessageFormat' version='2.02'>
    <FieldFormat name='stringField1' type='String' delimOptional='y' codepage='UTF-8'>
        <LenField type='UTinyInt'/>
    </FieldFormat>
    <FieldFormat name='intField' type='LittleEndian4' delimOptional='y'/>
    <FieldFormat name='doubleField' type='LittleEndianDouble' delimOptional='y'/>
    <FieldFormat name='stringField2' type='String' delimOptional='y' codepage='UTF-8'>
        <LenField type='UTinyInt'/>
    </FieldFormat>
</MessageFormat>

Note that we can define the endianness of the multi-byte numbers; in this case they are specified as little endian (Intel format).
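
If little endian byte ordering is unfamiliar, the small Java sketch below shows what it means for the intField value used later in this post (16909060, which is 0x01020304); the class name is mine and is not part of the sample code:

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class EndianDemo {
    public static void main(String[] args) {
        // 16909060 == 0x01020304, the intField value used in the sample input below.
        ByteBuffer buffer = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN);
        buffer.putInt(16909060);
        for (byte b : buffer.array()) {
            System.out.printf("%02x ", b);  // prints: 04 03 02 01
        }
        System.out.println();
    }
}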

I also created an XML version of the MFL that can be used in interfaces.

The XML version can then be imported into a WSDL document to create a web service.

Full Steam Ahead

We now have all the pieces we need to convert XML to binary and deliver it via an adapter using the process shown below:

  • We receive the XML request, in the sample code, the sample delivers it as a web service.
  • We then convert the request data into MFL format XML using an XQuery and store the result in a variable (mflVar).
  • We then convert the MFL formatted XML into binary data (internally this is held as a java byte array) and store the result in a variable (binVar).
  • We then convert the byte array to a base-64 string using javax.xml.bind.DatatypeConverter.printBase64Binary and store the result in a variable (base64Var).
  • Finally we replace the original $body contents with the output of an XQuery that matches the adapter expected XML format.

The diagram below shows the OSB pipeline that implements the above.

A Wrinkle

Unfortunately we can only call static Java methods that reside in a jar file imported into the service bus, so we have to provide a wrapper for the printBase64Binary call.  The Java code below was used to provide this wrapper:

package antony.blog;

import javax.xml.bind.DatatypeConverter;

public class Base64Encoder {
    public static String base64encode(byte[] content) {
        return DatatypeConverter.printBase64Binary(content);
    }
    public static byte[] base64decode(String content) {
        return DatatypeConverter.parseBase64Binary(content);
    }
}
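
In the service bus the wrapper is invoked from a Java Callout action, but as a quick local sanity check it can be exercised like this; the byte values are arbitrary and only stand in for the MFL output:

import java.util.Arrays;
import antony.blog.Base64Encoder;

public class Base64EncoderCheck {
    public static void main(String[] args) {
        // Arbitrary bytes standing in for the binary MFL output.
        byte[] data = { 0x0c, 0x46, 0x69, 0x72, 0x73, 0x74 };
        String base64 = Base64Encoder.base64encode(data);
        byte[] roundTrip = Base64Encoder.base64decode(base64);
        System.out.println(base64);                          // DEZpcnN0
        System.out.println(Arrays.equals(data, roundTrip));  // true
    }
}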

Wrapping Up

Sample code is available here and consists of the following projects:

  • BinaryAdapter – JDeveloper SOA Project that defines the JCA File Adapter
  • OSBUtils – JDeveloper Java Project that defines the Java wrapper for DataTypeConverter
  • BinaryFileWriter – Eclipse OSB Project that includes everything needed to try out the steps in this blog.

The OSB project needs to be customized to have the logical directory name point to something sensible.  The project can be tested using the normal OSB console test screen.

The following sample input (note 16909060 is 0x01020304)

<bin:OutputMessage xmlns:bin="http://www.example.org/BinarySchema">
    <bin:stringField1>First String</bin:stringField1>
    <bin:intField>16909060</bin:intField>
    <bin:doubleField>1.5</bin:doubleField>
    <bin:stringField2>Second String</bin:stringField2>
</bin:OutputMessage>

Generates the following binary data file, displayed using “hexdump -C”.  You can pick out the little-endian int (04 03 02 01), the little-endian double (1.5), and the two strings, each preceded by its 1-byte length (0c and 0d).

$ hexdump -C 2.bin
00000000  0c 46 69 72 73 74 20 53  74 72 69 6e 67 04 03 02  |.First String...|
00000010  01 00 00 00 00 00 00 f8  3f 0d 53 65 63 6f 6e 64  |........?.Second|
00000020  20 53 74 72 69 6e 67                              | String|

Although we used a web service writing through to a file adapter we could have equally well used the socket adapter to send the data to a TCP endpoint.  Similarly the source of the data could be anything.  The same principle can be applied to decode binary data, just reverse the steps and use Java method parseBase64Binary instead of printBase64Binary.

List Manipulation in Rules

Thu, 2013-12-26 19:10
Generating Lists from Rules

Recently I was working with a customer that wanted to use rules to do validation.  The idea was to pass in a document to the rules engine and get back a list of violations, or an empty list if there were no violations.  It turns out that there were a couple more steps required than I expected, so I thought I would share my solution in case anyone else is wondering how to return lists from the rules engine.

The Scenario

For the purposes of this blog I modeled a very simple shipping company document that has two main parts.   The Package element contains information about the actual item to be shipped, its weight, type of package and destination details.  The Billing element details the charges applied.

For the purpose of this blog I want to validate the following:

  • A residential surcharge is applied to residential addresses.
  • A residential surcharge is not applied to non-residential addresses.
  • The package is of the correct weight for the type of package.

The Shipment element is sent to the rules engine and the rules engine replies with a ViolationList element that has all the rule violations that were found.

Creating the Return List

We need to create a new ViolationList within rules so that we can return it from within the decision function.  To do this we create a new global variable – I called it ViolationList – and initialize it.  Note that I also had some globals that I used to allow changing the weight limits for different package types.

When the rules session is created it will initialize the global variables and assert the input document – the Shipment element.  However within rules our ViolationList variable has an uninitialized internal List that is used to hold the actual List of Violation elements.  We need to initialize this to an empty RL.list in the Decision Function’s “Initial Actions” section.

We can then assert the global variable as a fact to make it available to be the return value of the decision function.  After this we can now create the rules.

Adding a Violation to the List

If a rule fires because of a violation then we need to add a Violation element to the list.  The easiest way to do this without having the rule check the ViolationList directly is to create a function to add the Violation to the global variable ViolationList.

The function creates a new Violation and initializes it with the appropriate values before appending it to the list within the ViolationList.

When a rule fires it is just necessary to call the function to add the violation to the list.

In the example above if the address is a residential address and the surcharge has not been applied then the function is called with an appropriate error code and message.

How it Works

Each time a rule fires we can add the violation to the list by calling the function.  If multiple rules fire then we will get multiple violations in the list.  We can access the list from a function because it is a global variable.  Because we asserted the global variable as a fact in the decision function's initial actions it is picked up by the decision function as a return value.  When all possible rules have fired the decision function will return all asserted ViolationList elements, which in this case will always be exactly one because we only assert it in the initial actions.

What Doesn’t Work

A return from a decision function is always a list of the element you specify, so you may be tempted to just assert individual Violation elements and get those back as a list.  That will work if there is at least one element in the list, but the decision function must always return at least one element.  So if there are no violations then you will get an error thrown.

Alternative

Instead of having a completely separate return element you could have the ViolationList as part of the input element and then return the input element from the decision function.  This would work but now you would be copying most of the input variables back into the output variable.  I prefer to have a cleaner more function like interface that makes it easier to handle the response.

Download

Hope this helps someone.  A sample composite project is available for download here.  The composite includes some unit tests.  You can run these from the EM console and then look at the inputs and outputs to see how things work.

Cleaning Up After Yourself

Tue, 2013-12-24 11:50
Maintaining a Clean SOA Suite Test Environment

A fun blog entry with Fantasia animated GIFs got me thinking, like Mickey, about how nice it would be to automate clean up tasks.

I don’t have a sorcerer’s castle to clean up, but I often have a test environment which I use to run tests; after fixing the problems that the tests uncovered I want to run them again.  The problem is that all the data from my previous test runs is still there.

Now in the past I used VirtualBox snapshots to roll back to a clean state, but this has the problem that it not only loses the changes I want to get rid of, such as data inserted into tables, it also gets rid of changes I want to keep, such as WebLogic configuration changes and new shell scripts.  So like Mickey I went in search of some magic to help me.

Cleaning Up SOA Environment

My first task was to clean up the SOA environment by deleting all instance data from the tables.  Now I could use the purge scripts to do this, but that would still leave me with running instances, for example 800 Human Workflow tasks that I don’t want to deal with.  So I used the new truncate script to take care of this.  Basically this removes all instance data from your SOA Infrastructure, whether or not the data is live.  It can be run without taking down the SOA Infrastructure (although if you see strange behavior you may want to restart SOA).  Some statistics, such as service and reference statistics, are kept since server startup, so you may want to restart your server to clear that data.  A sample script to run the truncate SQL is shown below.

#!/bin/sh
# Truncate the SOA schemas, does not truncate BAM.
# Use only in development and test, not production.

# Properties to be set before running script
# SOAInfra Database SID
DB_SID=orcl
# SOA DB Prefix
SOA_PREFIX=DEV
# SOAInfra DB password
SOAINFRA_PASSWORD=welcome1
# SOA Home Directory
SOA_HOME=/u01/app/fmw/Oracle_SOA1

# Set DB Environment
. oraenv << EOF
${DB_SID}
EOF

# Run Truncate script from directory it lives in
cd ${SOA_HOME}/rcu/integration/soainfra/sql/truncate

# Run the truncate script
sqlplus ${SOA_PREFIX}_soainfra/${SOAINFRA_PASSWORD} @truncate_soa_oracle.sql << EOF
exit
EOF

After running this script all your SOA composite instances and associated workflow instances will be gone.
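
If you want to confirm that the truncation worked, a quick query against one of the instance tables is enough.  This sketch assumes the same DB_SID, SOA_PREFIX and SOAINFRA_PASSWORD settings as the script above, and uses composite_instance purely as a representative table (it is only one of the tables that gets truncated):

#!/bin/sh
# Illustrative sanity check - composite_instance should be empty after the truncate.
DB_SID=orcl
SOA_PREFIX=DEV
SOAINFRA_PASSWORD=welcome1

# Set DB Environment
. oraenv << EOF
${DB_SID}
EOF

# Count remaining composite instances (expect 0)
sqlplus -s ${SOA_PREFIX}_soainfra/${SOAINFRA_PASSWORD} << EOF
select count(*) from composite_instance;
exit
EOF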

Cleaning Up BAM

The above example shows how easy it is to get rid of all the runtime data in your SOA repository, however if you are using BAM you still have all the contents of your BAM objects from previous runs.  To get rid of that data we need to use BAM ICommand’s clear command as shown in the sample script below:

#!/bin/sh
# Set software locations
FMW_HOME=/home/oracle/fmw
export JAVA_HOME=${FMW_HOME}/jdk1.7.0_17
BAM_CMD=${FMW_HOME}/Oracle_SOA1/bam/bin/icommand
# Set objects to purge
BAM_OBJECTS="/path/RevenueEvent /path/RevenueViolation"

# Clean up BAM
for name in ${BAM_OBJECTS}
do
  ${BAM_CMD} -cmd clear -name ${name} -type dataobject
done

After running this script all the rows of the listed objects will be gone.

Ready for Inspection

Unlike the hapless Mickey, our clean up scripts work reliably and do what we want without unexpected consequences, like flooding the castle.

Supporting the Team

Fri, 2013-12-20 15:44
SOA Support Team Blog

Some of my former colleagues in support have created a blog to help answer common problems for customers.  One way they are doing this is by creating better landing zones within My Oracle Support (MOS).  I just used the blog to locate the landing zone for database related issues in SOA Suite.  I needed to get the purge scripts working on 11.1.1.7 and I couldn’t find the patches needed to do that.  A quick look on the blog and I found a suitable entry that directed me to the Oracle Fusion Middleware (FMW) SOA 11g Infrastructure Database: Installation, Maintenance and Administration Guide (Doc ID 1384379.1) in MOS.  Lots of other useful stuff on the blog so stop by and check it out, great job Shawn, Antonella, Maria & JB.

$5 eBook Bonanza

Thu, 2013-12-19 10:20
Packt eBooks $5 Offer

Packt Publishing just told me about their Christmas offer, get eBooks for $5.

From December 19th, customers will be able to get any eBook or Video from Packt for just $5. This offer covers a myriad of titles in the 1700+ range where customers will be able to grab as many as they like until January 3rd 2014 – more information is available at http://bit.ly/1jdCr2W

If you haven’t bought the SOA Developers Cookbook then now is a great time to do so!