Interactive Data PlusFeed Input Adapter

This adapter is part of the TIBCO StreamBase CEP Connectivity Package.

Introduction

The TIBCO StreamBase® Adapter for PlusFeed from Interactive Data (formerly known as the Comstock Input Adapter) allows a StreamBase application to retrieve market data from a PlusFeed Client Site Processor (CSP) using the Comstock Token Format (CTF). The adapter is embedded in the StreamBase application and provides access to equity, derivative, and market depth information from one or more of approximately 400 exchanges.

The adapter emits market data tuples on one of its two primary output ports. Snapshot tuples, containing a symbol's full image, are sent to the SnapshotOut port, while update tuples, containing non-null data in only those fields that have changed, are sent to the UpdateOut port. The StreamBase application author determines the set of fields present in each market data tuple. StreamBase market data field names must match the corresponding PlusFeed names, with the exception that dots in PlusFeed names are replaced with underscores in StreamBase names. The set of available PlusFeed field names can be retrieved through an information query to the adapter.

The adapter supports both static and dynamic subscriptions. Static (initial) subscriptions are provided to the adapter through a text file, which is processed when the adapter starts. Later, symbols can be subscribed to, or unsubscribed from, by enqueuing tuples to the adapter's dynamic subscription input port.

The adapter provides a third output port to convey information about important events to the StreamBase application. For example, when the state of a connection to the CSP changes, a connection event tuple is emitted from the event port.

The adapter provides an input and output port for querying metadata, including the set of PlusFeed error codes, the set of PlusFeed tokens (fields), the subset of tokens selected by the user, the set of PlusFeed Source IDs (exchanges), and the set of active subscriptions.

The metadata input and output ports ports are also used to retrieve correction, interval, TAS (time and sales), and historical information from a PlusTick server.

Subscriptions can be submitted for a specific symbol on a specific exchange, for all symbols on a specific exchange, or for all symbols on all exchanges. Snapshots of a symbol's current state can be retrieved, as well as depth and derivative information. Certain combinations of subscription options are disallowed. For example, a snapshot may not be requested when subscribing to all symbols on all exchanges. In addition, attempts to request depth and derivative information with a single subscribe request are rejected.

The adapter uses a background thread to monitor its connection with the CSP and to reconnect after a connection is lost. After reconnecting, the adapter restores the subscriptions that were active at the time the connection was lost.

The adapter is configured through a collection of properties set in the adapter's Properties view within StreamBase Studio. Properties specify, among other things, the host name or IP address of the CSP, its port number, and the username and password used to access it.

Adapter Properties

Property Description
Host The host name or IP address of the PlusFeed Client Site Processor (CSP).
Port The TCP port number of the CSP.
Username The username to log in to the CSP.
Password The password to log in to the CSP.
Reconnect Interval The time, in seconds, to wait between attempts to reconnect to the CSP after the connection to it is lost. After reconnecting, the adapter restores the subscriptions that were active at the time the connection was lost.
Token File The name of a text file containing the full set of PlusFeed tokens. If blank, the adapter retrieves token information from the CSP during initialization. Providing a token file streamlines initialization.
Error Code File The name of a text file containing the numeric value and description of each PlusFeed error code. If blank, the adapter retrieves the error codes from the CSP during initialization. Providing an error code file streamlines initialization.
Source ID File The name of a text file containing the numeric value and description of each Comstock source. If blank, the adapter retrieves the source IDs from the CSP during initialization. Providing a source ID file streamlines initialization.
Initial Subscription File The name of a text file containing subscriptions to be processed during adapter startup. An example initial subscription file is shipped with the PlusFeed sample.
Send Unchanged Fields As Null Determines the action taken when a field of interest is not present in a market data message. If set (the default), a null value is placed in the corresponding StreamBase field. Otherwise, the value of the field from the previous tuple matching the symbol and exchange is placed in the field.
Log Level Controls the level of verbosity the adapter uses to send notifications to the console. This setting can be higher than the containing application's log level. If set lower, the system log level is used. Available values, in increasing order of verbosity, are: OFF, ERROR, WARN, INFO, DEBUG, TRACE, and ALL.
Query Corrections When set, allows QueryCorrections commands to be processed by the adapter and includes a QueryCorrections tuple field in the adapter's information query output port to receive the results of those commands.
Query Interval When set, allows QueryInterval commands to be processed by the adapter and includes a QueryInterval tuple field in the adapter's information query output port to receive the results of those commands.
Query Interval Dates When set, allows QueryIntervalDates commands to be processed by the adapter and includes a QueryIntervalDates tuple field in the adapter's information query output port to receive the results of those commands.
Query Tas When set, allows QueryTas (time and sales) commands to be processed by the adapter and includes a QueryTas tuple field in the adapter's information query output port to receive the results of those commands.
Query Tas Dates When set, allows QueryTasDates commands to be processed by the adapter and includes a QueryTasDates tuple field in the adapter's information query output port to receive the results of those commands.
Query History When set, allows QueryHistory commands to be processed by the adapter and includes a QueryHistory tuple field in the adapter's information query output port to receive the results of those commands.
Display Messages Sent A debugging feature that, if set, displays in the console the contents of each PlusFeed message sent by the adapter.
Display Messages Received A debugging feature that, if set, displays in the console the contents of each PlusFeed message received by the adapter.
Verbose Message Display A debugging feature that, if set, shows the raw bytes of each displayed sent or received message.
Message Display Filter A debugging feature that contains a string used to reduce the set of sent or received message displayed. Only those messages whose commands match the filter are displayed. For example, a filter of login|snap causes only sent and received login and snapshot messages to be displayed.

Using the Adapter in a StreamBase Application

This section demonstrates how to use the Interactive Data PlusFeed adapter within a StreamBase application. As shown in the diagram below depicting the adapter's sample application, the adapter uses two input ports and three output ports to communicate with the surrounding application.

Note

Although in the sample application the adapter's input and output ports are connected directly to externally-visible input and output streams, in more complex applications these ports will typically be connected to internal StreamBase operators.

The Interactive Data PlusFeed adapter's ports are used as follows:

  • SubscribeIn: Tuples enqueued on this port cause the adapter to subscribe to, or unsubscribe from, symbols after the adapter has started. Initial subscriptions (those the adapter processes during startup) can later be cancelled using this port. The schema of the SubscribeIn port is derived from the upstream operator or stream and must have the following fields:

    • Subscribe, bool: if true, results in a new subscription. Note that the adapter treats bool fields containing null as false.

    • Snapshot, bool: if true, requests a snapshot. If Subscribe is also true, requests a snapshot followed by a series of market data updates. If both Subscribe and Snapshot are false, interpreted to be an unsubscribe request.

    • Derivatives, bool: if true, requests derivative information, such as options and warrants.

    • Depth, bool: if true, requests market depth information. Depth and Derivatives cannot both be set.

    • Symbol, string: specifies the symbol to subscribe to or unsubscribe from. If null or empty, all symbols on the specified exchange are included.

    • SourceID, int: specifies the numeric exchange code such as 558 for NYSE. If null, the specified symbol(s) on all exchanges are included.

    • DerivativesSourceID, int: when requesting derivative information, this optional parameter limits the response to derivatives from the specified exchange.

    • MarketMaker, string: when requesting depth information, this optional parameter limits the response to bids and asks from the specific market maker.

  • InfoQueryIn: Tuples enqueued on this port request metadata from the adapter and retrieve correction, interval, TAS (time and sales), and historical information from a PlusTick server. The resulting information is emitted as tuples on the InfoQueryOut port. The InfoQueryIn port has the following fields:

    • Command, string: accepts a command specifying the type of metadata being requested. Valid commands include:

      • ListErrorCodes: returns the numeric value and description of all PlusFeed error codes.

      • ListAvailableTokens: returns the numeric value, name, StreamBase field name, type, and size of all PlusFeed tokens (fields).

      • ListUserTokens: returns the numeric value, name, StreamBase field name, type, and size of all PlusFeed tokens currently selected by the user. During the initialization, the adapters requests the CSP to make available all PlusFeed tokens.

      • ListSourceIDs: returns the numeric code and description of all source IDs (exchanges).

      • ListCurrencies: returns the numeric code and description of all PlusFeed currencies.

      • ListSubscriptions: returns the current set of pending and active subscriptions.

      • QueryCorrections: retrieves corrections from the system for recent trade price and trade size data.

      • QueryInterval: retrieves data for a set of successive time intervals for a selected instrument on a selected exchange.

      • QueryIntervalDates: retrieves a list of all the dates currently held in the Interval database.

      • QueryTas: retrieves time and sales records from the snapshot database.

      • QueryTasDates: retrieves a list of all dates currently held in the time and sales database.

      • QueryHistory: retrieves historical prices from historical price database.

    • Tag, string: this value is echoed in each tuple emitted from the information query output port in response to a command, allowing responses to be matched with commands.

      Note

      The following fields are used only when one or more PlusTick queries are enabled. Each query has a set of mandatory and optional fields. The adapter generates a typecheck exception when a mandatory field for an enabled PlusTick query is not present in the input schema. An unspecified optional field is conveyed in one of two ways: the field itself is not present in the input schema or is null in an information query tuple. Any of the fields below may be present in the input schema even when no queries that use it are enabled.

    • SourceID, string: contains the mandatory source ID used in the QueryCorrections, QueryInterval, QueryTas, and QueryHistory commands.

    • Symbol, string: contains the mandatory symbol used in the QueryCorrections, QueryInterval, QueryTas, and QueryHistory commands.

    • FromDateTime, timestamp: Contains an optional from date used in the QueryCorrections, QueryInterval, and QueryTas commands and the mandatory from date used in QueryHistory commands.

    • ToDateTime, timestamp: Contains an optional to date used in the QueryCorrections, QueryInterval, and QueryTas commands and the mandatory to date used in QueryHistory commands.

    • CorrectionType, string: Contains an optional correction type used in QueryTas commands. Valid values are CORRECTED, UNCORRECTED, and MARKED. The default value, if this field is absent in the schema or null in an input tuple is CORRECTED.

    • Interval, int: Contains an optional request interval, in minutes, used in QueryInterval commands.

    • PageNoOfTicksBack, int: Contains an optional count of the number of records to be retrieved by QueryCorrections, QueryInterval, and QueryTas commands.

    • PartCode, string: Contains an optional part code use in QueryCorrections and QueryTas commands.

    • PivotCount, int: Contains an optional number of records to be retrieved before and after the pivot date/time in QueryTas commands.

    • PivotDateTime, timestamp: Contains an optional date/time around which the number of records specified by the PivotCount lie. PivotCount and PivotDateTime must both be present when either is present.

    • RecordType, string: Contains an optional record type to be returned by QueryTas commands. Valid values are Q (quotes), B (BBOs), and T (trades).

  • SnapshotOut: Market data snapshot tuples are emitted from this port. The user determines the set of fields present in its schema with the adapter's Edit Schema tab. The set of available PlusFeed fields is available through the ListUserTokens information query command.

  • UpdateOut: Market data update tuples are emitted from this port. Its schema is derived from the Edit Schema tab and is identical to that of the SnapshotOut port.

  • EventOut: The adapter emits tuples from this port when significant events occur, such as when the state of the connection with the CSP changes. The schema for this port has the following fields:

    • Type, string: returns the type of event, such as Connection, Login, or Subscription.

    • Object, string: returns an event type-specific value, such as the CSP host and port when a connection event occurs, or the contents of the subscription fields when a subscription failure event occurs.

    • Action, string: returns an action associated with the event Type and Object, such as Up or Down for a Connection event, Rejected when a Login failure event occurs, or Failed when a subscription fails.

    • Status, int: returns a PlusFeed status code, if one is available for the particular event. For example, a subscription request may fail with a PlusFeed status code of -12 (ERR_AUTH_REQ), indicating the user does not have entitlement for the requested exchange.

    • Info, string: Returns a human-readable description of the event.

  • InfoQueryOut: This generic adapter output port emits tuples in response to all information query commands. The PlusFeed sample application uses a filter to split the resulting stream into a set of command-specific streams. The InfoQueryOut port has the following schema:

    • Done, bool: Set to false for all but the last tuple emitted in response to a specific command. The final (marker) tuple has Done set to true and all other fields set to null.

    • Command, string: returns the command value specified in the corresponding information query request tuple.

    • Tag, string: returns the tag value specified in the corresponding information query request tuple.

    • Info1-Info5, string: returns one or more command-specific values. The description of the InfoQueryIn stream above lists the information returned for each command.

      Note

      The following tuple fields appear in the InfoQueryOut schema only when the corresponding PlusTick queries are enabled.

    • QueryCorrections

      • CORRECTION_NEW_DELTA_TRADE_COUNT, int

      • CORRECTION_NEW_TRADE_PRICE, double

      • CORRECTION_NEW_TRADE_SIZE, int

      • CORRECTION_PREV_TRADE_PRICE, double

      • CORRECTION_PREV_TRADE_SIZE, int

    • QueryInterval

      • TAS_INTERVAL_TIME, timestamp

      • TAS_INTERVAL_OPEN, double

      • TAS_INTERVAL_HIGH, double

      • TAS_INTERVAL_LOW, double

      • TAS_INTERVAL_CLOSE, double

      • TAS_INTERVAL_VOL, int

      • TAS_INTERVAL_PARTIAL, int

    • QueryIntervalDates

      • ACTIVITY_DATETIME, list<timestamp>

    • QueryTas

      • PERMISSION, int

      • TRADE_PRICE, double

      • TRADE_SIZE, int

      • CORRECTION_PREV_TRADE_PRICE, double

      • CORRECTION_PREV_TRADE_SIZE, int

      • TAS_SEQ, int

    • QueryTasDates

      • ACTIVITY_DATETIME, list<timestamp>

    • QueryHistory

      • HISTORICAL_DATE, timestamp

      • HISTORICAL_CLOSE, double

      • HISTORICAL_OPEN, double

      • HISTORICAL_TRADE_HIGH, double

      • HISTORICAL_TRADE_LOW, double

      • HISTORICAL_VOL, int

Add an instance of the adapter to a new StreamBase application as follows:

  1. Within StreamBase Studio, create a project, including a StreamBase EventFlow diagram, which will host the adapter.

  2. Import into the project the token, error codes, and initial subscription files, plusfeed_tokens.txt, error_codes.txt, and initial_subscriptions.txt, which are located in streambase-install-dir/sample/adapter/embedded/interactive-data-plusfeed.

  3. Open the Global Adapters drawer of the Palette view in StreamBase Studio.

  4. Drag an instance of the Interactive Data PlusFeed adapter Global Adapter drawer to the canvas.

  5. Connect Input and Output streams to the adapter's two input and three output ports. Configure the schema of the input streams as defined in the descriptions of the SubscribeIn and InfoQueryIn ports above. The adapter informs you of any missing or extraneous fields in the two input ports.

  6. Double-click the adapter icon, and in the Properties view, select the Adapter Settings tab. Fill in the host or IP address, TCP port number, username, and password of the Client Site Processor. Alternatively, define these values as operation parameters in a server configuration file and reference them using the ${MyParam} syntax from the adapter settings tab. (The adapter's sample uses this latter technique.) Select the token, error code, and initial subscription files from the respective drop-downs.

  7. Select the Edit Schema tab and add the PlusFeed fields you would like to have appear in the market data output stream. The set of available fields can be retrieved by starting the adapter and sending a ListUserTokens command to the information query input port.

Typechecking and Error Handling

The PlusFeed adapter uses typecheck messages to help you configure the adapter within your StreamBase application. In particular, the adapter generates typecheck messages if the CSP host, port, username, or password are missing or invalid or when one or more required fields in the two input schemas is missing or is of the wrong type or size.

The adapter generates warning and error messages during runtime under various conditions, including:

  • A field in the market data output port's schema is not recognized as a valid PlusFeed token.

  • A line in the initial subscription file is invalid.

  • A tuple received on the dynamic subscription port contains incompatible parameters, such as a snapshot request for all symbols on all exchanges, or a request for both the Depth and Derivative information.

  • A tuple received on the information query input port contains an unrecognized Command.

  • A tuple received on the information query input port contains a null Tag value.

  • A field from a market data message is truncated to fit in an output tuple's string field. To avoid overrunning log files, truncation warnings are generated only once per tuple field.

  • An invalid or malformed PlusFeed message is received from the CSP.

Suspend/Resume Behavior

When suspended, the PlusFeed adapter closes its connection to the CSP.

When resumed, the adapter reopens a connection to the CSP and re-subscribes to the subscriptions that were pending or active at the time of the suspension.

Related Topics