Applications can install node state change notifiers to be notified when remote nodes change their state. Node state change notifiers can be used to manage local node resources, or to dynamically maintain partition definitions as nodes are added to and removed from the cluster. Node state change notifiers are defined by extending the com.kabira.platform.highavailability.NodeNotifier abstract class.
An application must create and register a node notifier on each node where it is interested in receiving notifications of remote node state changes. The active method is called every time a remote node becomes active. The active method is also called for each active remote node that had already been discovered when the notifier was installed. The unavailable method is called every time a remote node becomes unavailable.
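As a minimal sketch of the two callbacks, the following hypothetical logging-only notifier records the state changes it is told about. Note that NodeNotifierStub below is a stand-in for the platform's NodeNotifier abstract class, included only so the sketch compiles standalone; a real notifier extends com.kabira.platform.highavailability.NodeNotifier and is registered by creating it in a transaction.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for com.kabira.platform.highavailability.NodeNotifier, included
// only so this sketch compiles without the platform libraries.
abstract class NodeNotifierStub
{
    protected abstract void active(String remoteNode);
    protected abstract void unavailable(String remoteNode);
}

// Minimal notifier that records the node state changes it receives.
class RecordingNotifier extends NodeNotifierStub
{
    final List<String> events = new ArrayList<>();

    @Override
    protected void active(String remoteNode)
    {
        // Invoked for each remote node that becomes active, and once for
        // every remote node already active when the notifier is installed.
        events.add(remoteNode + " active");
    }

    @Override
    protected void unavailable(String remoteNode)
    {
        // Invoked each time a remote node becomes unavailable.
        events.add(remoteNode + " unavailable");
    }
}

public class NotifierSketch
{
    public static void main(String[] args)
    {
        RecordingNotifier notifier = new RecordingNotifier();

        // Simulate the calls the runtime would make as node B cycles.
        notifier.active("B");
        notifier.unavailable("B");

        System.out.println(notifier.events);
    }
}
```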
Example 7.14, “Dynamically maintaining cluster-wide partitions” shows a snippet that installs a node notifier on every node on which it is run. A partition is defined using the name of the local node, and the replica list for the partition is dynamically maintained as nodes become active. Partitioned objects are created in these partitions and are available to all other nodes in the cluster. This allows the other nodes to easily find and execute a remote method on all active nodes in the cluster.
Note: This snippet does not exit until explicitly shut down, either by terminating the deployment tool or by using administrative tools to stop the JVM.
Example 7.14. Dynamically maintaining cluster-wide partitions
// $Revision: 1.1.2.6.4.1 $
package com.kabira.snippets.highavailability;

import com.kabira.platform.KeyFieldValueList;
import com.kabira.platform.KeyManager;
import com.kabira.platform.KeyQuery;
import com.kabira.platform.LockMode;
import com.kabira.platform.ManagedObject;
import com.kabira.platform.Transaction;
import com.kabira.platform.annotation.Key;
import com.kabira.platform.annotation.Managed;
import com.kabira.platform.highavailability.NodeNotifier;
import com.kabira.platform.highavailability.Partition;
import com.kabira.platform.highavailability.PartitionManager;
import static com.kabira.platform.highavailability.PartitionManager.EnableAction.JOIN_CLUSTER;
import com.kabira.platform.highavailability.PartitionMapper;
import com.kabira.platform.highavailability.ReplicaNode;
import com.kabira.platform.property.Status;

/**
 * Snippet showing how to handle node state changes.
 *
 * Each node defines a partition using its node name and adds and removes
 * remote nodes from the partition as they become active/inactive.
 *
 * <p>
 * <h2> Target Nodes</h2>
 * <ul>
 * <li> <b>domainname</b> = Development
 * </ul>
 */
public class NodeStateChange
{
    @Managed
    @Key(name = "ByName", fields = { "nodeName" }, unique = true, ordered = true)
    private static class NodeObject
    {
        final String nodeName;

        NodeObject(String nodeName)
        {
            this.nodeName = nodeName;
        }

        void remoteExecute(String caller)
        {
            System.out.println("remoteExecute executed on node " + m_nodeName
                + ", caller node was " + caller + ".");
        }

        @Override
        public String toString()
        {
            return "NodeObject: " + nodeName;
        }
    }

    private static class NodeObjectMapper extends PartitionMapper
    {
        @Override
        public String getPartition(Object obj)
        {
            NodeObject no = (NodeObject) obj;
            return no.nodeName;
        }
    }

    /**
     * Node state change notifier
     */
    private static class Notifier extends NodeNotifier
    {
        @Override
        protected void active(String remoteNode)
        {
            //
            // Check to see if we have already seen this node.
            //
            Partition partition = PartitionManager.getPartition(m_nodeName);
            String[] currentNodeList = partition.getNodeList();

            for (String currentNode : currentNodeList)
            {
                if (currentNode.equals(remoteNode) == true)
                {
                    return;
                }
            }

            //
            // Add remote node to partition as a replica
            //
            String[] currentReplicas = partition.getReplicaNodes();
            ReplicaNode[] replicas = new ReplicaNode[currentReplicas.length + 1];

            for (int i = 0; i < currentReplicas.length; i++)
            {
                replicas[i] = partition.getReplicaNode(i);
            }
            replicas[currentReplicas.length] = new ReplicaNode(remoteNode,
                ReplicaNode.ReplicationType.SYNCHRONOUS);

            partition.migrate(null, partition.getActiveNode(), replicas);

            //
            // Create object that will be pushed to all remote nodes.
            //
            createNodeObject(m_nodeName);

            System.out.println(m_nodeName + ": " + remoteNode + " active");
        }

        @Override
        protected void unavailable(String remoteNode)
        {
            System.out.println(m_nodeName + ": " + remoteNode + " unavailable");

            //
            // Delete the remote instance, since we can no longer use it.
            //
            deleteNodeObject(remoteNode);

            //
            // The remote node will automatically be removed from the
            // partition definition as part of failover processing, so we
            // don't need to do anything here for our partition.
            //
        }

        private void createNodeObject(String node)
        {
            KeyManager<NodeObject> km = new KeyManager<>();
            KeyQuery<NodeObject> kq = km.createKeyQuery(NodeObject.class, "ByName");
            KeyFieldValueList kfvl = new KeyFieldValueList();
            kfvl.add("nodeName", node);
            kq.defineQuery(kfvl);
            kq.getOrCreateSingleResult(LockMode.WRITELOCK, null);
        }

        private void deleteNodeObject(String node)
        {
            KeyManager<NodeObject> km = new KeyManager<>();
            KeyQuery<NodeObject> kq = km.createKeyQuery(NodeObject.class, "ByName");
            KeyFieldValueList kfvl = new KeyFieldValueList();
            kfvl.add("nodeName", node);
            kq.defineQuery(kfvl);

            NodeObject no = kq.getSingleResult(LockMode.WRITELOCK);

            if (!ManagedObject.isEmpty(no))
            {
                ManagedObject.delete(no);
            }
        }
    }

    /**
     * Main entry point
     *
     * @param args Not used
     * @throws java.lang.InterruptedException
     */
    public static void main(String[] args) throws InterruptedException
    {
        final Notifier notifier = initialize();

        //
        // Wait here for operator to shut down example
        //
        waitForOperator();

        //
        // Clean up node notifier
        //
        new Transaction("Remove Node Notifier")
        {
            @Override
            protected void run() throws Rollback
            {
                ManagedObject.delete(notifier);
            }
        }.execute();
    }

    private static Notifier initialize()
    {
        return new Transaction("Initialize")
        {
            @Override
            protected void run() throws Transaction.Rollback
            {
                PartitionManager.setMapper(NodeObject.class, new NodeObjectMapper());
                PartitionManager.definePartition(m_nodeName, null, m_nodeName, null);
                PartitionManager.enablePartitions(JOIN_CLUSTER);

                m_notifier = new Notifier();
            }

            Notifier initialize()
            {
                execute();
                return m_notifier;
            }
            private Notifier m_notifier = null;
        }.initialize();
    }

    //
    // Wait for operator to shut down example
    //
    private static void waitForOperator() throws InterruptedException
    {
        while (true)
        {
            Thread.sleep(5000);

            new Transaction("Wait for Stop")
            {
                @Override
                protected void run()
                {
                    callRemoteNodes();
                }
            }.execute();

            System.out.println(m_nodeName + ": Waiting for stop");
        }
    }

    private static void callRemoteNodes()
    {
        for (NodeObject no : ManagedObject.extent(NodeObject.class))
        {
            if (no.nodeName.equals(m_nodeName) == false)
            {
                no.remoteExecute(m_nodeName);
            }
        }
    }

    private static final String m_nodeName = System.getProperty(Status.NODE_NAME);
}
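The active() callback in Example 7.14 first scans the partition's current node list to skip nodes it has already seen, then grows the replica list by copying the existing replicas into an array one element longer and appending the newly discovered node. That check-then-append pattern, sketched here with plain strings instead of ReplicaNode instances so it can run standalone, looks like this:

```java
import java.util.Arrays;

public class AppendReplica
{
    // If the node is already in the list, return the list unchanged;
    // otherwise return a new array with the node appended. This mirrors
    // the duplicate check and ReplicaNode[] growth done in active().
    static String[] addNode(String[] current, String remoteNode)
    {
        for (String node : current)
        {
            if (node.equals(remoteNode))
            {
                return current; // already known, nothing to do
            }
        }

        String[] grown = Arrays.copyOf(current, current.length + 1);
        grown[current.length] = remoteNode;
        return grown;
    }

    public static void main(String[] args)
    {
        String[] replicas = { "B" };

        replicas = addNode(replicas, "C"); // appended
        replicas = addNode(replicas, "B"); // duplicate, unchanged

        System.out.println(String.join(",", replicas)); // prints "B,C"
    }
}
```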
When Example 7.14, “Dynamically maintaining cluster-wide partitions” is run, it outputs the information shown in Example 7.15, “Dynamic partitions output” (annotations added).
Example 7.15. Dynamic partitions output
#
# B & C are active on node A
#
[A] A: B active
[A] A: C active

#
# A & B are active on node C
#
[C] C: A active
[C] C: B active

#
# A & C are active on node B
#
[B] B: A active
[B] B: C active

#
# Node A executed remote methods on C & B
#
[C] remoteExecute executed on node C, caller node was A.
[B] remoteExecute executed on node B, caller node was A.

#
# Node C executed remote methods on A & B
#
[B] remoteExecute executed on node B, caller node was C.
[A] remoteExecute executed on node A, caller node was C.

#
# Node B executed remote methods on A & C
#
[C] remoteExecute executed on node C, caller node was B.
[A] remoteExecute executed on node A, caller node was B.
...