Cluster Topic Publisher and Subscriber Adapters

Introduction

Note

Starting with release 10.4.1, this adapter suite is deprecated.

The Spotfire Streaming Cluster Publisher and Subscriber Adapters allow StreamBase applications deployed on different nodes of a cluster to publish tuples to a given topic, and allow other nodes to access those tuples by subscribing to the same topic. In addition, a history of the published tuples is kept so that subscribers may request a replay of older tuples, for example to recover after a node failover.

The Cluster Publisher operator has the following capabilities:

  • On connect, it creates the topic that is named in its Topic Name property, or if the topic exists in the cluster, it connects to it.

  • Tuples sent to its input port are published to the topic, and are then forwarded to all the topic's subscribers throughout the cluster.

  • Each tuple published in this way is given a unique, sequential ordinal number to indicate the order in which it was published.

  • More than one publisher from any node in the cluster may be used for the same topic. In that case, tuples are published in first-come, first-served order.

  • A command can be sent to any publisher of a topic to clear this topic's history.

The Cluster Subscriber operator has the following capabilities:

  • On connect, it creates the topic named in its Topic Name property, or if the topic exists already in the cluster, it connects to it.

  • When publishers send tuples to the topic, the subscriber receives them and emits them on its output port. Each tuple received this way contains an integer field named ordinal to indicate its publishing sequence order.

  • More than one operator instance from any node in the cluster may subscribe to the same topic.

  • When you send a Replay command to its input port with a given range of ordinal numbers, it replays the tuples that fell within the range and were previously sent to the topic. In this way, subscribers may bring themselves up to date when starting up in an existing cluster, for example when restarting the node after a fail-over event or when adding new nodes to the cluster. Each tuple replayed in this way has a boolean field named replay set to true.
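
The capabilities listed above can be pictured with a small in-memory model. This is only an illustrative Python sketch, not the adapter's implementation or API: the Topic class and its subscribe, publish, and replay methods are hypothetical names, tuples are modeled as dictionaries, and the first ordinal is assumed to be 1.

```python
from collections import deque


class Topic:
    """Hypothetical in-memory stand-in for a cluster topic (illustration only)."""

    def __init__(self, max_history=10000):
        self.history = deque(maxlen=max_history)  # oldest entries drop first
        self.next_ordinal = 1  # assumption: ordinals start at 1
        self.subscribers = []

    def subscribe(self, callback):
        """Register a callable that receives every tuple published afterwards."""
        self.subscribers.append(callback)

    def publish(self, fields):
        """Stamp the tuple with the next ordinal, record it, and fan it out."""
        record = dict(fields, ordinal=self.next_ordinal, replay=False)
        self.next_ordinal += 1
        self.history.append(record)
        for callback in self.subscribers:
            callback(dict(record))
        return record["ordinal"]

    def replay(self, begin=None, end=None):
        """Return copies of history tuples in [begin, end], flagged replay=True."""
        return [
            dict(record, replay=True)
            for record in self.history
            if (begin is None or record["ordinal"] >= begin)
            and (end is None or record["ordinal"] <= end)
        ]


topic = Topic(max_history=3)
received = []
topic.subscribe(received.append)
for price in (10.0, 10.5, 11.0, 11.5):
    topic.publish({"price": price})

live_ordinals = [t["ordinal"] for t in received]
replayed_ordinals = [t["ordinal"] for t in topic.replay(begin=2)]
```

With a history limit of 3, the tuple with ordinal 1 has already been dropped by the time the replay is requested, so only ordinals that are still in the history can be replayed.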

Subscriber and Publisher Adapter Properties

Settings are used to control most of the behavior of the operators. The settings are grouped under several tabs in the Properties view in StreamBase Studio.

In the tables in this section, the Property column displays each property name as it appears in one or more operator property tabs of the Properties view for this adapter.

Both the Publisher and Subscriber operators expose the same set of properties (unless noted otherwise). The properties are described in this section.

General Tab

Name: Use this required field to specify or change the name of this instance of this component. The name must be unique within the current EventFlow module. The name can contain alphanumeric characters, underscores, and escaped special characters. Special characters can be escaped as described in Identifier Naming Rules. The first character must be alphabetic or an underscore.

Adapter: A read-only field that shows the formal name of the adapter.

Class name: Shows the fully qualified class name that implements the functionality of this adapter. If you need to reference this class name elsewhere in your application, you can right-click this field and select Copy from the context menu to place the full class name in the system clipboard.

Start options: This field provides a link to the Cluster Aware tab, where you configure the conditions under which this adapter starts.

Enable Error Output Port: Select this checkbox to add an Error Port to this component. In the EventFlow canvas, the Error Port shows as a red output port, always the last port for the component. See Using Error Ports to learn about Error Ports.

Description: Optionally, enter text to briefly describe the purpose and function of the component. In the EventFlow Editor canvas, you can see the description by pressing Ctrl while the component's tooltip is displayed.

Operator Properties Tab

Property Data Type Default Description
Cluster Configuration Edit Button Not applicable Shortcut to the StreamBase Configuration File Editor, used for adapter configuration or converting an existing application's adapter-configurations.xml file to HOCON format.
Topic Name string none (Required) The name of the topic to which to subscribe (for the Subscriber operator) or publish (for the Publisher operator). If the topic does not exist in the cluster at the time the operator starts, it is created.
Availability Zone string default-cluster-wide-availability-zone (Required) The Availability Zone.
Connect On Startup boolean true When enabled, the adapter connects to the given topic on startup.
Enable Command Port boolean false Whether to add an input port to receive commands. If enabled, the schema for this port is expected to have the following fields:
  • command (string), required: Should be set to the command to run (see below for valid commands)

  • (Subscriber Only) begin (long), optional: If present and non-null, indicates the first ordinal to replay. Otherwise replay starts with the oldest tuple

  • (Subscriber Only) end (long), optional: If present and non-null, indicates the last ordinal to replay (inclusively). Otherwise the replay ends with the newest tuple

The Subscriber accepts commands:

  • CONNECT connects to the topic and registers as a subscriber

  • DISCONNECT unregisters from the topic and disconnects

  • REPLAY replays previously published tuples. Use the begin and end fields to specify the ordinals of the oldest and newest tuples to include in the replay, or omit those fields (or leave them null) to replay the entire history. Tuples emitted as part of the replay have their replay field set to TRUE. Once all requested tuples have been replayed, a marker tuple with ordinal set to 0 and replay set to TRUE is emitted to indicate the end.

Enable Publish Complete Port (Publisher operator only) boolean false If enabled, an output port is added that emits all tuples that have been successfully published.
Asynchronous Publish (Publisher operator only) boolean true If enabled, all publish calls to subscribers are done asynchronously.
Publisher Flush Interval (ms) (Publisher operator only) int 100 The number of milliseconds to hold tuples before writing them to transactional memory. A value of 0 uses a low-latency connection. A value greater than 0 indicates that batching occurs; however, batching can cause data loss if a batch is not committed to transactional memory before a failure.
Publisher Buffer Size (Publisher operator only) int 100 The number of tuples to hold before doing a write to transactional memory.
Max Number of Tuples in History (Publisher operator only) long 10000 Number of tuples to keep in the topic's history queue (this history is used by subscriber operators when they need to replay tuples). When the maximum number of tuples is reached, older tuples are dropped as new ones are published. If multiple publishers use the same topic and use a different value for this setting, the highest value is used.
History Threshold Percent (Publisher operator only) int 10 The percentage of the history size to wait before performing a flush. Valid values are 0 to 100. If the value is 0, a flush is performed for each tuple or batch of tuples that is published.
Log Level drop-down list INFO Controls the level of verbosity the adapter uses to send notifications to the console. This setting can be higher than the containing application's log level. If set lower, the system log level will be used. Available values, in increasing order of verbosity, are: OFF, ERROR, WARN, INFO, DEBUG, TRACE.
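
To make the Publisher Flush Interval (ms) and Publisher Buffer Size properties more concrete, the following Python sketch models a batching buffer. It rests on assumptions that the property descriptions do not confirm: that a write happens when either limit is reached, and that the interval is measured from the oldest pending tuple. BatchBuffer, add, tick, and flush are hypothetical names, and the clock is passed in explicitly so the example stays deterministic.

```python
class BatchBuffer:
    """Hypothetical batching buffer; flushes on size or elapsed time (assumed)."""

    def __init__(self, flush_interval_ms=100, buffer_size=100, write=print):
        self.flush_interval_ms = flush_interval_ms
        self.buffer_size = buffer_size
        self.write = write  # called with a list of tuples on every flush
        self.pending = []
        self.first_pending_ms = None

    def add(self, item, now_ms):
        """Queue one tuple; flush at once if batching is off or the buffer is full."""
        if not self.pending:
            self.first_pending_ms = now_ms
        self.pending.append(item)
        if self.flush_interval_ms == 0 or len(self.pending) >= self.buffer_size:
            self.flush()

    def tick(self, now_ms):
        """Flush if the oldest pending tuple has waited for the full interval."""
        if self.pending and now_ms - self.first_pending_ms >= self.flush_interval_ms:
            self.flush()

    def flush(self):
        """Hand the pending batch to the writer and start a new, empty batch."""
        batch, self.pending = self.pending, []
        self.write(batch)


batches = []
buffer = BatchBuffer(flush_interval_ms=100, buffer_size=3, write=batches.append)
buffer.add("a", now_ms=0)
buffer.add("b", now_ms=10)
buffer.tick(now_ms=50)   # interval not yet elapsed: nothing is written
buffer.tick(now_ms=100)  # interval elapsed: the batch ["a", "b"] is written
for item in ("c", "d", "e"):
    buffer.add(item, now_ms=120)  # the third add reaches buffer_size
```

In this model, anything still in pending at the moment of a failure has not been written, which mirrors the data-loss caveat in the flush interval description.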

Edit Schema Tab (Subscriber operator only)

Use this tab to define the schema for the tuples to be read from the topic.

Note that the Subscriber operator automatically adds the following fields to this schema on its output port (and populates those fields appropriately for each arriving tuple), to convey some meta-information about the payload:

  • ordinal (long)—Contains the sequence number of this tuple, which indicates the order in which each tuple was published. Subscriber operator users should expect this ordinal number to be monotonically increasing as new tuples are read from the topic. If the EventFlow logic downstream of the Subscriber operator detects a gap in this sequence, a replay may be requested by sending a Replay command to the Subscriber's command port.

  • replay (boolean)—Set to false by default, but becomes true when the tuple is emitted as a result of processing a Replay command.
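
The gap check mentioned for the ordinal field could look like the following Python sketch. It assumes that consecutive tuples on a topic differ by exactly 1 in their ordinal, and it models command tuples as dictionaries with the command, begin, and end fields. The find_gap and replay_commands helpers are hypothetical and are not part of the adapter.

```python
def find_gap(last_ordinal, tuple_ordinal):
    """Return (begin, end) of the missing ordinals, or None if nothing was skipped."""
    if last_ordinal is not None and tuple_ordinal > last_ordinal + 1:
        return (last_ordinal + 1, tuple_ordinal - 1)
    return None


def replay_commands(ordinals):
    """Walk live ordinals in arrival order and build one REPLAY command per gap."""
    commands = []
    last = None
    for ordinal in ordinals:
        gap = find_gap(last, ordinal)
        if gap is not None:
            commands.append({"command": "REPLAY", "begin": gap[0], "end": gap[1]})
        last = ordinal
    return commands


# Ordinals 7, 8, and 11 never arrived, so two REPLAY commands are built.
commands = replay_commands([5, 6, 9, 10, 12])
```

Because the end field is inclusive, each command covers exactly the missing ordinals and nothing that was already received.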

Cluster Aware Tab

Use the settings in this tab to enable this operator or adapter for runtime start and stop conditions in a multi-node cluster. During initial development of the fragment that contains this operator or adapter, and for maximum compatibility with releases before 10.5.0, leave the Cluster start policy control in its default setting, Start with module.

Cluster awareness is an advanced topic that requires an understanding of StreamBase Runtime architecture features, including clusters, quorums, availability zones, and partitions. See Cluster Awareness Tab Settings on the Using Cluster Awareness page for instructions on configuring this tab.

Concurrency Tab

Use the Concurrency tab to specify parallel regions for this instance of this component, or multiplicity options, or both. The Concurrency tab settings are described in Concurrency Options, and dispatch styles are described in Dispatch Styles.

Caution

Concurrency settings are not suitable for every application, and using these settings requires a thorough analysis of your application. For details, see Execution Order and Concurrency, which includes important guidelines for using the concurrency options.

Operator Ports And Schemas

Publisher Operator Ports

Publish Data Port

By default the Publisher operator has only one input port, to receive tuples to publish to the topic. Tuples sent to this port are forwarded to the topic verbatim.

Note

The tuples sent to this port must not contain fields with the reserved names ordinal or isReplay.

Command Port

If the Enable Command Port property is checked, one additional port is present to receive commands for the operator. This port is expected to receive tuples with the following fields:

Field Name Field Type Field Description
command string Required. Contains the type of command to run. The following are valid commands:
  • CONNECT connects to the topic and sets the topic history size and threshold

  • DISCONNECT disconnects from the topic

  • CLEAR purges all tuples from the topic's history

  • TOPICDETAILS outputs a status tuple with topic details; see the Status Port section for details

Status Port

By default the Publisher operator has a status port with the following schema:

Field Name Field Type Field Description
Status string Contains the type of status information in this tuple (for example, Error).
Time timestamp The time the status occurred.
Info list<tuple<String Name, String Value>> Contains one or more name-value pairs of text describing the event that triggered this tuple.

The following is a list of all possible status messages and the name-value pairs that are output by the publisher:

Status Description Name Value Pairs
Topic Details Output when a TOPICDETAILS command is issued
  • Topic—The current topic.

  • HistorySize—The current max allowed history size.

  • HistoryThresholdPercent—The current history threshold percent.

  • FirstOrdinal—The topic's current first ordinal in history (the smallest ordinal you can request).

  • Ordinal—The topic's current ordinal (last message published).

  • NumberSubscribers—The topic's current number of subscribers.

  • AsynchronousPublish—Whether the current publisher is sending to subscribers asynchronously.

Connect Error Output when a connection error occurred
  • Error—A human readable error message.

  • Topic—The current topic.

Connected Output when a publisher is connected to a topic
  • Topic—The current topic.

Disconnected Output when a publisher is disconnected from a topic
  • Topic—The current topic.

Command Error Output when an input tuple command is invalid
  • Error—A human readable error message.

  • Topic—The current topic.

Publish Error Output when there is an error trying to publish. If the flush interval is greater than 0, this status may occur asynchronously, when a batch of tuples gets published to the topic.
  • Error—A human readable error message.

  • Topic—The current topic.

Publish Complete Error Output when batch tuple processing is used and an error occurs while trying to output a publish complete tuple
  • Error—A human readable error message.

  • Topic—The current topic.

Clear Cache Complete Output after a CLEAR command has been received and the cache has been cleared
  • Topic—The current topic.

Clear Cache Error Output when there is an error clearing the cache
  • Error—A human readable error message.

  • Topic—The current topic.
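
Downstream logic that consumes the status port usually needs the Info pairs in a more convenient form. The Python sketch below models a status tuple as a dictionary that follows the Status, Time, and Info schema above. The sample values, the placeholder timestamp text, and the helper names info_to_dict and is_error are all invented for illustration, and the sketch assumes that the Status string matches the status names in the list exactly.

```python
def info_to_dict(status_tuple):
    """Flatten the Info list of Name/Value pairs into a plain dictionary."""
    return {pair["Name"]: pair["Value"] for pair in status_tuple["Info"]}


def is_error(status_tuple):
    """Treat any status whose name ends in 'Error' as a failure (assumed convention)."""
    return status_tuple["Status"].endswith("Error")


sample = {
    "Status": "Publish Error",
    "Time": "2024-01-01 00:00:00.000",  # placeholder text, not a real timestamp type
    "Info": [
        {"Name": "Error", "Value": "example failure text"},
        {"Name": "Topic", "Value": "orders"},
    ],
}
details = info_to_dict(sample)
```

Looking values up by name rather than by position keeps the handling code independent of the order in which the pairs are emitted, which this page does not specify.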

Publish Complete Port

If Enable Publish Complete Port is enabled, an additional output port is present. It emits each tuple that was published to the topic once the tuple is committed to transactional memory.

Subscriber Operator Ports

Command Port

The Subscriber operator has at most one input port, to receive commands for the operator. This port is expected to receive tuples with the following fields:

Field Name Field Type Field Description
command string Required. Contains the type of command to run. The following are valid commands:
  • CONNECT connects to the topic and registers as a subscriber

  • DISCONNECT disconnects from the topic

  • REPLAY replays history tuples within the given range from begin to end

  • TOPICDETAILS outputs a status tuple with topic details; see the Status Port section for details

begin long Optional. The ordinal of the first tuple to replay. Can be left null to signify the first tuple in the topic's queue.
end long Optional. The ordinal of the last tuple to replay. Can be left null to signify the latest tuple in the topic's queue.
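
As a rough illustration of the command schema above, the following Python sketch builds Subscriber command tuples as dictionaries. The make_command helper is hypothetical. Its validation rules (rejecting begin and end on commands other than REPLAY, and rejecting begin greater than end) are choices made for this sketch only; this page does not say how the operator itself treats those cases, or whether command names are case-sensitive.

```python
SUBSCRIBER_COMMANDS = {"CONNECT", "DISCONNECT", "REPLAY", "TOPICDETAILS"}


def make_command(command, begin=None, end=None):
    """Build a command tuple as a dict, rejecting obviously inconsistent input."""
    if command not in SUBSCRIBER_COMMANDS:
        raise ValueError(f"unknown command: {command!r}")
    if command != "REPLAY" and (begin is not None or end is not None):
        raise ValueError("begin and end only apply to REPLAY")
    if begin is not None and end is not None and begin > end:
        raise ValueError("begin must not be greater than end")
    return {"command": command, "begin": begin, "end": end}


full_replay = make_command("REPLAY")  # null begin and end: entire history
ranged_replay = make_command("REPLAY", begin=100, end=200)
topic_details = make_command("TOPICDETAILS")
```

Leaving begin and end as None corresponds to the null values described in the table, which request a replay from the first to the latest tuple in the topic's queue.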

Status Port

By default the Subscriber operator has a status port with the following schema:

Field Name Field Type Field Description
Status string Contains the type of status information in this tuple (for example, Error).
Time timestamp The time the status occurred.
Info list<tuple<String Name, String Value>> Contains one or more name-value pairs of text describing the event that triggered this tuple.

The following is a list of all possible status messages and the name-value pairs that are output by the subscriber:

Status Description Name Value Pairs
Topic Details Output when a TOPICDETAILS command is issued
  • Topic—The current topic.

  • HistorySize—The current max allowed history size.

  • HistoryThresholdPercent—The current history threshold percent.

  • FirstOrdinal—The topic's current first ordinal in history (the smallest ordinal you can request).

  • Ordinal—The topic's current ordinal (last message published).

  • NumberSubscribers—The topic's current number of subscribers.

  • AsynchronousPublish—Whether the current publisher is sending to subscribers asynchronously.

Connect Error Output when a connection error occurred
  • Error—A human readable error message.

  • Topic—The current topic.

Connected Output when a subscriber is connected to a topic
  • Topic—The current topic.

Disconnected Output when a subscriber is disconnected from a topic
  • Topic—The current topic.

Disconnect Error Output when an error occurred while trying to disconnect
  • Error—A human readable error message.

  • Topic—The current topic.

Command Error Output when an input tuple command is invalid
  • Error—A human readable error message.

  • Topic—The current topic.

Receive Error Output when there is an error trying to output a received tuple from the topic.
  • Error—A human readable error message.

  • Topic—The current topic.

Replay Success Output after a replay command has been completed successfully
  • Topic—The current topic.

Replay Error Output when there is an error during a replay command
  • Error—A human readable error message.

  • Topic—The current topic.

Data Port

Emits tuples retrieved from the topic. The schema of the data port is the same as that specified in the Edit Schema tab of the operator's Properties view, but with the following two fields added to convey additional information about the tuple:

  • ordinal (long)—Contains the sequence number of this tuple, which indicates the order in which each tuple was published. Subscriber operator users should expect this ordinal number to be monotonically increasing as new tuples are read from the topic. If the EventFlow logic downstream of the Subscriber operator detects a gap in this sequence, a replay may be requested by sending a Replay command to the Subscriber's command port.

  • replay (boolean)—Set to false by default, but is true when the tuple is emitted as a result of processing a Replay command.
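
Logic downstream of the data port can use these two fields, together with the end-of-replay marker described for the REPLAY command (ordinal set to 0 and replay set to true), to separate replayed tuples from live ones. The Python sketch below is an assumption-laden illustration: tuples are modeled as dictionaries, the price field and the split_replay helper are invented, and the interleaving of live and replayed tuples in the sample is made up rather than documented behavior.

```python
def split_replay(tuples):
    """Separate replayed tuples from live ones and note whether the replay ended."""
    replayed, live, finished = [], [], False
    for t in tuples:
        if t["replay"] and t["ordinal"] == 0:
            finished = True  # end-of-replay marker, not a data tuple
        elif t["replay"]:
            replayed.append(t)
        else:
            live.append(t)
    return replayed, live, finished


stream = [
    {"ordinal": 7, "replay": True, "price": 10.0},
    {"ordinal": 9, "replay": False, "price": 11.0},
    {"ordinal": 8, "replay": True, "price": 10.5},
    {"ordinal": 0, "replay": True, "price": None},  # marker; null payload assumed
]
replayed, live, finished = split_replay(stream)
```

Checking for the marker before handling other replayed tuples keeps the marker itself out of the recovered data.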

Typechecking and Error Handling

The operators use typecheck messages to help you configure them in your StreamBase application. In particular, the operators generate typecheck messages for the following reasons:

  • A required property is missing.

  • One or more required fields in the Command input port are missing or are of the wrong type or size.

Suspend and Resume Behavior

When suspended, the Publisher operator no longer sends tuples to the topic (that is, tuples sent to its input port are dropped). The Subscriber no longer dequeues tuples from the topic and ignores commands sent to its input port. For both operators, the connection to the topic remains intact.

When resumed, the Publisher operator again starts to send tuples to the topic, and the Subscriber resumes dequeuing them.
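
The Publisher side of this behavior can be summarized with a small state model. The Python sketch below is illustrative only: PublisherModel and its methods are hypothetical names, and the dropped counter exists purely to make the effect visible.

```python
class PublisherModel:
    """Hypothetical model of Publisher suspend and resume behavior."""

    def __init__(self):
        self.connected = True
        self.suspended = False
        self.published = []
        self.dropped = 0

    def suspend(self):
        self.suspended = True  # the connection to the topic is left intact

    def resume(self):
        self.suspended = False

    def on_input(self, item):
        """Publish the tuple, or drop it if the operator is suspended."""
        if self.suspended:
            self.dropped += 1  # tuples arriving while suspended are not queued
        else:
            self.published.append(item)


publisher = PublisherModel()
publisher.on_input("a")
publisher.suspend()
publisher.on_input("b")  # dropped, not held for later
publisher.resume()
publisher.on_input("c")
```

The model highlights that tuples sent during a suspension are not published after the resume, so an application that cannot tolerate the loss needs to hold them upstream.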