
Using Locks to Ensure Data Integrity Within and Across Agents
Multiple agents can read from and write to the same cache cluster, and at times they operate on the same set of objects. Multiple threads within one agent can behave in a similar manner when concurrent RTCs are enabled.
Locking ensures that the data you read is up to date, and that no other RTC is updating the same data concurrently.
Understanding Locking in TIBCO BusinessEvents
Locking prevents conflicts that can arise when multiple threads in an agent try to write to the same concept at the same time during multi-threaded Rete operations (where multiple RTCs run concurrently). The same type of issue can occur across inference agents operating concurrently. Locking is also necessary to ensure that stale data, that is, data that has been modified in another RTC but not yet written to cache, is not read.
Locking is one of the necessary costs of tuning inference agents for higher performance through multi-threaded Rete or concurrency across agents.
Lock operations do not place a lock on the object you want to protect. They set a common flag that represents a lock. It is up to the developer to ensure that every access and update to a concept subject to locking goes through that lock, and that only the necessary concepts are locked (including concepts that are written to as well as those used in conditions).
The goal of locking is to ensure consistency across concurrent RTCs. For example, if one RTC has a rule condition that includes a concept, and another RTC updates that concept, the results could be undefined (does the condition use the new or the old values of the object?). Or, if two RTCs update the same object at the same time, different properties of the object could be set by different threads, leaving the object with an inconsistent mix of property values.
The typical lock operation is to set the lock in the event preprocessor, using any unique string as the key. For example, you can use the object's extId as the lock string.
If a preprocessor cannot acquire the lock (because another event's preprocessor has acquired it), it waits until either the lock is released or a timeout occurs.
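As an illustrative sketch only (not taken from the product examples), an event preprocessor might set such a lock as follows. The event variable evt and its custId property are placeholder names, and a finite timeout is used rather than an infinite wait:

// Sketch: acquire a cluster-wide lock keyed on the customer ID before the RTC runs.
// evt and custId are placeholder names for this illustration.
boolean acquired = Cluster.DataGrid.Lock(evt.custId, 5000, false);
if (acquired == false) {
   // The lock could not be acquired within 5 seconds; handle as an error.
   System.debugOut("Could not lock customer " + evt.custId);
}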
Locking code must be written carefully. If one event's thread calls lock(A) then lock(B) while another calls lock(B) then lock(A), each can end up waiting on the other indefinitely (a deadlock). Locking should be used sparingly.
With Cache Plus Memory OM, the need for locks is greater than with Cache Only, because each agent's Rete network must be synchronized with the changes made in all other agents' Rete networks.
Because of issues around effective locking with Cache Plus Memory in multi-agent scenarios, Cache Only mode is generally recommended. Cache Plus Memory is useful for objects that change infrequently.
When to Use Locking
Depending on your application, locking may not be required in all cases. However, it is almost always needed. For most applications, use locking in the following cases:
With Cache Only mode, for writes. Locking is done using an event preprocessor.
With Cache Plus Memory mode, for writes, and for subscription RTCs. (These RTCs run internally to keep the Rete network on each agent synchronized with the cache.) Locking is done using a subscription preprocessor. It uses the same locking key (string) as the event preprocessor, if one was used.
With any mode, for reads. If you want to read the latest version of a concept in one agent at the same time that another agent might create or update the same concept, mediate the reads through the same lock that was used when creating or updating the concept. This is done using an event preprocessor and, for Cache Plus Memory, a subscription preprocessor.
With state modeler, for timeouts. State modeler timeouts don’t go through an event preprocessor, so locking is done a different way. This is explained in TIBCO BusinessEvents Data Modeling Developer’s Guide.
Lock Processing Example Flow
The following example demonstrates common locking requirements. In this example, two agents receive messages that require changes to one Customer instance.
Event Preprocessor
Note that event preprocessors are multi-threaded (see Event Preprocessors for more details).
1. A message comes into a channel on Agent A: a change to a customer address. TIBCO BusinessEvents dequeues the message from the queue, deserializes the message to an event, and calls the event preprocessor function. The preprocessor acquires a lock using the customer's extId as the key:
Cluster.DataGrid.Lock(Customer@extId, -1, false);
This function causes the thread to wait until it gets the lock (the -1 timeout means the wait is unbounded). In this example, the thread gets the lock.
2. Only one thread handles the RTC. (Concurrent RTC is not used in this example.) Other event preprocessor threads go into a queue. During the RTC, a rule executes and modifies the customer address. After the RTC completes, the post-RTC phase begins: the address change is written to the L1 cache, the cluster cache, and the backing store in parallel. Messages arising from the RTC are sent.
3. After the post-RTC actions complete, the locks acquired during event processing are released automatically.
Subscription Preprocessor (Cache Plus Memory Only)
Subscription preprocessor activities run on a different thread from event preprocessor activities. One subscription task handles all the changes to the objects from one regular RTC and executes as many preprocessors as there are modified objects. (The subscription preprocessor is assigned to an entity in the CDD file Domain Object Override Settings area.)
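As a sketch, the body of such a subscription preprocessor might acquire the same lock key as the event preprocessor. Here the modified Customer instance is assumed to be available as customer; the actual argument name depends on how you declare the rule function:

// Sketch: lock on the same key (the customer's extId) that the event preprocessor
// uses, so subscription RTCs are serialized with event RTCs for this customer.
boolean acquired = Cluster.DataGrid.Lock(customer@extId, 10000, false);
if (acquired == false) {
   System.debugOut("Subscription preprocessor could not lock " + customer@extId);
}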
4. Agent A sends a change notification to all other agents that have subscribed to cluster notifications for this object. Agent B receives the notification and calls the subscription preprocessor function. It contains the same lock function shown in step 1 above and uses the same locking string. Agent B acquires the lock (that is, the function returns true). The agent updates the Rete network with the changes (using a "subscription RTC") and releases the lock when the subscription RTC completes.
5. While the subscription update thread holds the lock, Agent B receives a message with a change to the same customer's address and attempts to acquire the lock, but is blocked (that is, the function returns false).
When the subscription preprocessor releases the lock, Agent B's event preprocessor can acquire it (depending on timeout behavior) and assert the event to the Rete network.
Depending on timing, either an event preprocessor or a subscription preprocessor could be holding the lock.
Locking Functions
The TIBCO BusinessEvents lock function has the following format:
Cluster.DataGrid.Lock(String key, long timeout, boolean LocalOnly)
Set LocalOnly to true to acquire the lock only within the agent, or to false to acquire a cluster-wide lock. For example, if you are only concerned about the preprocessor threads in one agent, you can use a local lock.
The worker thread that calls the lock() function is the only thread that gets blocked.
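For illustration, the two forms side by side (the key string and timeout are arbitrary examples):

// Cluster-wide lock: coordinates all agents in the cluster.
boolean clusterWide = Cluster.DataGrid.Lock("Customer-1001", 5000, false);

// Local lock: coordinates only the threads within this agent.
boolean localOnly = Cluster.DataGrid.Lock("Customer-1001", 5000, true);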
Unlock Function
All the locks acquired during event processing are released automatically after all post-RTC actions, cache operations, and (in the case of cache-aside mode) database writes are done.
The format of the unlock function is as follows:
Cluster.DataGrid.UnLock(String key, boolean LocalOnly)
You can use the corresponding UnLock() function for cases where the automatic unlocking does not meet your needs.
The Cluster.DataGrid.Lock() and Cluster.DataGrid.UnLock() functions are available in event and subscription preprocessors and in rules. However, in general it is not a good idea to use lock() in rules as the order of processing of rules is not guaranteed.
You can call Cluster.DataGrid.UnLock() in a rule only when concurrent RTC is used.
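As a sketch of one case where automatic unlocking may not be enough, a preprocessor that consumes an event without forwarding it to an RTC could release its lock explicitly. The event variable evt and its custId property are placeholder names:

// Sketch: explicit unlock when the event is dropped in the preprocessor.
boolean acquired = Cluster.DataGrid.Lock(evt.custId, 2000, false);
if (acquired == true) {
   // ... inspect state; suppose the event turns out to be irrelevant ...
   Event.consumeEvent(evt);
   Cluster.DataGrid.UnLock(evt.custId, false);
}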
Tips on Using Locks
The example LockExample (in BE_HOME/examples/standard) demonstrates these points, showing use of locks to prevent race conditions.
Choose an appropriate key for lock(). Note that lock() does not lock any entity or object as such. The purpose of lock() is to ensure sequential processing of a related set of objects while still allowing concurrent processing of unrelated objects. For example, you may want to process messages related to one customer sequentially across the cluster, but process messages for different customers in parallel. In this case you could use the customer ID as the key for lock(). This ensures that all messages for a given customer ID are processed sequentially.
Do not use unchecked and infinite waits (-1) on the lock. The recommended approach is to use a finite timeout argument and then exit with an error if the lock cannot be acquired.
Always check the return value of lock(); if it is false, either retry or handle it as an error. Do not let application logic proceed when lock() returns false: doing so may result in lost updates, stale reads, or other data inconsistencies.
Try to minimize the locks acquired in one thread. If you have to acquire multiple locks in one thread, ensure that the locks are always acquired in the same key order, that is, sort the keys. See Avoiding Deadlock when Multiple Keys Protect One Object.
Use lock() even for read-only operations. If you don't, you may get "Inconsistent Database" messages, for example when there are concurrent deletes in other threads or agents.
In general, avoid using lock() in a rule. Because the order of rule execution is not guaranteed, such usage may lead to deadlocks.
Avoiding Deadlock when Multiple Keys Protect One Object
In the simplest cases you can use some unique feature of the object you want to protect as the locking key, for example, a customer ID. However, different events may refer to the same object using different information. For example, from one channel the object may be identified by customer ID, and from another by account ID.
In such cases, multiple keys identify the same object. When you acquire a lock to protect such an object, first get the other key from your system, sort the keys, and take a lock on each key in that order. Sorting can be implemented using a custom function.
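A sketch of that pattern follows. It assumes the two keys (custId and acctId) have already been obtained, and that MyUtils.compare() is a hypothetical custom catalog function, implemented by you, that compares two key strings:

// Sketch: always lock the two keys in the same (sorted) order to avoid deadlock.
// MyUtils.compare(a, b) is a hypothetical custom function returning a value
// less than zero when a sorts before b.
String first = custId;
String second = acctId;
if (MyUtils.compare(acctId, custId) < 0) {
   first = acctId;
   second = custId;
}
boolean gotFirst = Cluster.DataGrid.Lock(first, 2000, false);
boolean gotBoth = false;
if (gotFirst == true) {
   gotBoth = Cluster.DataGrid.Lock(second, 2000, false);
}
if (gotBoth == false) {
   // Handle as an error, for example by routing the event to an error destination.
   System.debugOut("Could not acquire both locks for " + custId + "/" + acctId);
}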
If the ordering of keys is not guaranteed, a deadlock can occur in a multi-agent or concurrent RTC (multi-threaded) environment. For this reason, avoid using lock() in a loop where the intention is to process multiple messages. There are other ways to achieve this, for example, using the assertEvent_Async() function.
Diagnosing and Resolving Lock Failures
Instead of throwing an exception when a lock cannot be acquired after a few attempts, re-route the event to a special destination that handles only errors (an "error queue"), so that you have control over which queue the message goes to.
Write a preprocessor on the "error queue" that handles each message appropriately.
For example, the following preprocessor code retries the lock and, if it still cannot acquire it, consumes the event and routes it to the error destination:

 
System.debugOut("Attempting to lock..");
boolean result = false;
for(int i = 1; i <= 3; i = i + 1){
   result = Cluster.DataGrid.Lock("$lock0", 2000, false);
 
   if(result == false){
      System.debugOut("Lock acquisition '$lock0' failed. Attempts: " + i);
   }
   else{
      System.debugOut("Lock acquisition '$lock0' succeeded. Attempts: " + i);
      break;
   }
}
 
if(result == false){
   Event.consumeEvent(newevent);
   Event.routeTo(newevent, "/Channel/LockErrorDestination", null);
   }
}