Distributed Computing in Depth

Any StreamBase Managed Object can be a distributed object. A distributed object transparently provides remote method invocation and access to object fields across nodes. The full transactional guarantees StreamBase makes for non-distributed objects are also true for distributed objects.

Access to a distributed object is through a normal Java object reference. All Managed Object references contain data to identify the node where the object was created.

The same instance of an object cannot exist on multiple nodes. Copies of an object's state may be located on multiple nodes to improve performance or robustness, but the master copy is located on a single node — by default the node where the object was created.

All object methods transparently execute on the master node for an object. Any methods invoked on an object reference are sent to the master node and executed there.

Objects of the same type can be created on multiple nodes. This is done by installing the application class files, or implementation, on multiple nodes. This is a common application architecture to support object partitioning and caching or service availability mechanisms.

Figure 1. Distributed method execution


Figure 1, “Distributed method execution” shows an Order class that has its implementation installed on two nodes — Node One and Node Two. Two instances of the Order class are created, one on Node One and one on Node Two. When the Order.cancel() method executes on Node One, using the order(Node Two) instance, the method executes on Node Two. The opposite is true for the order(Node One) instance.
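The routing rule in this example can be illustrated with a small toy model. This is a sketch under stated assumptions, not StreamBase code: `ToyNode` and `ToyOrderRef` are hypothetical names, and the "network" is just a direct call. It only shows that a reference remembers its master node and that the method body always runs there, whichever node makes the call.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: hypothetical types, not StreamBase classes.
final class ToyNode {
    final String name;
    // Master copies of order state held by this node, keyed by order id.
    final Map<Integer, String> orderStatus = new HashMap<>();

    ToyNode(String name) {
        this.name = name;
    }
}

final class ToyOrderRef {
    private final ToyNode master; // node where the object was created
    private final int id;

    private ToyOrderRef(ToyNode master, int id) {
        this.master = master;
        this.id = id;
    }

    // Creating the object on a node makes that node its master.
    static ToyOrderRef create(ToyNode node, int id) {
        node.orderStatus.put(id, "OPEN");
        return new ToyOrderRef(node, id);
    }

    // The caller's node does not matter: the work is done on the master.
    // Returns the name of the node on which the method body ran.
    String cancel(ToyNode caller) {
        master.orderStatus.put(id, "CANCELLED");
        return master.name;
    }
}
```

Calling `cancel` from Node One on a reference created on Node Two returns "Node Two" and changes state only on Node Two.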

Connectivity

The distribution protocol uses TCP/IP, SSL, or InfiniBand connectivity between nodes with a platform-independent encoding. The platform-independent encoding allows heterogeneous hardware platforms to communicate with each other in a distributed transactional system. The optional automatic node discovery protocol uses UDP.

Location Transparency

StreamBase provides location transparency for objects. This means that when an application accesses an object, its location is transparent — it may be on the local or remote node.

Location transparency is accomplished through the use of distributed references. All Managed Objects created in StreamBase have a distributed reference that contains the master node for the object. An object's identity, as defined by its distributed reference, does not change throughout the lifetime of the object.

Methods invoked on an object are always executed on the master node for an object.

Reading and Writing Object Fields

Object field data is transparently read from and written to the master node when fields are accessed on a local node.

Whether a read is dispatched to the master node depends on whether the local node already has the field data cached. If the field data is not available on the local node, a distributed read is done when the field is accessed. The read completes before the get of the field returns to the caller. All reads are done on the master node in the same transaction in which the field access occurs.

When a field associated with a remote object is modified on a local node, by default, the update is deferred until the local transaction enters the prepare state. This is called deferred writes. See Deferred Write Protocol for details.

Extents

When an extent is accessed using a local query, only object references on the local node are returned — no read is dispatched to any remote nodes. References are in a local extent either because the object was created on the local node, it was returned in a method call, or it was pushed to the local node as part of object replication. Distributed queries can be used to access the global extent of all objects.

Locations

Every node is uniquely identified by a:

  • Cluster-unique name

  • Cluster-unique location code

  • Cluster-unique shared memory timestamp

The default node name is set to the local host name. The default node name can be changed during node installation. This allows multiple StreamBase nodes to run on the same machine.

The location code is automatically derived from the node name using a hashing algorithm.

The location code is a numeric identifier that determines the actual network location of the master node for an object. The location code is stored with each Managed Object. The initial value of the location code for an object is the location code of the node on which the object was created.

Highly available objects can migrate to other nodes as part of failover, or to support load balancing. When object migration occurs, the location code associated with all of the migrated objects is updated to the location code of the node to which they were migrated. This update occurs on all nodes on which the objects exist. After an object migration completes, the master node for the object is the new node, which may be different from the node on which the object was created.

The shared memory timestamp is assigned when the shared memory is first created for a node. This occurs the first time a node is started following an installation. The shared memory timestamp is a component of the opaque distributed reference. It ensures that the distributed reference is globally unique.

Location Discovery

Location discovery provides a runtime mapping between location codes, or node names, and network addresses.

Location discovery is done in two ways:

  • Static discovery using configuration information.

  • Dynamic discovery using service discovery.

Configuration can be used to define the mapping between a node name and a network address. Configuring this mapping is allowed at any time, but it is only required if service discovery cannot be used for location discovery. An example of when this would be necessary is if a remote node is across a wide area network where service discovery is not allowed. This is called static discovery.

If configuration information is not provided for a location name, service discovery performs the location discovery. This has the advantage that no configuration for remote nodes is required on the local node — it is all discovered at runtime. This is called dynamic discovery.

Note

When a network address is discovered with both static and dynamic discovery, the configured static discovery information is used.
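The precedence rule in this note can be sketched as a two-level lookup. The class below is a hypothetical illustration, not the StreamBase implementation: `ToyLocationResolver`, its method names, and the address strings are all assumptions. It only demonstrates that a configured (static) address wins over a dynamically discovered one for the same node name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Toy model only: hypothetical names, not StreamBase API.
final class ToyLocationResolver {
    // Node name -> address taken from configuration (static discovery).
    private final Map<String, String> configured = new HashMap<>();
    // Node name -> address learned at runtime (dynamic discovery).
    private final Map<String, String> discovered = new HashMap<>();

    void addConfigured(String nodeName, String address) {
        configured.put(nodeName, address);
    }

    void addDiscovered(String nodeName, String address) {
        discovered.put(nodeName, address);
    }

    // Static configuration is consulted first; dynamic discovery is the fallback.
    Optional<String> resolve(String nodeName) {
        String address = configured.get(nodeName);
        if (address != null) {
            return Optional.of(address);
        }
        return Optional.ofNullable(discovered.get(nodeName));
    }
}
```

A node that appears in neither map resolves to an empty result, which loosely corresponds to the Undiscovered state.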

StreamBase performs location discovery in the following cases:

  • A create of an object in a partition with a remote active node.

  • A method is invoked, or a field is set, on a remote object.

When an object is associated with a partition whose active node is remote, a location discovery request is done by node name to locate the network information associated with the node name.

When an operation is dispatched on a remote object, a location discovery request is done by location code to locate the network information associated with a location code.

Location code information is cached on the local node once the node is discovered.

Lifecycle

Initialization and termination of the distribution services are tied to activation and deactivation of distribution configuration data. A node without active distribution configuration cannot provide distributed services to a cluster. When distribution configuration is activated, StreamBase takes the following steps to initialize distribution:

  1. Mark the local node state as starting.

  2. Start dynamic discovery service if enabled.

  3. Start network listeners.

  4. Start keep-alive server.

  5. Mark the local node state as active.

After initialization completes, the node automatically becomes part of the cluster. The node can now provide access to distributed objects or provide high-availability services to other nodes in the cluster.

Remote Node States

Remote nodes can have one of the states listed in the table Remote node states.

Remote node states

State | Description
Undiscovered | Node cannot be discovered. Network address information is not available from this remote node. Remote node is unavailable.
Discovered | The network address information for this node is discovered, either using dynamic or static discovery, but no connection could be established to the node. Remote node is unavailable.
In Up Notifier | Node is transitioning to an Up state. This is a transitory state. Any installed node available notifiers are being executed.
Up | Active connections are available to this node. Remote node is active.
In Down Notifier | Node is transitioning to the Down state. This is a transitory state. Any installed node unavailable notifiers are being executed.
Down | Node is inactive. No connections are active to this node, and new connection attempts fail with an error. Remote node is unavailable.
Duplicate Location | A duplicate location code was detected during connection establishment. No communication can occur with this node until this error is corrected. Remote node is unavailable.
Duplicate Timestamp | A duplicate installation timestamp was detected during connection establishment. No communication can occur with this node until this error is corrected. Remote node is unavailable.
Unsupported Protocol | An unsupported protocol version was detected during connection establishment. No communication can occur with this node until this error is corrected. Remote node is unavailable.

Remote Node State Change Notifiers

Application-installed node state change notifiers are called when a remote node transitions from active to unavailable and from unavailable to active. The In Up Notifier and In Down Notifier states defined in Remote node states are seen while a node notifier is being called.

When a node state change notifier is installed, it is guaranteed to be called for all active remote nodes already discovered by the local node. Node notifier execution is serialized for a specific remote node. A call to a notifier must complete before another notifier is called. For example, if a remote node becomes unavailable while an active notifier is being executed, the unavailable notifier is not called until the active notifier completes.

Node state change notifiers are called in a transaction.

Deferred Write Protocol

By default, all distributed object updates use a deferred write protocol. The deferred write protocol defers all network I/O until the commit phase of a transaction. This allows the batching of all of the object updates, and the prepare request, into a single network I/O for each node, improving network performance. The size of the network buffer used for the network I/O is controlled in the distribution configuration. See the StreamBase Administration Guide for details on distribution configuration.

The deferred write protocol is shown in Figure 2, “Deferred write protocol” for two nodes.

Figure 2. Deferred write protocol


Notice that no transaction locks are taken on node B as distributed objects are modified on node A until the prepare step.

Note

Distributed object creates and deletes perform network I/O immediately; they are not deferred until commit time. There is no prepare phase enabled for these transactions. See Figure 1, “Distributed transaction”.

The deferred write protocol is disabled if a method call is done on a distributed object. Any modifications to the distributed object on the local node are flushed to the remote node before the method executes on the remote node. This ensures that any updates made on the local node are available on the remote node when the method executes.

After the method executes on the remote node, any modifications on the remote node are copied back to the initiating node. This ensures that the data is again consistent on the local node on which the method was originally executed.
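The batching and flush-before-invoke behavior described above can be sketched with a toy transaction that records simulated network messages. This is an illustration under stated assumptions, not the StreamBase protocol: `ToyDeferredWriteTxn` and its methods are hypothetical, the flush and the method invocation are logged as separate steps purely for readability, and the copy-back of remote modifications after a method call is not modeled.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model only: hypothetical names, not StreamBase API.
final class ToyDeferredWriteTxn {
    // Field updates not yet sent, grouped by the master node of the object.
    private final Map<String, List<String>> pending = new LinkedHashMap<>();
    // Remote nodes touched by this transaction.
    private final Set<String> participants = new LinkedHashSet<>();
    // Every simulated network message, in send order.
    final List<String> networkLog = new ArrayList<>();

    // A field write on a remote object is only buffered locally.
    void setField(String masterNode, String field, String value) {
        participants.add(masterNode);
        pending.computeIfAbsent(masterNode, n -> new ArrayList<>()).add(field + "=" + value);
    }

    // A method call first flushes buffered updates so the remote node sees them.
    void invokeMethod(String masterNode, String method) {
        participants.add(masterNode);
        List<String> updates = pending.remove(masterNode);
        if (updates != null) {
            networkLog.add(masterNode + ": flush " + updates);
        }
        networkLog.add(masterNode + ": invoke " + method);
    }

    // At prepare, remaining updates and the prepare request share one message per node.
    void prepare() {
        for (String node : participants) {
            List<String> updates = pending.remove(node);
            String batch = (updates == null) ? "[]" : updates.toString();
            networkLog.add(node + ": " + batch + " + prepare");
        }
    }
}
```

With two field writes to objects mastered on node B followed by `prepare()`, the log contains a single entry for B. With a method call in between, the buffered write is flushed before the invocation is logged.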

You can disable the deferred write protocol in the high availability configuration, though TIBCO recommends enabling it. However, if an application only accesses object fields using accessors, instead of directly accessing fields, it is more performant to disable the deferred write protocol since no modifications are ever done on the local node.

Detecting Failed Nodes

StreamBase supports keep-alive messages between all nodes in a cluster. Keep-alive requests actively determine whether a remote node is still reachable. Keep-alive messages are sent to remote nodes using the configurable keepAliveSendIntervalSeconds time interval.

Figure 3, “Keep-alive protocol” shows how a node is detected as being down. Every time a keep-alive request is sent to a remote node, a timer starts with a duration of nonResponseTimeoutSeconds. This timer is reset when a keep-alive response is received from the remote node. If a keep-alive response is not received within the nonResponseTimeoutSeconds interval, a keep-alive request is sent on the next network interface configured for the node (if any). If there are no other network interfaces configured for the node, or the nonResponseTimeoutSeconds has expired on all configured interfaces, all connections to the remote node are dropped, and the remote node is marked Down.

Connection failures to remote nodes are also detected by the keep-alive protocol. When a connection failure is detected, as opposed to a keep-alive response not being received, the connection is reattempted to the remote node before trying the next configured network interface for the remote node (if any). This connection reattempt is done to transparently handle transient network connectivity failures without reporting a false node down event.

It is important to understand that the total time before a remote node is marked Down is the number of configured interfaces multiplied by the nonResponseTimeoutSeconds configuration value in the case of keep-alive responses not being received. In the case of connection failures, the total time could be twice the nonResponseTimeoutSeconds times the number of configured interfaces, if both connection attempts to the remote node (the initial one and the retry) hang attempting to connect with the remote node.

For example, in the case of keep-alive responses not being received: if there are two network interfaces configured, and the nonResponseTimeoutSeconds value is four seconds, it will be eight seconds before the node is marked Down. In the case of connection establishment failures, where each connection attempt hangs, the total time would be sixteen seconds before the node is marked Down.
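The arithmetic in this example can be written out as two small helper methods. This is only a sketch of the calculation described in the text. `KeepAliveTiming` and its method names are hypothetical, and the only real configuration value involved is nonResponseTimeoutSeconds.

```java
// Hypothetical helper for estimating node-down detection time.
final class KeepAliveTiming {
    // Keep-alive responses not received: one timeout per configured interface.
    static int secondsUntilDownOnNoResponse(int configuredInterfaces, int nonResponseTimeoutSeconds) {
        return configuredInterfaces * nonResponseTimeoutSeconds;
    }

    // Connection failures where both the initial attempt and the retry hang:
    // two timeouts per configured interface.
    static int worstCaseSecondsUntilDownOnConnectFailure(int configuredInterfaces, int nonResponseTimeoutSeconds) {
        return 2 * configuredInterfaces * nonResponseTimeoutSeconds;
    }
}
```

For two interfaces and a four-second timeout, these return 8 and 16 seconds respectively, matching the figures above.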

Figure 3. Keep-alive protocol


Network Error Handling

Distribution uses TCP as the underlying network protocol. In general, TCP provides reliable connectivity between machines on a network. However, it is possible that network errors can occur that cause a TCP connection to drop. When a TCP connection drops, requests and responses between nodes participating in a distributed transaction are not received. Network errors are detected by the keep-alive protocol described in Detecting Failed Nodes and handled by the distributed transaction protocol.

The following can cause network connectivity failures:

  • A non-response keep-alive timeout occurring.

  • TCP retry timers expiring.

  • Lost routes to remote machines.

These errors are usually caused by network cables being disconnected, router crashes, or machine interfaces being disabled.

As discussed in Local and Distributed Transactions, all distributed transactions have a transaction initiator that acts as the transaction coordinator. The transaction initiator can detect network failures when sending a request, or reading a response from a remote node. When the transaction initiator detects a network failure, the transaction is rolled back. Other nodes in a distributed transaction can also detect network failures. When this happens, rollback is returned to the transaction initiator, and again the transaction initiator rolls back the transaction. This is shown in Figure 4, “Connection failure handling”.

Figure 4. Connection failure handling


When the transaction initiator performs a rollback because of a connection failure — either detected by the initiator or another node in the distributed transaction — the rollback is sent to all known nodes. Known nodes are those that were located using location discovery (see Location Discovery). This must be done because the initiator does not know which nodes are participating in the distributed transaction. Notice that a rollback is sent to all known nodes in Figure 4, “Connection failure handling”. The rollback is retried until network connectivity is restored to all nodes.

Transaction rollback is synchronized to ensure that the transaction is safely aborted on all participating nodes, no matter the current node state.

Distributed Transaction Failure Handling

Any communication failures to remote nodes detected during a global transaction before a commit sequence is started cause an exception that an application can handle (see the StreamBase Java Developer's Guide). This allows the application to explicitly decide whether to commit or rollback the current transaction. If the exception is not caught, the transaction is automatically rolled back.

Undetected communication failures to remote nodes do not affect the commit of the transaction. This failure scenario is shown in Figure 5, “Undetected communication failure”. In this case, Node 2 failed and restarted after all locks were taken on Node 2, but before the commit sequence was started by the transaction initiator — Node 1. Once the commit sequence starts it continues to completion. The request to commit is ignored on Node 2 because the transaction state was lost when Node 2 restarted.

Figure 5. Undetected communication failure


Transaction initiator node failures are handled transparently using a transaction outcome voting algorithm. There are two cases that must be handled:

  • Transaction initiator fails before commit sequence starts.

  • Transaction initiator fails during the commit sequence.

When a node participating in a distributed transaction detects the failure of a transaction initiator, it queries all other nodes for the outcome of the transaction. If the transaction was committed on any other participating nodes, the transaction is committed on the node that detected the node failure. If the transaction aborted on any other participating nodes, the transaction aborts on the node that detected the failure. If the transaction is still in progress on the other participating nodes, the transaction aborts on the node that detected the failure.
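The decision rule in the paragraph above can be sketched as a single function over the answers returned by the other participating nodes. The names below (`OutcomeVoting`, `Outcome`, `resolve`) are hypothetical and are not part of any StreamBase API. The sketch follows the order in which the text states the rules, so a reported commit is checked first.

```java
import java.util.List;

// Illustrative only: hypothetical names, not StreamBase API.
final class OutcomeVoting {
    enum Outcome { COMMITTED, ABORTED, IN_PROGRESS }

    // Decide the local outcome from what the other participating nodes report.
    static Outcome resolve(List<Outcome> reportedByOtherNodes) {
        // Committed on any other participant: commit locally too.
        if (reportedByOtherNodes.contains(Outcome.COMMITTED)) {
            return Outcome.COMMITTED;
        }
        // Aborted elsewhere, still in progress elsewhere, or no other
        // participants to ask: abort locally.
        return Outcome.ABORTED;
    }
}
```

An empty list models a cluster with no other participating nodes, in which case the transaction is aborted on the node that detected the failure.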

Transaction outcome voting before the commit sequence is shown in Figure 6, “Transaction initiator fails prior to initiating commit sequence”. In Figure 6, “Transaction initiator fails prior to initiating commit sequence” the initiating node, Node 1, fails before initiating the commit sequence. When Node 2 detects the failure, it performs the transaction outcome voting algorithm by querying other nodes in the cluster to see if they are participating in this transaction. Since there are no other nodes in this cluster, the Transaction Status request is a no-op and the transaction is immediately aborted on Node 2, releasing all locks held by the distributed transaction.

Figure 6. Transaction initiator fails prior to initiating commit sequence


Transaction outcome voting during a commit sequence is shown in Figure 7, “Transaction initiator fails during commit sequence”. In Figure 7, “Transaction initiator fails during commit sequence” the initiating node, Node 1, fails during the commit sequence after committing the transaction on Node 2, but before it is committed on Node 3. When Node 3 detects the failure, it performs the transaction outcome voting algorithm by querying Node 2 for the resolution of the global transaction. Since the transaction was committed on Node 2 it is committed on Node 3.

Figure 7. Transaction initiator fails during commit sequence


To support transaction outcome voting, each node maintains a history of all committed and aborted transactions for each remote node participating in a global transaction. The number of historical transactions to maintain is configurable and recommended to be based on the time for the longest running distributed transaction. For example, if 1000 transactions per second are being processed from a remote node, and the longest transaction on average is ten times longer than the mean, configure the transaction history buffer for 10,000 transactions.

For each transaction from each remote node, the following is captured:

  • Global transaction identifier

  • Node login timestamp

  • Transaction resolution

The size of each transaction history record is 24 bytes.
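Given the 24-byte record size, the memory cost of a history buffer is a simple product. The helper below is a hypothetical sketch and assumes that the configured number of records applies separately to each remote node. That reading is an assumption and should be checked against the distribution configuration documentation.

```java
// Hypothetical helper: estimates transaction history memory use.
final class TxnHistorySizing {
    // Record size stated in the text.
    static final long RECORD_BYTES = 24;

    // Assumes one history buffer of recordsPerRemoteNode entries per remote node.
    static long historyBytes(long recordsPerRemoteNode, long remoteNodes) {
        return recordsPerRemoteNode * remoteNodes * RECORD_BYTES;
    }
}
```

For the 10,000-transaction buffer from the earlier example, this gives 240,000 bytes for one remote node, and 720,000 bytes for three remote nodes under the per-node assumption.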