The release of SOA Suite 12c sees the addition of a Coherence Adapter to the list of Technology Adapters that are licensed with the SOA Suite. In this entry I provide an introduction to configuring the adapter and using the different operations it supports.
The Coherence Adapter provides access to Oracle's Coherence Data Grid. The adapter exposes the cache capabilities of the grid; it does not currently support the grid's many other features, such as entry processors – more on this at the end of the blog.
Previously, if you wanted to use Coherence from within SOA Suite you either used the built-in caching capability of OSB or resorted to writing Java code wrapped as a Spring component. The new adapter significantly simplifies simple cache access operations.

Configuration
When creating a SOA domain the Coherence adapter is shipped with a very basic configuration that you will probably want to enhance to support real requirements. In this section I look at the configuration required to use the Coherence adapter in the real world.

Activate Adapter
The Coherence Adapter is not targeted at the SOA server by default, so this targeting needs to be performed from within the WebLogic console before the adapter can be used.
Create a cache configuration file
The Coherence Adapter provides a default connection factory to connect to an out-of-box Coherence cache, and also a cache called adapter-local. This is helpful as an example, but it is good practice to keep only a single type of object within a Coherence cache, so we will need more than one. With only one cache it is hard to clean out all the objects of a particular type. Having multiple caches also allows us to specify different properties for each cache. The following is a sample cache configuration file used in the example.
<!DOCTYPE cache-config SYSTEM "cache-config.dtd">
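Only the DOCTYPE line of the original listing survives above; the body of a configuration matching the description would look something like the following sketch (the scheme and service names, TestService in particular, are assumptions – use values that match the ServiceName connection factory property discussed below):

```xml
<cache-config>
  <caching-scheme-mapping>
    <!-- Map the TestCache cache name onto the distributed scheme below -->
    <cache-mapping>
      <cache-name>TestCache</cache-name>
      <scheme-name>test-distributed</scheme-name>
    </cache-mapping>
  </caching-scheme-mapping>
  <caching-schemes>
    <distributed-scheme>
      <scheme-name>test-distributed</scheme-name>
      <service-name>TestService</service-name>
      <backing-map-scheme>
        <local-scheme/>
      </backing-map-scheme>
      <autostart>true</autostart>
    </distributed-scheme>
  </caching-schemes>
</cache-config>
```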
This defines a single cache called TestCache. This is a distributed cache, meaning that the entries in the cache will be distributed across the grid. This enables you to scale the storage capacity of the grid by adding more servers. Additional caches can be added to this configuration file by adding additional <cache-mapping> elements.
The cache configuration file is referenced by the adapter connection factory and so needs to be on a file system accessible to all servers running the Coherence Adapter. It is not referenced from the composite.

Create a Coherence Adapter Connection Factory
We find the correct cache configuration by using a Coherence Adapter connection factory. The adapter ships with a few sample connection factories, but we will create a new one. To create a new connection factory we do the following:
- On the Outbound Connection Pools tab of the Coherence Adapter deployment, select New to create a new connection factory.
- Choose the javax.resource.cci.ConnectionFactory group.
- Provide a JNDI name. Any name will work, but something along the lines of eis/Coherence/Test is good practice (eis tells us this is an adapter JNDI name, Coherence tells us it is the Coherence Adapter, and the final part identifies which adapter configuration we are using).
- If requested to create a Plan.xml then make sure that you save it in a location available to all servers.
- From the Outbound Connection Pools tab select your new connection factory so that you can configure it from the properties tab.
- Set the CacheConfigLocation to point to the cache configuration file created in the previous section.
- Set the ClassLoaderMode to CUSTOM.
- Set the ServiceName to the name of the service used by your cache in the cache configuration file created in the previous section.
- Set the WLSExtendProxy to false unless your cache configuration file is using an extend proxy.
- If you plan on using POJOs (Plain Old Java Objects) with the adapter rather than XML then you need to point the PojoJarFile at the location of a jar file containing your POJOs.
- Make sure to press enter in each field after entering your data. Remember to save your changes when done.
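Putting the steps together, with the sample names used in this post the finished connection factory properties would look something like this (the file path and the TestService name are assumptions; they must match your environment and your cache configuration file):

```
CacheConfigLocation = /u01/oracle/config/test-cache-config.xml
ClassLoaderMode     = CUSTOM
ServiceName         = TestService
WLSExtendProxy      = false
```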
You may need to stop and restart the adapter to get it to recognize the new connection factory.

Operations
To demonstrate the different operations I created a WSDL with the following operations:
- put – put an object into the cache with a given key value.
- get – retrieve an object from the cache by key value.
- remove – delete an object from the cache by key value.
- list – retrieve all the objects in the cache.
- listKeys – retrieve all the keys of the objects in the cache.
- removeAll – remove all the objects from the cache.
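As a sketch, the portType of such a WSDL might look like the following (the namespace prefixes and message names are assumptions, not taken from the original project):

```xml
<wsdl:portType name="CacheTestPortType" xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/">
  <wsdl:operation name="put">
    <wsdl:input message="tns:XMLCacheEntryMessage"/>
    <wsdl:output message="tns:XMLCacheKeyMessage"/>
  </wsdl:operation>
  <wsdl:operation name="get">
    <wsdl:input message="tns:XMLCacheKeyMessage"/>
    <wsdl:output message="tns:XMLCacheEntryMessage"/>
  </wsdl:operation>
  <wsdl:operation name="remove">
    <wsdl:input message="tns:XMLCacheKeyMessage"/>
    <wsdl:output message="tns:RemoveResponseMessage"/>
  </wsdl:operation>
  <wsdl:operation name="list">
    <wsdl:input message="tns:XMLEmptyMessage"/>
    <wsdl:output message="tns:XMLCacheEntryListMessage"/>
  </wsdl:operation>
  <wsdl:operation name="listKeys">
    <wsdl:input message="tns:XMLEmptyMessage"/>
    <wsdl:output message="tns:XMLCacheEntryKeyListMessage"/>
  </wsdl:operation>
  <wsdl:operation name="removeAll">
    <wsdl:input message="tns:XMLEmptyMessage"/>
    <wsdl:output message="tns:RemoveResponseMessage"/>
  </wsdl:operation>
</wsdl:portType>
```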
I created a composite based on this WSDL that calls a different adapter reference for each operation. Details on configuring the adapter within a composite are provided in the Configuring the Coherence Adapter section of the documentation.
I used a Mediator to map the input WSDL operations to the individual adapter references.
The input schema is shown below.
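The original schema listing is not reproduced here; the sketch below is consistent with the description that follows (the target namespace is an assumption):

```xml
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
           xmlns:tns="http://xmlns.example.com/cachetest"
           targetNamespace="http://xmlns.example.com/cachetest"
           elementFormDefault="qualified">
  <!-- A single cached entry: the cache key plus the cached content -->
  <xs:element name="XMLCacheEntry">
    <xs:complexType>
      <xs:sequence>
        <xs:element name="XMLCacheKey" type="xs:string"/>
        <xs:element name="XMLCacheContent" type="xs:string"/>
      </xs:sequence>
    </xs:complexType>
  </xs:element>
  <!-- Wrapper for the list of entries returned by the list operation -->
  <xs:element name="XMLCacheEntryList">
    <xs:complexType>
      <xs:sequence>
        <xs:element ref="tns:XMLCacheEntry" minOccurs="0" maxOccurs="unbounded"/>
      </xs:sequence>
    </xs:complexType>
  </xs:element>
  <!-- Wrapper for the list of keys returned by the listKeys operation -->
  <xs:element name="XMLCacheEntryKeyList">
    <xs:complexType>
      <xs:sequence>
        <xs:element name="XMLCacheKey" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
      </xs:sequence>
    </xs:complexType>
  </xs:element>
  <!-- Empty input for operations that take no parameters -->
  <xs:element name="XMLEmpty">
    <xs:complexType/>
  </xs:element>
</xs:schema>
```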
This type of pattern is likely to be used in all XML types stored in a Coherence cache. The XMLCacheKey element represents the cache key; in this schema it is a string, but it could be another primitive type. The other fields in the cached object are represented by a single XMLCacheContent field, but in a real example you are likely to have multiple fields at this level. Wrapper elements are provided for lists of elements (XMLCacheEntryList) and lists of cache keys (XMLCacheEntryKeyList). XMLEmpty is used for operations that don't require an input.

Put Operation
The put operation takes an XMLCacheEntry as input and passes this straight through to the adapter. The XMLCacheKey element in the entry is also assigned to the jca.coherence.key property. This sets the key for the cached entry. The adapter also supports automatically generating a key, which is useful if you don’t have a convenient field in the cached entity. The cache key is always returned as the output of this operation.
Get Operation

The get operation takes an XMLCacheKey as input and assigns this to the jca.coherence.key property. This sets the key for the entry to be retrieved.
Remove Operation

The remove operation takes an XMLCacheKey as input and assigns this to the jca.coherence.key property. This sets the key for the entry to be deleted.
RemoveAll Operation

The removeAll operation is similar to remove, but instead of using a key as input it uses a filter. The filter could be overridden by using the jca.coherence.filter property, but for this operation it was permanently set in the adapter wizard to the following query:
key() != ""
This selects all objects whose key is not equal to the empty string. All objects should have a key so this query should select all objects for deletion.
Note that there appears to be a bug in the return value. The return value is empty rather than the expected RemoveResponse element with a Count child element. Note that the documentation states:
When using a filter for a Remove operation, the Coherence Adapter does not report the count of entries affected by the remove operation, regardless of whether the remove operation is successful.
When using a key to remove a specific entry, the Coherence Adapter does report the count, which is always 1 if a Coherence Remove operation is successful.
Although this could be interpreted as meaning an empty part is returned, an empty part is a violation of the WSDL contract.
List Operation

The list operation takes no input and returns the result list provided by the adapter. The adapter also supports querying using a filter. This filter is essentially the where clause of a Coherence Query Language statement. When using XML types as cached entities, only the key() field can be tested, for example using a clause such as:
key() LIKE "Key%1"
This filter would match all entries whose key starts with “Key” and ends with “1”.
ListKeys Operation

The listKeys operation is essentially the same as the list operation except that only the keys are returned rather than the whole object.
To test the composite I used the new 12c Test Suite wizard to create a number of test suites. The test suites should be executed in the following order:
- CleanupTestSuite has a single test that removes all the entries from the cache used by this composite.
- InitTestSuite has 3 tests that insert a single record into the cache. The returned key is validated against the expected value.
- MainTestSuite has 5 tests that list the elements and keys in the cache and retrieve individual inserted elements. This tests that the items inserted in the previous test are actually in the cache. It also tests the get, list and listKeys operations and makes sure they return the expected results.
- RemoveTestSuite has a single test that removes an element from the cache and tests that the count of removed elements is 1.
- ValidateRemoveTestSuite is similar to MainTestSuite but verifies that the element removed by the previous test suite has actually been removed.
One example of using the Coherence Adapter is to create a shared memory region that allows SOA composites to share information. An example of this is provided by Lucas Jellema in his blog entry First Steps with the Coherence Adapter to create cross instance state memory.
However there is a problem in creating global variables that can be updated by multiple instances at the same time: the get and put operations provided by the Coherence Adapter give you a last-write-wins model. This can be avoided in Coherence by using an Entry Processor to update the entry in the cache, but entry processors are not currently supported by the Coherence Adapter, so in this case it is still necessary to use Java to invoke the entry processor.

Sample Code
The sample code I refer to above is available for download and consists of two JDeveloper projects, one with the cache config file and the other with the Coherence composite.
- CoherenceConfig has the cache config file that must be referenced by the connection factory properties.
- CoherenceSOA has a composite that supports the WSDL introduced at the start of this blog along with the test cases mentioned at the end of the blog.
The Coherence Adapter is a really exciting new addition to the SOA developer's toolkit; hopefully this article will help you make use of it.
The JMS Adapter is unaware of who receives the messages. Each adapter instance just takes the message from the queue and delivers it to its own configured interface, one interface per adapter instance. The SOA infrastructure is then responsible for routing that message, usually via a database table and an in-memory notification message, to a component within a composite. Each message will create a new composite instance, but the BPEL engine and Mediator engine will attempt to match callback messages to the appropriate Mediator or BPEL instance.
Note that the message type, including the XML document type, plays no part in any of this; delivery from the queue is not based on the content of the message.
The net result is that if you have a sequence of two receives from the same queue using different adapters then the messages will be split equally between the two adapters, meaning that half the time the wrong adapter will receive the message. This blog entry looks at how to resolve this issue.
Note that the same problem occurs whenever you have more than one adapter listening to the same queue, whether they are in the same composite or different composites. The solution in this blog entry is also relevant to this use case.

Solutions

In order to deliver the messages to the correct interface we need a way to identify the interface they should be delivered to. This can be done by using JMS properties. For example, the JMSType property can be used to identify the type of the message. A message selector can be added to the JMS inbound adapter that causes the adapter to filter out messages intended for other interfaces. For example, if we need to call three services that are implemented in a single application:
- Service 1 receives messages on the single outbound queue from SOA; it sends responses back on the single inbound queue.
- Similarly Service 2 and Service 3 also receive messages on the single outbound queue from SOA, they send responses back on the single inbound queue.
- The inbound JMS adapter is configured with a JMS message selector. The message selector might be "JMSType='Service1'" for responses from Service 1; similarly the selector would be "JMSType='Service2'" for the adapter waiting on a response from Service 2. The message selector ensures that each adapter instance will retrieve the first message from the queue that matches its selector.
- The sending service needs to set the JMS property (JMSType in our example) that is used in the message selector.
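JMS selectors are evaluated by the JMS provider, but their effect is easy to see in plain Java. The sketch below is pure JDK code with no JMS dependency; Message is a stand-in for a JMS message, and firstMatching mimics a consumer created with a selector such as "JMSType='Service2'":

```java
import java.util.List;
import java.util.Map;

public class SelectorDemo {
    // Stand-in for a JMS message: just its string properties and a body.
    record Message(Map<String, String> props, String body) {}

    // Return the first message whose JMSType property matches the given
    // type, mimicking a consumer created with selector "JMSType='<type>'".
    public static Message firstMatching(List<Message> queue, String type) {
        return queue.stream()
                .filter(m -> type.equals(m.props().get("JMSType")))
                .findFirst()
                .orElse(null);
    }

    public static void main(String[] args) {
        List<Message> queue = List.of(
                new Message(Map.of("JMSType", "Service1"), "response 1"),
                new Message(Map.of("JMSType", "Service2"), "response 2"));
        // Each adapter only sees messages matching its own selector.
        System.out.println(firstMatching(queue, "Service2").body()); // prints "response 2"
    }
}
```

Each adapter instance effectively applies its own predicate, so messages intended for other interfaces are simply left on the queue for the adapter whose selector does match.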
- We can do manual correlation with a correlation set, identifying parts of the outbound message that uniquely identify our instance and matching them with parts of the inbound message to make the correlation.
- We can use a Request-Reply JMS adapter which by default expects the response to contain a JMSCorrelationID equal to the outgoing JMSMessageID. Although no configuration is required for this on the SOA client side, the service needs to copy the incoming JMSMessageID to the outgoing JMSCorrelationID.
When using a synchronous Request-Reply JMS adapter we can omit the message selector, because the Request-Reply JMS adapter will immediately do a listen with a message selector for the correlation ID rather than processing the incoming message asynchronously.
The synchronous request-reply will block the BPEL process thread and hold open the BPEL transaction until a response is received, so this should only be used when you expect the request to be completed in a few seconds.
The JCA Connection Factory used must point to a non-XA JMS Connection Factory and must have the isTransacted property set to "false". See the documentation for more details.

Sample
I developed a JDeveloper SOA project that demonstrates using a single queue for multiple incoming adapters. The overall process flow is shown in the picture below. The BPEL process on the left receives messages from jms/TestQueue2 and sends messages to jms/TestQueue. A Mediator is used to simulate multiple services and also to provide a web interface to initiate the process. The correct adapter is identified by using JMS message properties and a selector.
The flow above shows that the process is initiated from EM using a web service binding on the Mediator. The Mediator, acting as a client, posts the request to the inbound queue with a JMSType property set to 'Initiate'.
The JMS adapter with a message selector "JMSType='Initiate'" receives the message and causes a composite to be created. The composite in turn causes the BPEL process to start executing.
- Initiate message can be used to initiate a correlation set if necessary
- Selector required to distinguish initiate messages from other messages on the queue

The BPEL process then sends a request to Service 1 on the outbound queue.

- Separate request & reply adapters require a correlation set to ensure that the reply goes to the correct BPEL process instance
- Selector required to distinguish Service 1 response messages from other messages on the queue

The BPEL process then sends a request to Service 2 on the outbound queue.

- Asynchronous request-reply adapter does not require a correlation set; the JMS adapter auto-correlates using the CorrelationID to ensure that the reply goes to the correct BPEL process instance
- Selector still required to distinguish Service 2 response messages from other messages on the queue
Service 3 receives the request and sends a response on the inbound queue with JMSType='Service2' and the JMSCorrelationID set to the incoming JMS message ID.

Synchronous Request-Reply Adapter

The synchronous JMS adapter receives the response without a message selector and correlates it to the BPEL process using native JMS correlation, then sends the overall response to the outbound queue.
- Synchronous request-reply adapter does not require a correlation set, JMS adapter auto-correlates using CorrelationID to ensure that reply goes to correct BPEL process instance
- Selector also not required to distinguish service 3 response messages from other messages on the queue because the synchronous adapter is doing a selection on the expected CorrelationID
When using a single JMS queue for multiple purposes bear in mind the following:
- If multiple receives use the same queue then you need to have a message selector. The corollary to this is that the message sender must add a JMS property to the message that can be used in the message selector.
- When using a request-reply JMS adapter then there is no need for a correlation set, correlation is done in the adapter by matching the outbound JMS message ID to the inbound JMS correlation ID. The corollary to this is that the message sender must copy the JMS request message ID to the JMS response correlation ID.
- When using a synchronous request-reply JMS adapter then there is no need for the message selector because the message selection is done based on the JMS correlation ID.
- Synchronous request-reply adapter requires a non-XA connection factory to be used so that the request part of the interaction can be committed separately to the receive part of the interaction.
- Synchronous request-reply JMS adapter should only be used when the reply is expected to take just a few seconds. If the reply is expected to take longer then the asynchronous request-reply JMS adapter should be used.
The sample is available to download here and makes use of the following JMS resources:

- jms/TestQueue – Queue – Outbound queue from the BPEL process
- jms/TestQueue2 – Queue – Inbound queue to the BPEL process
- eis/wls/TestQueue – JMS Adapter Connection Factory – This can point to an XA or non-XA JMS Connection Factory such as weblogic.jms.XAConnectionFactory
- eis/wls/TestQueue – Non-XA JMS Adapter Connection Factory – This must point to a non-XA JMS Connection Factory such as weblogic.jms.ConnectionFactory and must have isTransacted set to "false"
To run the sample, use the test facility in the EM console or the soa-infra application.
Coherence is best known as a data grid, providing distributed caching with an ability to move processing to the data in the grid. Less well known is the fact that Coherence also has the ability to function as a compute grid, distributing work across multiple servers in a cluster. In this entry, which was co-written with my colleague Utkarsh Nadkarni, we will look at using Coherence as a compute grid through the use of the Work Manager API and compare it to manipulating data directly in the grid using Entry Processors.

Coherence Distributed Computing Options
The Coherence documentation identifies several methods for distributing work across the cluster, see Processing Data in a Cache. They can be summarized as:
- Entry Processors
  - The InvocableMap interface, inherited by the NamedCache interface, provides support for executing an agent (EntryProcessor or EntryAggregator) on individual entries within the cache.
  - The entries may or may not already exist; either way the agent is executed once for each key provided, or, if no key is provided, once for each object in the cache.
  - In the Enterprise and Grid editions of Coherence the entry processors are executed on the primary cache nodes holding the cached entries.
  - Agents can return results.
  - One agent executes multiple times per cache node, once for each key targeted on the node.
- Invocation Service
  - An InvocationService provides support for executing an agent on one or more nodes within the grid.
  - Execution may be targeted at specific nodes or at all nodes running the Invocation Service.
  - Agents can return results.
  - One agent executes once per node.
- Work Managers
  - A WorkManager class provides a grid-aware implementation of the commonJ WorkManager which can be used to run tasks across multiple threads on multiple nodes within the grid.
  - WorkManagers run on multiple nodes.
  - Each WorkManager may have multiple threads.
  - Tasks implement the Work interface and are assigned to specific WorkManager threads to execute.
  - Each task is executed once.
The previous section listing the distributed computing options in Coherence shows that there are three distinct execution models:

- Per Cache Entry Execution (Entry Processor)
  - Execute the agent on the entry corresponding to a cache key.
  - Entries processed on a single thread per node.
  - Parallelism across nodes.
- Per Node Execution (Invocation Service)
  - Execute the same agent once per node.
  - Agent processed on a single thread per node.
  - Parallelism across nodes.
- Per Task Execution (Work Manager)
  - Each task executed once.
  - Parallelism across nodes and across threads within a node.
The entry processor is good for operating on individual cache entries. It is not so good for working on groups of cache entries.
The invocation service is good for performing checks on a node, but is limited in its parallelism.
The work manager is good for operating on groups of related entries in the cache or performing non-cache related work in parallel. It has a high degree of parallelism.
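The difference between the two approaches to mutating a cache entry can be illustrated with plain JDK types: ConcurrentHashMap.compute stands in for an entry processor (the mutation runs atomically against the entry), while a separate get followed by a put stands in for client-side cache access, where concurrent updates can be lost. This is only an analogy, not Coherence API code:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class AtomicUpdateDemo {
    // Entry-processor style: the mutation runs atomically where the
    // entry lives, so concurrent increments are never lost.
    public static int incrementAtomically(Map<String, Integer> cache, String key) {
        return cache.compute(key, (k, v) -> v == null ? 1 : v + 1);
    }

    // Client-side get/put style: two concurrent callers can both read
    // the same value and one update is silently lost (last write wins).
    public static void incrementGetPut(Map<String, Integer> cache, String key) {
        Integer v = cache.get(key);
        cache.put(key, v == null ? 1 : v + 1);
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, Integer> cache = new ConcurrentHashMap<>();
        Thread[] ts = new Thread[4];
        for (int i = 0; i < ts.length; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < 10_000; j++) incrementAtomically(cache, "counter");
            });
            ts[i].start();
        }
        for (Thread t : ts) t.join();
        System.out.println(cache.get("counter")); // prints 40000 - no updates lost
    }
}
```

Running the same four threads through incrementGetPut would usually print a total below 40000, which is exactly the lost-update behaviour the entry processor model is designed to avoid.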
As you can see, the primary choice for distributed computing comes down to the Work Manager and the Entry Processor.

Differences between using Entry Processors and Work Managers in Coherence

- Degree of parallelization
  - Entry Processors: a function of the number of Coherence nodes. EntryProcessors are run concurrently across all nodes in a cluster. However, within each node only one instance of the entry processor executes at a time.
  - Work Managers: a function of the number of Work Manager threads. The Work is run concurrently across all threads in all Work Manager instances.
- Transactionality
  - Entry Processors: transactional. If an EntryProcessor running on one node does not complete (say, due to that node crashing), the entries targeted will be executed by an EntryProcessor on another node.
  - Work Managers: not transactional. The specification does not explicitly state what the response should be if a remote server crashes during an execution; the current implementation uses WORK_COMPLETED with a WorkCompletedException as the result. If a Work does not run to completion, it is the responsibility of the client to resubmit the Work to the Work Manager.
- How the cache is accessed or mutated
  - Entry Processors: operations against the cache contents are executed by (and thus within the localized context of) a cache.
  - Work Managers: accesses and changes to the cache are made directly through the cache API, typically in three steps: get a reference to the named cache; get a reference to the cache item and change it; put the cache item back into the named cache.
- Where the processing is performed
  - Entry Processors: in the same JVM where the entries to be processed reside.
  - Work Managers: in the Work Manager server, which may not be the same JVM where the entries to be processed reside.
- Network traffic
  - Entry Processors: a function of the size of the EntryProcessor. Typically the size of an EntryProcessor is much smaller than the size of the data transferred across nodes in the Work Manager approach, which makes the EntryProcessor approach more network-efficient and hence more scalable. One EntryProcessor is transmitted to each cache node.
  - Work Managers: a function of the number of Work objects, of which multiple may be sent to each server, and of the size of the data set transferred from the backing map to the Work Manager server.
- Return value
  - Entry Processors: java.util.Map when executed on a collection of keys – a Map containing the results of invoking the EntryProcessor against each of the specified keys.
  - Work Managers: commonj.work.WorkItem. There are three possible outcomes:
- The Work is not yet complete. In this case, a null is returned by WorkItem.getResult.
- The Work started but completed with an exception. This may have happened due to a Work Manager Instance terminating abruptly. This is indicated by an exception thrown by WorkItem.getResult.
- The Work Manager instance indicated that the Work is complete and the Work ran to completion. In this case, WorkItem.getResult returns a non-null and no exception is thrown by WorkItem.getResult.
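These three outcomes can be mirrored with plain java.util.concurrent types; the sketch below uses CompletableFuture as a stand-in for a commonJ WorkItem (an analogy, not the commonJ API):

```java
import java.util.concurrent.CompletableFuture;

public class OutcomeDemo {
    // Classify a work item's state in the same three ways the text
    // describes for WorkItem.getResult (plain-JDK analogy).
    public static String classify(CompletableFuture<String> item) {
        if (!item.isDone()) return "PENDING";                  // getResult would return null
        if (item.isCompletedExceptionally()) return "FAILED";  // getResult would throw
        return "COMPLETED";                                    // non-null result, no exception
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        System.out.println(classify(pending)); // prints PENDING

        System.out.println(classify(CompletableFuture.completedFuture("ok"))); // prints COMPLETED

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("work manager instance died"));
        System.out.println(classify(failed)); // prints FAILED
    }
}
```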
Entry processors have excellent error handling within Coherence. Work Managers less so. In order to provide resiliency on node failure I implemented a “RetryWorkManager” class that detects tasks that have failed to complete successfully and resubmits them to the grid for another attempt.
A JDeveloper project with the RetryWorkManager is available for download here. It includes sample code to run a simple task across multiple work manager threads.
To create a new RetryWorkManager that will retry failed work twice you would use:

WorkManager manager = new RetryWorkManager("WorkManagerName", 2); // Change for number of retries; if no retry count is provided then the default is 0.

You can control the number of retries at the individual work level as shown below:

WorkItem workItem = schedule(work); // Use number of retries set at WorkManager creation
WorkItem workItem = schedule(work, workListener); // Use number of retries set at WorkManager creation
WorkItem workItem = schedule(work, 4); // Change number of retries
WorkItem workItem = schedule(work, workListener, 4); // Change number of retries

Currently the RetryWorkManager defaults to having 0 threads. To change this use the following call:

WorkItem workItem = schedule(work, workListener, 3, 4); // Change number of threads (3) and retries (4)

Note that none of this sample code is supported by Oracle in any way; it is provided purely as a sample of what can be done with Coherence.

How the RetryWorkManager Works
The RetryWorkManager delegates most operations to a Coherence WorkManager instance. It creates a WorkManagerListener to intercept status updates. On receiving a WORK_COMPLETED callback the listener checks the result to see if the completion is due to an error. If an error occurred and there are retries left then the work is resubmitted. The WorkItem returned by scheduling an event is wrapped in a RetryWorkItem. This RetryWorkItem is updated with a new Coherence WorkItem when the task is retried. If the client registers a WorkManagerListener then the RetryWorkManagerListener delegates non-retriable events to the client listener. Finally, the waitForAll and waitForAny methods are modified to deal with work items being resubmitted in the event of failure.

Sample Code for EntryProcessor and RetryWorkManager
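The resubmit-on-failure idea just described can be sketched with plain JDK concurrency types. This is an illustration of the pattern only, not the actual RetryWorkManager code (which wraps the Coherence WorkManager API and its listener callbacks):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RetrySubmitter {
    private final ExecutorService pool = Executors.newFixedThreadPool(2);

    // Submit a task and, on failure, resubmit it up to maxRetries times.
    // This mirrors what the RetryWorkManager's listener does when it sees
    // a WORK_COMPLETED callback carrying an exception as its result.
    public <T> T submitWithRetry(Callable<T> task, int maxRetries) throws Exception {
        Exception last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return pool.submit(task).get();
            } catch (ExecutionException e) {
                last = e; // task failed on this attempt; retry if attempts remain
            }
        }
        throw last; // retries exhausted
    }

    public void shutdown() { pool.shutdown(); }

    public static void main(String[] args) throws Exception {
        RetrySubmitter rs = new RetrySubmitter();
        final int[] calls = {0};
        // Fails twice (simulated node failures), succeeds on the third attempt.
        String result = rs.submitWithRetry(() -> {
            if (++calls[0] < 3) throw new IllegalStateException("simulated node failure");
            return "done after " + calls[0] + " attempts";
        }, 2);
        System.out.println(result); // prints "done after 3 attempts"
        rs.shutdown();
    }
}
```

The real RetryWorkManager differs in that retries are driven by the listener callback rather than a blocking get, but the decision logic, checking the failure result and resubmitting while retries remain, is the same.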
The downloadable project contains sample code for running the work manager and an entry processor.
The demo implements a 3-tier architecture:

- Coherence Cache Servers
  - Can be started by running RunCacheServer.cmd
  - Runs a distributed cache used by the Task to be executed in the grid
- Coherence Work Manager Servers
  - Can be started by running RunWorkManagerServer.cmd
  - Takes no parameters
  - Runs two threads for executing tasks
- Coherence Work Manager Clients
  - Can be started by running RunWorkManagerClient.cmd
  - Currently takes three parameters:
    - Work Manager name – should be "AntonyWork" – default is "AntonyWork"
    - Number of tasks to schedule – default is 10
    - Time to wait for tasks to complete in seconds – default is 60
The task stores the number of times it has been executed in the cache, so multiple runs will see the counter incrementing. The choice between EntryProcessor and WorkManager is controlled by changing the value of USE_ENTRY_PROCESSOR between false and true in the RunWorkManagerClient.cmd script.
The SetWorkManagerEnv.cmd script should be edited to point to the Coherence home directory and the Java home directory.
If you need to perform operations on cache entries and don’t need to have cross-checks between the entries then the best solution is to use an entry processor. The entry processor is fault tolerant and updates to the cached entity will be performed once only.
If you need to perform generic work that may need to touch multiple related cache entries then the work manager may be a better solution. The extensions I created in the RetryWorkManager provide a degree of resiliency to deal with node failure without impacting the client.