Cluster Topic Publisher and Subscriber Adapters

Introduction

The TIBCO StreamBase® Cluster Publisher and Subscriber Adapters allow StreamBase applications deployed on different nodes of a cluster to publish tuples to a given topic, and allow other nodes to receive those tuples by subscribing to the same topic. In addition, a history of the published tuples is kept so that subscribers can request a replay of older tuples, for example to recover after a node failover.

The Cluster Publisher operator has the following capabilities:

  • At startup time, it creates the topic named in its Topic Name property, or if the topic exists already in the cluster, it connects to it.

  • Tuples sent to its input port are published to the topic, and are then forwarded to all the topic's subscribers throughout the cluster.

  • Each tuple published in this way is given a unique, sequential ordinal number to indicate the order in which it was published.

  • More than one publisher from any node in the cluster may be used for the same topic. In that case, tuples are published in first-come, first-served order.

  • A command can be sent to any publisher of a topic to clear the topic's history.
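To make these ordering rules concrete, here is a minimal Java model of a topic shared by several publishers. It is a sketch under stated assumptions, not the adapter's implementation or API: `TopicModel`, `publish`, and `clear` are hypothetical names, and we assume that ordinals start at 1 and keep increasing after a clear.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a cluster topic; NOT the StreamBase adapter API.
class TopicModel {
    // One history entry: the ordinal assigned at publish time plus the payload.
    record Entry(long ordinal, String payload) {}

    private final List<Entry> history = new ArrayList<>();
    // Assumption: ordinals start at 1.
    private long nextOrdinal = 1;

    // synchronized serializes concurrent publishers, so whichever call gets
    // the lock first receives the next ordinal (first-come, first-served).
    synchronized long publish(String payload) {
        long ordinal = nextOrdinal++;
        history.add(new Entry(ordinal, payload));
        return ordinal;
    }

    // Models the clear command: purges the history.
    // Assumption: ordinal numbering is NOT reset by a clear.
    synchronized void clear() {
        history.clear();
    }

    synchronized List<Entry> history() {
        return List.copyOf(history);
    }
}

class TopicModelDemo {
    public static void main(String[] args) throws InterruptedException {
        TopicModel topic = new TopicModel();
        // Two "publishers" on different threads share one topic.
        Thread a = new Thread(() -> { for (int i = 0; i < 3; i++) topic.publish("A" + i); });
        Thread b = new Thread(() -> { for (int i = 0; i < 3; i++) topic.publish("B" + i); });
        a.start();
        b.start();
        a.join();
        b.join();
        // Ordinals 1..6 are unique and sequential; the A/B interleaving varies per run.
        topic.history().forEach(e -> System.out.println(e.ordinal() + " " + e.payload()));
    }
}
```

A single lock around both the ordinal increment and the history append is the simplest way to guarantee that history order and ordinal order always agree.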

The Cluster Subscriber operator has the following capabilities:

  • At startup time, it creates the topic named in its Topic Name property, or if the topic exists already in the cluster, it connects to it.

  • When publishers send tuples to the topic, the subscriber receives them and emits them on its output port. Each tuple received this way contains a long field named ordinal that indicates its publishing sequence order.

  • More than one operator instance from any node in the cluster may subscribe to the same topic.

  • When a Replay command is sent to its command input port with a given range of ordinal numbers, the tuples previously published to the topic that fall within that range are replayed. In this way, subscribers can bring themselves up to date when starting in an existing cluster, for example when restarting a node after a failover or when adding new nodes to the cluster. Each tuple replayed in this way has a boolean field named replay set to true.

Subscriber and Publisher Adapter Properties

Settings are used to control most of the behavior of the operators. The settings are grouped under several tabs in the Properties view in StreamBase Studio.

In this section, each property is listed by its name as shown in the operator's tabs in the Properties view for this adapter.

When using this adapter in a StreamSQL program with the APPLY JAVA statement, you must convert the Studio property names to parameter names using the simple formula described in APPLY Statement. For cases where the Studio property name is not convertible in an obvious way, the StreamSQL parameter name is included in the Description column.

Both the Publisher and Subscriber operators expose the same set of properties (unless noted otherwise). The properties are described in this section.

General Tab

Name: Use this required field to specify or change the component's name, which must be unique in the current module. The name must contain only alphabetic characters, numbers, and underscores, and no hyphens or other special characters. The first character must be alphabetic or an underscore.

Operator: A read-only field that shows the formal name of the operator.

Class: Shows the fully qualified class name that implements the functionality of this operator. If you need to reference this class name elsewhere in your application, you can right-click this field and select Copy from the context menu to place the full class name in the system clipboard.

Start with application: If this field is set to Yes (default) or to a module parameter that evaluates to true, this instance of this operator starts as part of the JVM engine that runs this EventFlow fragment. If this field is set to No or to a module parameter that evaluates to false, the operator instance is loaded with the engine, but does not start until you send an epadmin container resume command (or its sbadmin equivalent), or until you start the component with StreamBase Manager.

Enable Error Output Port: Select this check box to add an Error Port to this component. In the EventFlow canvas, the Error Port shows as a red output port, always the last port for the component. See Using Error Ports to learn about Error Ports.

Description: Optionally enter text to briefly describe the component's purpose and function. In the EventFlow canvas, you can see the description by pressing Ctrl while the component's tooltip is displayed.

Operator Properties Tab

Topic Name (string, default: none): (Required) The name of the topic to which to subscribe (for the Subscriber operator) or publish (for the Publisher operator). If the topic does not exist in the cluster when the operator starts, it is created.

(StreamSQL property name: topicName)

Max Number of Tuples in History (long, default: 10000): The number of tuples to keep in the topic's history queue. Subscriber operators use this history when they need to replay tuples. When the maximum number of tuples is reached, the oldest tuples are dropped as new ones are published. If multiple Publishers use the same topic with different values for this setting, the highest value is used.
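The history limit behaves like a bounded FIFO queue whose capacity is the largest value configured by any Publisher of the topic. The Java sketch below is hypothetical (`BoundedHistory` and its methods are illustrative names, not adapter APIs) and simplifies each history entry to its ordinal.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of the "Max Number of Tuples in History" rule;
// NOT the adapter's implementation.
class BoundedHistory {
    private final Deque<Long> ordinals = new ArrayDeque<>();
    private final long maxSize;

    BoundedHistory(long maxSize) {
        this.maxSize = maxSize;
    }

    // Several publishers may configure different sizes; the highest value is used.
    // With no publisher settings, fall back to the documented default of 10000.
    static BoundedHistory forPublishers(List<Long> configuredSizes) {
        long max = configuredSizes.stream().mapToLong(Long::longValue).max().orElse(10000L);
        return new BoundedHistory(max);
    }

    // Appends a newly published ordinal, dropping the oldest ones past the limit.
    void add(long ordinal) {
        ordinals.addLast(ordinal);
        while (ordinals.size() > maxSize) {
            ordinals.removeFirst();
        }
    }

    int size() {
        return ordinals.size();
    }

    // Oldest ordinal still available for replay, or null if the history is empty.
    Long oldestOrdinal() {
        return ordinals.peekFirst();
    }

    public static void main(String[] args) {
        BoundedHistory history = forPublishers(List.of(3L, 5L));
        for (long ordinal = 1; ordinal <= 7; ordinal++) {
            history.add(ordinal);
        }
        System.out.println(history.size() + " tuples, oldest ordinal " + history.oldestOrdinal());
    }
}
```

With publisher limits of 3 and 5, the effective limit is 5, so after ordinals 1 through 7 are published, only ordinals 3 through 7 remain replayable.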

(StreamSQL property name: historySize)

Enable Command Port (boolean, default: false): Whether to add an input port to receive commands. If enabled, the schema for this port is expected to have the following fields:
  • command (string), required: The command to execute (see below for valid commands).

  • begin (long), optional, used by the Subscriber's REPLAY command: If present and non-null, indicates the first ordinal to replay. Otherwise 0 is used (that is, the replay starts with the oldest tuple).

  • end (long), optional, used by the Subscriber's REPLAY command: If present and non-null, indicates the last ordinal to replay (inclusive). Otherwise Long.MAX_VALUE is used (that is, the replay ends with the newest tuple).

The Publisher accepts only one command, CLEAR, which purges all tuples from the topic's history.

The Subscriber accepts only one command, REPLAY, which causes the requested tuples to be replayed from the topic's history. Use the begin and end fields to specify the ordinals of the oldest and newest tuples to include in the replay, or omit those fields (or leave them null) to replay the entire history. Tuples emitted as part of the replay have their replay field set to true. Once all requested tuples have been replayed, a marker tuple with ordinal set to 0 and replay set to true is emitted to indicate the end of the replay.
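The REPLAY rules above (a null begin means 0, a null end means Long.MAX_VALUE, both bounds inclusive, replay set to true, and a trailing marker with ordinal 0) can be expressed as a small pure function. This Java sketch is hypothetical: `ReplaySketch`, `OutTuple`, and `replay` are illustrative names, and the history is reduced to a list of ordinals.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of how a REPLAY command selects tuples from the history.
// Names are illustrative, NOT the adapter API.
class ReplaySketch {
    record OutTuple(long ordinal, boolean replay) {}

    // begin/end are nullable, like the optional command-port fields.
    static List<OutTuple> replay(List<Long> historyOrdinals, Long begin, Long end) {
        long from = (begin == null) ? 0L : begin;            // null -> start at the oldest tuple
        long to = (end == null) ? Long.MAX_VALUE : end;      // null -> end at the newest tuple
        List<OutTuple> out = new ArrayList<>();
        for (long ordinal : historyOrdinals) {
            if (ordinal >= from && ordinal <= to) {           // inclusive on both ends
                out.add(new OutTuple(ordinal, true));         // replayed tuples carry replay=true
            }
        }
        out.add(new OutTuple(0L, true));                      // end-of-replay marker
        return out;
    }

    public static void main(String[] args) {
        List<Long> history = List.of(5L, 6L, 7L, 8L);
        System.out.println(replay(history, 6L, 7L));
        System.out.println(replay(history, null, null).size());
    }
}
```

Downstream logic can treat any tuple with replay set to true and ordinal equal to 0 as the signal that the replay is complete.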

(StreamSQL property name: enableCommandPort)

Enable Status Port (boolean, default: false): Whether to add an output port that emits status tuples.

(StreamSQL property name: enableStatusPort)

Log Level (drop-down list, default: INFO): Controls the level of verbosity the adapter uses to send notifications to the console. This setting can be higher than the containing application's log level; if it is set lower, the system log level is used. Available values, in increasing order of verbosity, are: OFF, ERROR, WARN, INFO, DEBUG, TRACE.
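In other words, the effective level is the more verbose of the adapter's setting and the application's setting. A minimal Java sketch of that rule follows; the `AdapterLogLevel` enum and `effective` method are hypothetical and not part of any StreamBase API.

```java
// Hypothetical illustration of the Log Level rule; neither the enum nor the
// method is part of any StreamBase API.
enum AdapterLogLevel { OFF, ERROR, WARN, INFO, DEBUG, TRACE }  // increasing verbosity

class LogLevelSketch {
    // The adapter may be more verbose than the application, never less:
    // the more verbose of the two settings is the one that takes effect.
    static AdapterLogLevel effective(AdapterLogLevel adapter, AdapterLogLevel system) {
        return adapter.compareTo(system) >= 0 ? adapter : system;
    }

    public static void main(String[] args) {
        System.out.println(effective(AdapterLogLevel.DEBUG, AdapterLogLevel.INFO));
        System.out.println(effective(AdapterLogLevel.WARN, AdapterLogLevel.INFO));
    }
}
```

Because Java enum constants compare by declaration order, listing the levels from least to most verbose makes compareTo a direct verbosity comparison: the first call yields DEBUG and the second yields INFO.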

Edit Schema Tab (Subscriber operator only)

Use this tab to define the schema for the tuples to be read from the topic.

Note that the Subscriber operator will automatically add the following fields to this schema on its output port (and populate those fields appropriately for each arriving tuple), to convey some meta-information about the payload:

  • ordinal (long) — Contains the sequence number of this tuple, which indicates the order in which each tuple was published. Subscriber operator users should expect this ordinal number to be monotonically increasing as new tuples are read from the topic. If the EventFlow logic downstream of the Subscriber operator detects a gap in this sequence, a replay may be requested by sending a Replay command to the Subscriber's command port.

  • replay (boolean) — Set to false by default, but will be true when the tuple is emitted as a result of processing a Replay command.
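Detecting a gap in the ordinal sequence and turning it into a Replay request can be sketched as follows. This Java class is hypothetical: `GapDetector` and `ReplayRequest` are illustrative names, only live tuples (replay set to false) are fed to it, and the request's values are meant for the begin and end fields of the Replay command.

```java
import java.util.Optional;

// Hypothetical downstream logic that watches the ordinal field of live
// (replay == false) tuples and decides when to request a replay.
// Not a StreamBase API; a sketch of the idea only.
class GapDetector {
    // Values to place in the Replay command's begin and end fields.
    record ReplayRequest(long begin, long end) {}

    private long lastOrdinal = -1;  // -1 means no live tuple seen yet

    Optional<ReplayRequest> onLiveTuple(long ordinal) {
        Optional<ReplayRequest> request = Optional.empty();
        if (lastOrdinal >= 0 && ordinal > lastOrdinal + 1) {
            // Ordinals lastOrdinal+1 .. ordinal-1 never arrived: ask for exactly those.
            request = Optional.of(new ReplayRequest(lastOrdinal + 1, ordinal - 1));
        }
        lastOrdinal = Math.max(lastOrdinal, ordinal);
        return request;
    }

    public static void main(String[] args) {
        GapDetector detector = new GapDetector();
        for (long ordinal : new long[] {1, 2, 5, 6}) {
            detector.onLiveTuple(ordinal)
                    .ifPresent(r -> System.out.println("replay " + r.begin() + ".." + r.end()));
        }
    }
}
```

The sketch never requests a replay for the first live tuple it sees, because a subscriber joining an existing cluster cannot tell a gap from older history; catching up on that history is what a full REPLAY at startup is for.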

(StreamSQL property name: payloadSchema)

Concurrency Tab

Use the Concurrency tab to specify parallel regions for this instance of this component, or multiplicity options, or both. The Concurrency tab settings are described in Concurrency Options, and dispatch styles are described in Dispatch Styles.

Caution

Concurrency settings are not suitable for every application, and using these settings requires a thorough analysis of your application. For details, see Execution Order and Concurrency, which includes important guidelines for using the concurrency options.

Operator Ports And Schemas

Publisher Operator Ports

Input Port

By default the Publisher operator has only one input port, to receive tuples to publish to the topic. Tuples sent to this port will be forwarded to the topic verbatim.

Note

The tuples sent to this port must not contain fields with the reserved names ordinal or replay.

If the Enable Command Port property is checked, one additional input port is present to receive commands for the operator. Currently only one command, Clear, is supported. This port is expected to receive tuples with the following field:

  • command (string): Required. Contains the type of command to execute. Currently, only the Clear command is supported. When received, the topic's history is purged of all tuples.

Output Ports

By default, the Publisher operator has no output ports; however, if its Enable Status Port property is checked, a status port is added with the following schema:

  • status (string): Contains the type of status information contained in this tuple (for example, Error).

  • info (list<string>): Contains one or more lines of text describing the event that triggered this tuple.

Subscriber Operator Ports

Input Port

The Subscriber operator has no input port by default. If its Enable Command Port property is checked, it has one input port to receive commands for the operator. Currently, only the Replay command is supported. This port is expected to receive tuples with the following fields:

  • command (string): Required. Contains the type of command to execute. Currently, only the Replay command is supported. When received, the adapter replays the tuples contained in the topic's history, followed by a marker tuple with its ordinal field set to 0 to indicate the end of the replay.

  • begin (long): Optional. The ordinal of the first tuple to replay. Can be left null to signify the oldest tuple in the topic's history.

  • end (long): Optional. The ordinal of the last tuple to replay. Can be left null to signify the newest tuple in the topic's history.

Output Ports

By default, the Subscriber operator has one output port, which emits the tuples retrieved from the topic. The schema of this port is the same as the schema specified in the Edit Schema tab of the operator's Properties view, with the following two fields added to convey additional information about each tuple:

  • ordinal (long) — Contains the sequence number of this tuple, which indicates the order in which each tuple was published. Subscriber operator users should expect this ordinal number to be monotonically increasing as new tuples are read from the topic. If the EventFlow logic downstream of the Subscriber operator detects a gap in this sequence, a replay may be requested by sending a Replay command to the Subscriber's command port.

  • replay (boolean) — Set to false by default, but is true when the tuple is emitted as a result of processing a Replay command.

If the operator's Enable Status Port property is checked, a status port is also added, with the following schema:

  • status (string): Contains the type of status information contained in this tuple (for example, Error).

  • info (list<string>): Contains one or more lines of text describing the event that triggered this tuple.

Typechecking and Error Handling

The operators use typecheck messages to help you configure them in your StreamBase application. In particular, the operators generate typecheck messages for the following reasons:

  • A required property is missing.

  • One or more required fields in the command input port are missing or are of the wrong type or size.
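The command-port typecheck can be approximated as a schema check. The Java sketch below is hypothetical: `CommandSchemaCheck` is an illustrative name, the schema is modeled as a map from field name to type name rather than any StreamBase schema class, field size is not modeled, and checking the types of the optional begin and end fields is our assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical check mirroring the command-port typecheck rules. The schema is
// modeled as a field-name -> type-name map, NOT the StreamBase Schema API.
class CommandSchemaCheck {
    static List<String> check(Map<String, String> schema) {
        List<String> errors = new ArrayList<>();
        String commandType = schema.get("command");
        if (commandType == null) {
            errors.add("missing required field: command");
        } else if (!commandType.equals("string")) {
            errors.add("command must be string, found " + commandType);
        }
        // Assumption: optional begin/end fields, when present, must be long.
        for (String field : List.of("begin", "end")) {
            String type = schema.get(field);
            if (type != null && !type.equals("long")) {
                errors.add(field + " must be long, found " + type);
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        System.out.println(check(Map.of("command", "string", "begin", "long")));
        System.out.println(check(Map.of("command", "int", "end", "string")));
    }
}
```

Collecting every problem into a list, rather than stopping at the first, matches the way a typecheck can report several issues at once.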

Suspend and Resume Behavior

When suspended, the Publisher operator no longer sends tuples to the topic; tuples sent to its input port are dropped. The Subscriber operator no longer dequeues tuples from the topic and ignores commands sent to its input port. For both operators, the connection to the topic remains intact.

When resumed, the Publisher operator again starts to send tuples to the topic and the Subscriber resumes dequeuing them.
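The Publisher side of this behavior amounts to a flag check on each input tuple. The Java sketch below is hypothetical (`SuspendablePublisher` and its methods are not adapter APIs) and deliberately does not model the Subscriber, since the text above does not say what happens to tuples that reach the topic while a Subscriber is suspended.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of Publisher suspend/resume semantics, NOT the adapter API.
class SuspendablePublisher {
    private final List<String> published = new ArrayList<>();
    private boolean suspended = false;

    void suspend() { suspended = true; }   // the topic connection stays intact
    void resume() { suspended = false; }

    // Returns true if the tuple reached the topic, false if it was dropped.
    boolean onInputTuple(String payload) {
        if (suspended) {
            return false;                  // dropped while suspended, never published
        }
        published.add(payload);
        return true;
    }

    List<String> published() {
        return List.copyOf(published);
    }

    public static void main(String[] args) {
        SuspendablePublisher publisher = new SuspendablePublisher();
        publisher.onInputTuple("a");
        publisher.suspend();
        publisher.onInputTuple("b");       // dropped
        publisher.resume();
        publisher.onInputTuple("c");
        System.out.println(publisher.published());
    }
}
```

Because dropped tuples are never published, they never receive an ordinal, so suspending a Publisher does not by itself create a gap that subscribers would detect.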