ActivFeed Input Adapter

This adapter is part of the TIBCO StreamBase CEP Connectivity Package.

Introduction

The TIBCO StreamBase® Adapter for Activ Financial allows a StreamBase application to retrieve market data from an Activ Content Platform (ACP) via the Activ Content Gateway (ACG), using ActivFeed relationship subscriptions. The adapter uses ActivMiddleware facilities, is embedded within the StreamBase application, and provides access to equity, derivative, and index market data from many exchanges.

The adapter sends subscription requests received on its input port to the ACG and will asynchronously and continuously receive and convert the resulting response or market update messages into StreamBase tuples, then send those tuples downstream.

The adapter emits market data tuples on an output port. Market data tuples contain null data in unchanged fields (as received from Activ), unless the caching property is set. The StreamBase application author determines the set of fields present in each market data tuple. StreamBase market data field names must match the corresponding Activ names (example: BID). See below for more information on defining and typing these fields.

Example schema files for various relationship types are included; these can be imported into StreamBase Studio and edited. For scalability, the application developer should strive to minimize the set of fields if they expect high output rates, to reduce the application load and the need for additional application instances and/or more CPUs.

The adapter provides another output port to convey status and error information to the StreamBase application. For example, when the state of a connection to the ACG changes, a connection event tuple is emitted from the event port.

Subscriptions are submitted for a specific symbol on a specific exchange, or for relationships to an specific symbol and exchange such as an option chain. Other types of Activ requests, such as pattern-match bulk subscriptions, are not implemented.

The adapter is configured through a collection of properties set in the adapter's Properties view within StreamBase Studio. Properties specify, among other things, the ACG configuration file, the username and password (or file pathnames containing these).

Adapter Properties

Property Description
UserID

The username to log in to the ACG. This may instead be a file pathname that contains a one-line string for the userID. Using a file assists in security by not exposing the ID in the EventFlow (.sbapp) files.

You can also use a StreamBase parameter and then put the UserID in the sbd.sbconf file, such as: ${UID} for the field value and then in sbd.sbconf:

<operator-parameters> <operator-parameter name="UID" value="myActivUserIDorFilePath"/> </operator-parameters>
Password The password to log in to the ACG. This may instead be a file pathname that contains a one-line string for the password. Using a file assists in security by not exposing the password in the EventFlow (.sbapp) files. Secure the file by having it readable only by the production ID that runs the StreamBase application.
Service Location File The name of the Activ service location file, typically named ServiceLocation.xml.
Service Type The type of service being connect to: ContentGateway or WorkstationService.
User-Defined Universal Field List File The name of an optional universal field list file containing user-defined fields. If present, this file must have a top-level XML element named ActivUniversalFieldList and one more child XML elements named Field. Each Field element must have attributes named id, name, codeIdentifier, type, size, and description. The Activ adapter sample provides an example universal field list file, UserDefinedUniversalFieldList.xml.
Set undefined numeric values to zero When the value of a field in an incoming update message changes from defined to undefined, by default the adapter sets that field's value in the resulting tuple to null. As unchanged fields are also set to null, select this option to set undefined field values to 0, enabling applications to differentiate between undefined fields and unchanged fields.
Reconnect Interval The time, in seconds, to wait between attempts to reconnect to the ACG after a failed connection attempt or after the connection to ACG is lost (a disconnect). A minimum for this field is 1 second; a value near the default should be used to prevent hyperactively attempted reconnections.
Reconnection Attempts The number of times to attempt to connect to ACG at the Reconnect Interval if an attempt fails. Setting this to <= 0 will cause the adapter to continuously attempt a connection, at the Reconnect Interval rate. The default of 0 is recommended, so as to continuously retry to connect to ACG whenever the StreamBase application is running and ACG is not connected.
Worker Thread Count The number of consumer threads to process incoming ACG event messages. Setting this to 0 causes message processing to take place in the same thread as the one receiving incoming ACG event messages, which could cause delays and loss of data that would eventually have the adapter auto-disconnect from the ACG. Instead, set this to >=1, based on expected message rate. Setting to the default of 2 is a good starting point for most applications.
Truncate Fields Silently For string typed output schema fields. If the Activ field string type length is greater than the StreamBase field, the field will be silently truncated as it is copied into the StreamBase output tuple. Otherwise an error message will result and the resulting tuple field will be null.
Cache field values If this property is enabled or checked, previous values of events for the symbol will be stored and updated before sending a tuple to the output port. When this option is enabled, more memory and processing overhead is incurred. This is a useful feature to save on the complexity of implementing a cache with StreamBase tables and queries in separate elements in an application.
Log Level Controls the level of verbosity the adapter uses to send notifications to the console. This setting can be higher than the containing application's log level. If set lower, the system log level is used. Available values, in increasing order of verbosity, are: OFF, ERROR, WARN, INFO, DEBUG, TRACE, and ALL.

Using the Adapter within a StreamBase Application

This section discusses how to use the ActivFeed adapter within a StreamBase application. As shown in the diagram below depicting the adapter's sample application, the adapter has one input port and two output ports to communicate with the surrounding application.

The ActivFeed adapter's ports are used as follows:

  • subscribe: Tuples enqueued on this port cause the adapter to subscribe to, or unsubscribe from, symbols after the adapter has started. Initial subscriptions (those the adapter processes during startup) can later be cancelled using this port. The schema of the SubscribeIn port is derived from the upstream operator or stream and must have the following fields. Note that the first field is always required. If the Symbol field is require it is noted with the command description. The Relationships and Tables fields are optional and can be null.

    • command, string: one of the following:

      • subeq to subscribe a single symbol (example: "MSFT.Q" for Microsoft, NASDAQ exchange). Field Symbol is required (non-null).

      • submatch to subscribe to a symbol pattern (example: "EBAY.*" for EBAY on any exchange, or "*.Q" for all NASDAQ symbols).

      • unsub to unsubscribe a single symbol. Field Symbol is required (non-null).

      • clearsub to unsubscribe from all previous subscriptions.

      • listsub to list subscriptions on status port.

      • listsym to list all symbols and all received related symbols (such as all options in option chain) on the status port (only if "Cache field values" is enabled; otherwise, this entry does nothing).

      • listmatch to list all available Activ symbols matching the given string (using AcitvContentGateway's getSymbols functionality). One tuple will be issued on the status port for each matching symbol found, then a marker tuple (with endOfSymbolList in the Command field) to indicate the end of the list.

      • "emitall" to emit all saved tuples on the market data port (a complete data refresh, only if "Cache field values" is enabled; otherwise, this entry does nothing).

      • listrels lists the Activ Relationships that can be used in the Relationship field (described below) for a subeq command.

      • listtables will list the Activ tables that can be used in the tables field (see below).

    • symbol, string: specifies the symbol to subscribe to or unsubscribe from. If null or empty, all symbols on the specified exchange are included. This field is required for commands subeq and submatch (symbol value must be a string; cannot be null).

    • relationships, string: specifies the relationship type, an example being OPTION option-chain subscription, or "NONE" for an exact subscription to only the security specified in symbol. One can create more than one relationship in a subscription, such as "NONE,OPTION" for the symbol and its options. See the Activ documentation for a complete list of relationships, or use the listrels command to obtain a list. Generally, "NONE" is a good choice for exact symbol matches. Use the command listrels to get a list of all relationships that can be used in this field for subeq commands. If this field is null it will imply "NONE".

    • tables, string; specifies the tables to search, or put "all" for subscribing to all tables. Examples of table names are "US_LISTING", "US_EQUITY_OPTION ". See the Activ documentation for a complete list of tables, or use the listtables command to obtain a list. Generally, using "all" for exact symbols is the correct choice. If this field is null it will imply "all".

  • MarketDataOutputStream: Market data update tuples are emitted from this port. Its schema is derived from the Edit Schema tab. Some information about field name mapping:

    • Schema field names should be the same as ACG field names, an example being "BID". See the example schema provided for other field names. Consult your current ACG documentation for all field names that can be specified.

    • The following special field names are reserved by StreamBase to capture additional information from the Activ event messages:

      • RELATIONSHIP_ID: the numeric relationship ID from the ACG event message. Examples are 108 for RELATIONSHIP_ID_OPTION_ROOT and 0 for RELATIONSHIP_ID_NONE.

      • RELATIONSHIP_NAME: the relationship name from the ACG event message, such as "RELATIONSHIP_ID_OPTION_ROOT" and "RELATIONSHIP_ID_NONE"

      • ROOTSYMBOL: for relationships like "RELATIONSHIP_ID_OPTION_ROOT", this field contains the root symbol, as specified in the subscription.

      • UPDATE_EVENT_TYPE: receives the numeric value of the event type of update messages.

      • UPDATE_EVENT_TYPE_NAME: receives the name of the event type of update messages.

    • For Activ "TRational" data types (complex Tick fields), additional schema fields can be defined with suffixes for related information on these fields (..._TICK, ..._TRENDDAY, ..._TRENDONPREVIOUS). For example, in the field "BID", the application schema fields BID_TICK, BID_TRENDDAY and BID_TRENDONPREVIOUS can be defined to receive this BID-related data. These schema fields can be defined as either integer or string, as defined by the ACG-API. They are mapped to the integer or string values as defined by ACG. For comparison operations in an application, an integer field would have better performance.

  • AdapterStatusMessages: This output port emits status, information, and error tuples. Note that errors will be limited to a small number per minute to prevent a message flood. The InfoOutput port has the following schema:

    • Info/Error, bool: Value is false for error output, true for information output.

    • Command, string: type of output, categorized or the echo of the input command, such as "subeq" for a subscription or "connection" for connection-oriented information.

    • Tag, string: a sub-category of output, such as "connected" or "disconnected" for Type "connection", or "exception" if an internal exception is raised.

    • Info1-Info5, string: returns one or more information-specific values. An example for "exception" might be "null pointer exception".

Add an instance of the adapter to a new StreamBase application as follows:

  1. Within StreamBase Studio, create a project, including a StreamBase EventFlow diagram, that will host the adapter.

  2. Import the schema mapping examples, which are located in streambase-install-dir/sample/adapter/embedded/activfeed. These can be used to quickly set up an initial, valid schema that can be edited.

  3. Drag an instance of the ActivFeed adapter from the Global Adapters drawer in the Palette view to the canvas.

  4. Connect Input and Output streams to the adapter's one input and two output ports. Configure the schema of the input stream as defined in the descriptions of the SubscribeIn and InfoQueryOut ports above. The adapter will inform you of any missing or extraneous fields in the input port and any type-conflicts in output data ports.

  5. Select or double-click the adapter icon, and in the Properties view, click Adapter Settings. Fill in the UserID (or file pathname), and Password (or password file name). The adapter's sample uses this latter technique — be sure to adjust the pathname and create such files to supply your information as provided by Activ. Fill in the pathname to the Activ ServiceLocation.xml file (provided by Activ). Adjust other properties as required by your application.

  6. Click Edit Schema and add the Activ fields you would like to have appear in the market data output stream. You can use the copy schema feature to retrieve an imported Activ schema, and edit to change types or delete or add other Activ fields as needed.

Typechecking and Error Handling

The ActivFeed adapter uses typecheck messages to help you configure the adapter within your StreamBase application. In particular, the adapter generates typecheck messages if the ACG username, password or ServiceLocation are missing, or when one or more required fields in the input schema is missing or is of the wrong type or size, or if the output schema type will not properly convey information (such as a floating point into integer error). Any market data output field can be defined as string.

The adapter generates warning and error messages during runtime under various conditions, including:

  • A field in the market data output port's schema is not recognized as a valid Activ field name.

  • A tuple received on the subscription port contains incorrect parameters that return an error from ACG.

  • A tuple received on the input port contains a null value.

  • A field from a market data message is truncated to fit in an output tuple's string field. To avoid overrunning log files, truncation warnings are generated just once per unit of time, and then this error type is re-enabled.

  • Other ACG or application-related internal messages such as thrown Exceptions.

Suspend/Resume Behavior

When suspended, the ActivFeed adapter also calls ACG to suspend its messages (this is an ACG provision that will shut down update messages). Any ACG messages received but not yet processed by StreamBase threads will be ignored and discarded.

When resumed, the adapter invokes the ACG resume facility and re-enables the StreamBase input adapter threads to again process event messages.