TIBCO Rendezvous Subscribing Input Adapter

Introduction

The TIBCO StreamBase® Input Adapter for TIBCO Rendezvous® allows a StreamBase application to receive messages published to one or more Rendezvous subjects. The adapter is embedded in the StreamBase application and has two output ports: one that emits status tuples, and a second that emits tuples representing received Tibrv messages.

The adapter subscribes to one or more subjects during startup and asynchronously and continuously receives and converts incoming Tibrv messages into StreamBase tuples, which it sends downstream.

The adapter supports all StreamBase data types, including tuples and lists, though lists may not contain elements of type list. The schema of the Tibrv message output port is set through the Edit Schema tab of the adapter's Properties view.

The adapter is configured through several properties set in the adapter's Properties view in StreamBase Studio. The properties include parameters used to connect to the Rendezvous daemon and the set of subjects to subscribe to at startup.

TIBCO Middleware Dependencies

This adapter requires access to the JAR file that implements the TIBCO Rendezvous Java API on your system, and any files referenced by that JAR file. The StreamBase installation kit includes version 8.1.2 of the tibrvj.jar JAR file. If your site's Rendezvous implementation requires a newer version of that file, copy the file from $TIBRV_HOME/lib to $STREAMBASE_HOME/lib/ext.

The tibrvj.jar file, in turn, relies on the tibrvj.dll and tibrv.dll files on Windows and, on UNIX, on several .so files in /usr/tibco/tibrv/lib. These DLL or .so files are supplied as part of your TIBCO installation and are not shipped with StreamBase. Make sure the TIBCO JAR file can locate these files, either on the PATH or by means of the TIBRV_HOME environment variable.

If you get an error message whose text includes Library not found: tibrvj, audit your system for multiple versions of the TIBCO JAR and DLL (or .so) files. For example, on Windows, you might have an older TIBCO release's DLL files in C:\Windows\System32, and a newer set in %TIBRV_HOME%\lib. Make sure the StreamBase TIBCO adapter sees a consistent TIBCO implementation, with the tibrvj.jar, tibrvj.dll, and tibrv.dll files all from the same TIBCO release.

The TIBCO API implementation described in this section is a product of a third party, and its specifications and file names are subject to change by TIBCO. See your TIBCO documentation for the latest information.

Adapter Properties

Property Description
Service The Rendezvous service name. Leave blank to use the default value of 7500.
Network The network interface. Leave blank to use the primary network interface for the host computer.
Daemon The Rendezvous daemon. Leave blank to use the daemon on the local computer listening on the default port.
Use Unique Queue If enabled, the adapter uses a per-adapter-instance event queue rather than the default Tibrv event queue. Using a per-instance queue can lower latency in some situations.
Subject(s) One or more subjects to subscribe to at startup. Separate multiple subjects with spaces, for example, "top.subject1 top.subject2". Use an asterisk wildcard to subscribe to all subjects at a given level, for example, "top.*".
Subject Field Name The name of the output stream field to receive the subject value from the published Tibrv message.
Ignore Advisory Messages If enabled, the adapter discards messages with subjects that start with _RV.
Raw Message Field Name The name of the output stream string or blob field to receive the raw Tibrv message.
Find Fields By ID If enabled, the adapter first tries to map a Tibrv message field to a StreamBase field by field ID. A Tibrv field with an ID of 123 would match a StreamBase field named _123. If no ID match is found, the adapter uses its default field mapping mechanism, using the Tibrv field name to find the StreamBase field.
Enable Dynamic Subscriptions Port If enabled, the adapter includes an input port to which tuples are enqueued to subscribe to, or unsubscribe from, subjects after the adapter has started.
Request Cached Values If enabled, the adapter requests the cached message for each subscribed subject and includes an additional boolean field, _IsCachedValue, on its second output port. The field contains true if the tuple represents a cached message and false otherwise. Cache requests are processed in the background, and the adapter may emit one or more tuples containing non-cached messages before emitting a tuple with a cached message for a given subject. The StreamBase application must be prepared to discard or merge stale cached messages. When a cache request for a given subject times out, the adapter emits a "no cached message available" status tuple.
Cache Request Timeout Specifies the time, in milliseconds, to wait for a response from the cache server. The total time required to request the cached messages for a burst of subscriptions increases linearly with the Cache Request Timeout (assuming a significant number of subjects are not cached).
Cache Request Max Thread Count Specifies the maximum number of threads to use in servicing cache requests. These threads are created on demand and remain active until there are no cache requests to service, at which point they exit. The total time required to request the cached messages for a burst of subscriptions decreases linearly with the Max Thread Count (assuming a significant number of subjects are not cached).
Log Level Controls the level of verbosity the adapter uses to send notifications to the console. This setting can be higher than the containing application's log level. If set lower, the system log level is used. Available values, in increasing order of verbosity, are: OFF, ERROR, WARN, INFO, DEBUG, TRACE, and ALL.
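
The lookup order described for the Find Fields By ID property can be sketched as follows. This is an illustration only, not the adapter's implementation: the function name resolve_field and its parameters are hypothetical, and the schema is modeled as a plain set of StreamBase field names.

```python
from typing import Optional


def resolve_field(schema_fields: set[str], tibrv_name: str,
                  tibrv_id: Optional[int], find_by_id: bool) -> Optional[str]:
    """Return the StreamBase field a Tibrv field maps to, or None if no match.

    Hypothetical helper: models only the lookup order described in the
    property table, not the adapter's actual code.
    """
    if find_by_id and tibrv_id is not None:
        # A Tibrv field with ID 123 is matched against a field named _123.
        candidate = f"_{tibrv_id}"
        if candidate in schema_fields:
            return candidate
    # Default mapping: case-sensitive match on the Tibrv field name.
    return tibrv_name if tibrv_name in schema_fields else None


schema = {"_123", "price", "Symbol"}
print(resolve_field(schema, "px", 123, find_by_id=True))     # ID match
print(resolve_field(schema, "price", 7, find_by_id=True))    # falls back to name
print(resolve_field(schema, "symbol", None, find_by_id=True))  # case differs, no match
```

A result of None in this sketch corresponds to a Tibrv field that has no counterpart in the output schema.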

Using the Adapter in a StreamBase Application

This section discusses how to use the TIBCO Rendezvous Subscribing Input Adapter in a StreamBase application. As shown in the lower left corner of the diagram below (depicting one of the adapter's sample applications), the subscribing adapter has two output ports to communicate with the surrounding application and, by default, no input ports. An optional input port is added when the Enable Dynamic Subscriptions Port property is enabled.

The TIBCO Rendezvous Subscribing Input Adapter's ports are used as follows:

  • DynamicSubscribe: If enabled, this input port receives tuples used to subscribe to, or unsubscribe from, subjects after the adapter has started. The DynamicSubscribe port has the following schema:

    • Subject, string: Contains the subject to subscribe to or unsubscribe from.

    • Subscribe, bool: true or false to subscribe or unsubscribe, respectively. A null value defaults to true (subscribe).

  • Status: This output port emits status, information, and error tuples. The Status port has the following schema:

    • type, string: Contains one of the following values describing the type of event that occurred:

      • Transport

      • Process Tibrv Message

      • Suspend/Resume

    • object, string: The name of the object associated with the event, such as the Rendezvous transport parameters (service, network, and daemon), the contents of the Tibrv message that was not processed successfully, or the name of the adapter that was suspended or resumed.

    • action, string: The action associated with the event, such as transport created or adapter suspended/resumed.

    • message, string: A human-readable message string.

  • Tibrv Messages: The adapter emits a tuple on its second output port for each Tibrv message received. The port's schema is set on the Edit Schema tab of the adapter's Properties view and can contain all StreamBase data types, including lists and (nested) tuples to accommodate hierarchy in Tibrv messages. Tibrv message fields are mapped to tuple fields by case-sensitive name. List fields may contain all StreamBase types except list types. StreamBase list fields are used to capture Tibrv array fields (such as I8ARRAY) in the Tibrv message, as well as non-array Tibrv fields that occur multiple times with the same name. Fields not present in a Tibrv message are sent as null in the tuple.

    Note

    When configured to request cached values, the adapter includes an additional boolean field, _IsCachedValue, on its second output port, which contains true if the tuple represents a cached message and false otherwise.

    In processing incoming Tibrv messages, the adapter converts Tibrv data types to StreamBase data types. Not all possible conversions are valid. When the adapter attempts an invalid conversion, it displays a one-time warning for the field, emits a status tuple, and sets the corresponding field in the output tuple to null. The following table shows the set of valid conversions. A Tibrv type listed without any StreamBase types has no valid conversion.

    Tibrv Type Valid StreamBase Types
    BOOL bool, list<bool>
    DATETIME timestamp, list<timestamp>
    ENCRYPTED  
    F32 double, string, list<double>, list<string>
    F32ARRAY list<double>, list<string>
    F64 double, string, list<double>, list<string>
    F64ARRAY list<double>, list<string>
    I8 bool, int, long, string, list<bool>, list<int>, list<long>, list<string>
    I8ARRAY blob, list<bool>, list<int>, list<long>, list<string>
    I16 bool, int, long, string, list<bool>, list<int>, list<long>, list<string>
    I16ARRAY list<bool>, list<int>, list<long>, list<string>
    I32 bool, int, long, string, list<bool>, list<int>, list<long>, list<string>
    I32ARRAY list<bool>, list<int>, list<long>, list<string>
    I64 bool, int, long, string, list<bool>, list<int>, list<long>, list<string>
    I64ARRAY list<bool>, list<int>, list<long>, list<string>
    IPPORT16
    IPADDR32
    MSG tuple, list<tuple>
    MSGARRAY list<tuple>
    OPAQUE blob, string, list<bool>
    STRING blob, string, list<blob>, list<string>
    STRINGARRAY list<string>
    U8 bool, int, long, string, list<bool>, list<int>, list<long>, list<string>
    U8ARRAY blob, list<bool>, list<int>, list<long>, list<string>
    U16 bool, int, long, string, list<bool>, list<int>, list<long>, list<string>
    U16ARRAY list<bool>, list<int>, list<long>, list<string>
    U32 bool, int, long, string, list<bool>, list<int>, list<long>, list<string>
    U32ARRAY list<bool>, list<int>, list<long>, list<string>
    U64 bool, int, long, string, list<bool>, list<int>, list<long>, list<string>
    U64ARRAY list<bool>, list<int>, list<long>, list<string>
    XML
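
The handling of invalid conversions described above can be sketched as a table lookup. This is a simplified model under stated assumptions, not the adapter's code: the class ConversionChecker is hypothetical, only a few rows of the table are copied in, values are passed through unchanged rather than actually converted, and the sketch assumes a status tuple is recorded for every invalid conversion while the warning is recorded once per field.

```python
# Subset of the conversion table above (Tibrv type -> valid StreamBase types).
VALID_CONVERSIONS = {
    "BOOL": {"bool", "list<bool>"},
    "DATETIME": {"timestamp", "list<timestamp>"},
    "F64": {"double", "string", "list<double>", "list<string>"},
    "I32": {"bool", "int", "long", "string",
            "list<bool>", "list<int>", "list<long>", "list<string>"},
    "MSG": {"tuple", "list<tuple>"},
    "STRINGARRAY": {"list<string>"},
    "XML": set(),  # listed in the table with no valid StreamBase type
}


class ConversionChecker:
    """Hypothetical model of the one-time warning and null-on-failure rule."""

    def __init__(self):
        self._warned_fields = set()
        self.warnings = []  # stands in for the console warning
        self.status = []    # stands in for tuples on the Status port

    def check(self, field, tibrv_type, sb_type, value):
        if sb_type in VALID_CONVERSIONS.get(tibrv_type, set()):
            return value  # a real adapter would convert the value here
        # Assumption: one status tuple per failed conversion.
        self.status.append({"type": "Process Tibrv Message", "object": field})
        if field not in self._warned_fields:
            self._warned_fields.add(field)
            self.warnings.append(f"cannot convert {tibrv_type} to {sb_type}: {field}")
        return None  # the output field is set to null


checker = ConversionChecker()
print(checker.check("qty", "I32", "long", 5))
print(checker.check("doc", "XML", "string", "<a/>"))
```

Downstream logic in the application should therefore treat a null in an output field as either "field absent from the message" or "conversion not valid" and consult the Status port to tell the two apart.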

Add an instance of the adapter to a new StreamBase application as follows:

  1. In StreamBase Studio, create a project, and create an EventFlow application file to host the adapter.

  2. Drag an instance of the TIBCO Rendezvous Subscribing Input Adapter from the Operators and Adapters drawer in the Palette view to the canvas.

  3. Connect Output streams to the adapter's two output ports.

  4. Configure the schema of the Tibrv message output stream (port 2) using the Edit Schema tab of the adapter's Properties view to match the set of fields expected in the incoming Tibrv messages.

  5. Select the adapter icon, and in the Properties view, select Adapter Settings and fill in any desired properties. Subject(s) is the only required property.
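
Because Subject(s) is the one property that must be filled in, it can help to check a value before entering it. The sketch below illustrates how a space-separated Subject(s) value breaks into individual subjects and which published subjects a single-level asterisk pattern such as "top.*" would cover. Subject matching is actually performed by Rendezvous, not by application code; the helper names parse_subjects and subject_matches are hypothetical, and other Rendezvous wildcard forms are not modeled here.

```python
def parse_subjects(prop_value: str) -> list[str]:
    """Split a Subject(s) property value on whitespace."""
    return prop_value.split()


def subject_matches(pattern: str, subject: str) -> bool:
    """Return True if subject matches pattern, where * stands for one element.

    Subjects are dot-separated; an asterisk covers exactly one element at
    its level, so "top.*" covers "top.a" but not "top" or "top.a.b".
    """
    pattern_elems = pattern.split(".")
    subject_elems = subject.split(".")
    if len(pattern_elems) != len(subject_elems):
        return False
    return all(p == "*" or p == s for p, s in zip(pattern_elems, subject_elems))


subjects = parse_subjects("top.subject1 top.*")
for published in ("top.subject1", "top.other", "top.a.b"):
    hits = [s for s in subjects if subject_matches(s, published)]
    print(published, "->", hits)
```

Note that a published subject can match more than one configured entry, as "top.subject1" does in the usage lines above.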

Typechecking and Error Handling

The TIBCO Rendezvous Subscribing Input Adapter uses typecheck messages to help you configure the adapter in your StreamBase application. In particular, the adapter generates typecheck messages when no subjects are provided or when a list field in the Tibrv message output port uses an unsupported element type.

The adapter emits tuples on the Status port at runtime under various conditions, including:

  • The adapter creates or fails to create a transport to the Rendezvous daemon during startup.

  • A data type mismatch is detected between a field in an incoming Tibrv message and the corresponding tuple field.

  • The adapter is suspended or resumed.
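
An application that consumes the Status port typically routes tuples by their type field. The following sketch groups status messages by the three type values listed in the Status port schema and places anything else in a catch-all bucket. It models status tuples as plain dictionaries; the function name group_status and the "Other" bucket are hypothetical, and in a real application this routing would be done with EventFlow operators rather than Python.

```python
from collections import defaultdict

# Type values listed in the Status port schema.
KNOWN_TYPES = ("Transport", "Process Tibrv Message", "Suspend/Resume")


def group_status(status_tuples):
    """Group status messages by the tuple's type field.

    Tuples whose type is not one of KNOWN_TYPES are collected under
    "Other" so that unexpected events are not silently dropped.
    """
    groups = defaultdict(list)
    for status in status_tuples:
        kind = status.get("type")
        key = kind if kind in KNOWN_TYPES else "Other"
        groups[key].append(status.get("message"))
    return dict(groups)


# Illustrative tuples; the action and message strings are made up.
sample = [
    {"type": "Transport", "action": "transport created", "message": "connected"},
    {"type": "Suspend/Resume", "action": "adapter suspended", "message": "paused"},
]
print(group_status(sample))
```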

Suspend and Resume Behavior

When suspended, the TIBCO Rendezvous Subscribing Input Adapter stops emitting tuples on its output ports.

When resumed, the adapter begins emitting tuples on its output ports once again.