Bidirectional Socket Writer Adapter

Introduction

The TIBCO StreamBase® Bidirectional Socket Writer Adapter was created to allow for two-way socket communication using a pair of adapters, one for reading, and one for writing. The pair of adapters work together by sharing the connections that are made. As the name implies, the writer allows for tuples to be converted and written out to one or more sockets.

This adapter allows you to create your own data transformations, or use pre-built ones, before sending the data on the wire. There are currently four pre-built data transformations available: CSV, JSON, BLOB, and serialized tuple. The data transformers convert each incoming tuple into a byte array and send that data to one or more sockets.

Adapter Properties

This section describes the properties you can set for this adapter, using the various tabs of the Properties view in StreamBase Studio.

General Tab

Name: Use this field to specify or change the component's name, which must be unique in the application. The name must contain only alphabetic characters, numbers, and underscores, and no hyphens or other special characters. The first character must be alphabetic or an underscore.

Adapter: A read-only field that shows the formal name of the adapter.

Class: A field that shows the fully qualified class name that implements the functionality of this adapter. Use this class name when loading the adapter in StreamSQL programs with the APPLY JAVA statement. You can right-click this field and select Copy from the context menu to place the full class name in the system clipboard.

Start with application: If this field is set to Yes or to a module parameter that evaluates to true, an instance of this adapter starts as part of the containing StreamBase Server. If this field is set to No or to a module parameter that evaluates to false, the adapter is loaded with the server, but does not start until you send an sbadmin resume command, or until you start the component with StreamBase Manager. With this option set to No or false, the adapter does not start even if the application as a whole is suspended and later resumed. The recommended setting is selected by default.

Enable Error Output Port: Select this check box to add an Error Port to this component. In the EventFlow canvas, the Error Port shows as a red output port, always the last port for the component. See Using Error Ports and Error Streams to learn about Error Ports.

Description: Optionally enter text to briefly describe the component's purpose and function. In the EventFlow canvas, you can see the description by pressing Ctrl while the component's tooltip is displayed.

Adapter Properties Tab

Property Description
Share connections with a Socket Reader (check box) Specifies whether this adapter is to share connections with another adapter, if shared socket connections can be used for both reading and writing.
Socket Reader to share connections with (string) The name of the socket reader that this adapter is to share socket connections with.
Share data transformer (check box) If enabled, the writer uses the same data transformer as the reader. This allows you to maintain state in the transformer between read and write operations.
Port (int) The port to use. When in server mode, this is the port to listen on. When in client mode, this is the default port to use when making new connections.
Server Mode (check box) This option determines whether this adapter runs as a client (check box cleared) or as a server (check box selected).
Enable Control Port (check box) This option determines whether this control port is available. The control port allows for commands to be sent to the adapter to start or stop in server mode, or to connect or disconnect in client mode.
Enable Status (check box) This option enables a status port with status information about the state of the adapter during run time. Some common status tuples include connection and disconnection information.
Socket Identifier Field Name (string) This required field determines which field of the incoming data determines which connection the data will be sent to. The field can be left blank, in which case all incoming tuples are sent to all connections. This field is removed from the outgoing tuple before being processed to send. To find a connection in the incoming data, the expected format is host:port:socketidentifier; or if no socketidentifier has been assigned, then just host:port.
Close Connection After Write (check box) If enabled the server will close the client connection after it writes a response back to the client.
Log Level Controls the level of verbosity the adapter uses to send notifications to the console. This setting can be higher than the containing application's log level. If set lower, the system log level is used. Available values, in increasing order of verbosity, are: OFF, ERROR, WARN, INFO, DEBUG, TRACE, and ALL.

Server Tab

Property Description
Max Connection Count (int) The maximum number of incoming connections that this adapter allows at any given time. If a client tries to connect when the maximum number of connections is already reached, the connection is rejected and a status tuple is emitted.
Connection Backlog (int) The backlog parameter is the maximum number of pending connections on the socket. Its exact semantics are implementation specific. In particular, an implementation may impose a maximum length or may choose to ignore the parameter altogether. If the backlog parameter has the value 0, or a negative value, then an implementation specific default is used.
Start Server At Startup (check box) Determines whether the adapter starts the server when the application starts. If disabled, the end-user must enable the control port and emit a command tuple to start and stop the server.

Client Tab

Property Description
Default Host Name (string) The default host name to connect to. This value is used when the Connect at startup value is selected or when a command tuple is received that has null for the host value.
Connect at startup (check box) Determines whether the adapter connects to the default host and port when the adapter starts.
Reconnect Count (int) Determines how many times this adapter will retry to connect to a host.
Timeout Period (int) The number of seconds to wait to try to connect after failing to connect.

Data Transformer Tab

Property Description
Data Transformer Class (string) The fully qualified path to the data transformer class that will be used to convert data from bytes into a tuple and from tuples to bytes.
Data Transformer Settings (key value pair of strings) These settings are supplied to the data transformer. The settings are a key value pair of strings.

Concurrency Tab

Use the Concurrency tab to specify parallel regions for this instance of this component, or multiplicity options, or both. The Concurrency tab settings are described in Concurrency Options, and dispatch styles are described in Dispatch Styles.

Caution

Concurrency settings are not suitable for every application, and using these settings requires a thorough analysis of your application. For details, see Execution Order and Concurrency, which includes important guidelines for using the concurrency options.

Control Port

Description

Use the control port to send action commands to the adapter. The adapter can be run either in server mode (where it listens for connections) or client mode (where it makes outbound connections), and each mode has its own set of commands. Tuples enqueued on this port cause the adapter to start or stop a server, or to connect or disconnect clients.

Control Port Schema

  • Command, string, the command to send to the adapter.

    If the adapter is in server mode, the values are:

    • START -- This tells the adapter to start the server and listen on the specified or default host and port. Any new connection established has a host:port key value which outputs on each data tuple.

    • STOP -- This tells the adapter to stop the server and close all client connections.

    • DISCONNECT - This tells the adapter to disconnect a client from the server. The socketidentifer must be included and match a current connection.

    If the adapter is in client mode, the values are:

    • CONNECT -- This tells the adapter to make a new connection to the specified or default host and port. The associated values host:port:socketidentifer are used to identify this connection for further actions; those values are output as the socket identifier on all emitted data tuples.

    • DISCONNECT - This tells the adapter to disconnect from the specified or default host and port. The host:port:socketidentifer combination must match the values used to initially make the connection.

  • Host, string. The host to connect to. This value is only valid when sending commands to adapters in client mode. If the value is null, the default value of the adapter is used.

  • Port, int. The port to use when making connections. If the value is null the default value of the adapter is used. If the adapter is in server mode, the port value determines which port to listen on. If the adapter is in client mode, the port is used for outgoing connections.

  • SocketIdentifier (Optional), string. Only used when the adapter is in client mode. If specified, this value is added to the socket identifier value emitted on the status port as the object field with the format Host:Port:SocketIdentifier.

Status Port

Description

The status port is used to send status information tuples downstream to inform the user of changes.

Status Port Schema

  • type, string. The type of status information emitted on this port. Status types are:

    • Connection -- Indicates this message is about a connection.

    • Server -- Indicates this message is about the listening server.

  • action, string.

    • Connected -- This type of action occurs when a new connection is made either as a client or server. The socket identifier will be outputted as the object of this message. For a server the socket identifier will be host:port and for clients it will be 'host:port:(optional) inputted socket identifier'

    • Disconnected -- This type of action occurs when a connection disconnects either as a client or server.

    • Failed -- A failure occurred and the failure message will be included.

    • Limit -- Used when in server mode, this message will be emitted when the max number of connections to the server has been reached.

    • Stopped -- Used when in server mode, sent when the server has been stopped.

    • Started -- Used when in server mode, sent when the server has been started.

  • object, string. This value may be null. If it is not null, it contains a value relevant to the status message. On new connections, this value contains the socket identifier of the new connection.

  • message, string. This is a formatted human readable message that explains the status message.

  • inputTuple. Not used for this adapter.

Data Transformers

Data transformers are used to convert data in and out of the system.

Built-In Data Transformers

CSV Data Transformer

com.streambase.sb.adapter.bidirectionalsocket.transform.CSVDataTransformer

The CSV data transformer will convert incoming bytes into tuples of the supplied schema. This transformer has the same properties as the CSVSocketReader adapter, some properties are for the reader and some are for the writer. If a setting is not present the default is used.

Setting Applies to Reader/Writer Allowed Values Default Description
UseDefaultCharset Reader/Writer true, false true If specified, specifies whether the Java platform default character set is to be used.
Charset Reader/Writer string System Charset The name of the character set encoding that the adapter is to use to read input or write output.
CaptureTransformStrategy Reader/Writer Flatten, Nest Flatten The strategy to use when transforming capture fields for this operator.
FieldDelimiter Reader/Writer Single character , The delimiter used to separate tokens, defaults to a comma. Control characters can be entered as &#ddd; where ddd is the character's ASCII value.
NullValueRepresentation Writer string null String to write when a field is null.
StringQuote Writer QuoteIfNecessary, AlwaysQuote, NeverQuote QuoteIfNecessary Determines when string fields are quoted in the CSV file: Quote if necessary, Always quote, or Never quote.
StringQuoteCharacter Reader/Writer Single character " Matching pairs of the quote character used to delimit string constants.
TimestampFormat Reader String yyyy-MM-dd hh:mm:ss.SSSZ The format used to parse timestamp fields extracted from the input tuples. This should be in the form expected by java.text.SimpleDateFormat class described in Java Platform SE reference documentation.

If a timestamp value is read that does not match the specified format string, the entire record is discarded and a WARN message appears on the console that includes the text invalid timestamp value.

LenientTimestampParsing Reader true, false true Set this to true if you would like to parse timestamp values that do not conform to the specified format using default formats.
IncompleteRecords Reader Discard, PopulateWithNulls Discard

Specifies what should be done when the adapter receives a record with less than the required number of fields.

Discard

Discard records with less than the required number of fields.

PopulateWithNulls

When records with less than the required number of fields are encountered, process the records after populating the unspecified fields with nulls.

DiscardEmptyRecords Reader true, false true

This is a special case to handle empty lines. If rows with some fields must send output, but not empty lines, leave this true. Set to false to send empty tuples for empty lines.

LogWarning Reader true, false false

Set this to true if warning messages should be logged when incomplete records are encountered. If false, no warning messages will be logged for records with less than required number of fields.

BufferSize Reader/Writer int 10240 The size of the read or write buffer used to hold data in transition from the socket to the data transformer. If this value is too small the application will emit error messages because of a full buffer.

JSON Data Transformer

com.streambase.sb.adapter.bidirectionalsocket.transform.JSONDataTransformer

The JSON data transformer will convert incoming bytes into tuples of the supplied schema. Outgoing tuples are converted into escaped JSON strings with a separator character to delimit tuples. If a setting is not present the default is used.

Setting Applies to Reader/Writer Allowed Values Default Description
IncludeNullValues Writer true, false true Include fields that contain null values.
EncodeSubType Writer list, map list The type of transformation that should be used when there are sub tuples to process.
Separator Reader/Writer Single character \n The separator character used to determine the end of a JSON value. Hex values starting with 0x are allowed such as 0x03.
ByteOrder Reader/Writer big-endian, little-endian native order The byte order to use when converting data.
BufferSize Reader/Writer int 10240 The size of the read or write buffer used to hold data in transition from the socket to the data transformer. If this value is too small the application will emit error messages because of a full buffer.

BLOB Data Transformer

com.streambase.sb.adapter.bidirectionalsocket.transform.BLOBDataTransformer

The BLOB data transformer is one of the most basic data transformers. For incoming data it will take each packet of byte information received from a socket and output a tuple with a blob field containing the data. For outgoing data a specified field of type blob is read and the data from that field is sent directly out on the socket. If a setting is not present the default is used.

Setting Applies to Reader/Writer Allowed Values Default Description
BlobFieldName Reader/Writer string BlobField For the reader this value is the name of the field that will be filled for each outgoing tuple with the byte data. For the writer this is the field that transformer will read and send the contains of the blob to the socket.
BufferSize Reader/Writer int 10240 The size of the read or write buffer used to hold data in transition from the socket to the data transformer. If this value is too small the application will emit error messages because of a full buffer.

Serialized Tuple Data Transformer

com.streambase.sb.adapter.bidirectionalsocket.transform.SerializedTupleDataTransformer

The serialized tuple data transformer transforms tuple going into the writer adapter into a byte array of data with a single int value as the header to determine size. On the reader side the adapter will read in a single integer and then read that many bytes of data to create a new tuple based on the supplied schema. If a setting is not present the default is used.

Setting Applies to Reader/Writer Allowed Values Default Description
ByteOrder Reader/Writer big-endian, little-endian native order The byte order to use when converting data.
BufferSize Reader/Writer int 10240 The size of the read or write buffer used to hold data in transition from the socket to the data transformer. If this value is too small the application will emit error messages because of a full buffer.

Building Custom Data Transformers

Introduction

Custom data transformers allow the end user to build Java code to handle the input and output of the Bi-Directional Adapters. In this section we will cover how the end user can build their own data transformers. There is a single interface that needs to be implemented in order for a class to work with the adapters, that interface is ISocketDataTransformer which we will cover in the next section. Each socket connection established by the adapter creates a new copy of the given data transformer which allows for data to be stored between socket reads and writes for an individual socket connection.

The Bi-Direction socket adapter samples includes two files CustomDataTransformer.java and CustomDataTransformer.sbapp that together demonstrate how to build your own data transformations and use them in an application.

Setup

To use the com.streambase.sb.adapter.bidirectionalsocket.transform.ISocketDataTransformer interface you must make sure that the file streambase/lib/ext/bi-directional-socket-interface.jar is present on your build path.

Now make sure this class is available to the adapter at runtime. The most expedient way of accomplishing this is to put your class in a JAR file and place this JAR file in StreamBase's lib/adapter/ext directory.

java.nio.ByteBuffer

The interface make use of java.nio.ByteBuffer and assumes that the end user of this interface has some familiarity with this class.

Most of the complexities of java.nio.ByteBuffer are handled by the adapter but the end user should know how to deal with the buffers position, limit, and capacity methods. Calls to toTuple and fromTuple assume that the position of the buffer will be updated to reflect the amount of data that was read from or written to a buffer.

Example:

Five characters are read into a string during toTuple:

String testData = new String(readBuffer.array(), readBuffer.position(), 5);

then the buffers position should be updates to reflect the new position:

buffer.position(buffer.position() + 5);

NOTE: Some methods of the ByteBuffer update the position for you like buffer.getInt(); Please read the documentation for java.nio.ByteBuffer for further details.

com.streambase.sb.adapter.bidirectionalsocket.transform.ISocketDataTransformer

The com.streambase.sb.adapter.bidirectionalsocket.transform.ISocketDataTransformer interface is required to be implemented for your data transformer to work with the adapter. The interface has the following methods that need to be completed:

  • public void init(Map<String, String> settings, Logger logger, String host, int port, String socketIdentifier, boolean isReader);

    Description: This method is called when a socket connection is made and the data transformer needs to be initialized.

    Variables

    • Map<String, String> settings - The settings from the adapter "Data Transformer" tab which the user has supplied.

    • Logger logger - The adapter's logger to allow your transformer to log out information.

    • String host - The host of the socket connection.

    • int port - The port of the socket connection.

    • String socketIdentifier - The optional identifier that may be supplied by the client.

    • boolean isReader - A flag to let the developer know if this init call is for the reader or writer adapter.

    Return Value: void

  • public Map<String, String> getDefaultSettings();

    Description: This method is called by the adapter to try and verify information about the adapter, in future versions of StreamBase it will also be used to try and pre-populate the settings list when a data transformer is selected.

    Return Value: Map<String, String> The default settings of the adapter.

  • public Set<String> validateSettings(Map<String, String> settings, Schema schema, boolean isReader);

    Description: This method is called when the adapter needs to validate all settings. This method gives the developer the chance to inform the end user that some information is required or invalid.

    Variables

    • Map<String, String> settings - The settings from the adapter "Data Transformer" tab which the user has supplied.

    • Schema schema - This is the schema that will be used to create tuples for outgoing data.

    • boolean isReader - A flag to let the developer know if this validation call is for the reader or writer adapter.

    Return Value: A set of exception messages to display to the end user, or null or empty set if no errors were detected.

  • public Tuple toTuple(ByteBuffer readBuffer, Schema schema);

    Description: This method is called for each read from the socket. The readBuffer contains the current data read from the socket including any data that was not used from any previous call to this method. It is expected that the position value of the readBuffer will be updated after reading data. If the position value is updated and a null values is returned the position will be reset back to what it was before entering this method. This method will be called repeatedly until a null value is returned or the buffer has zero bytes remaining.

    Variables

    • ByteBuffer readBuffer - The current data to be used to create a tuple. The size of this buffer is set by the getBufferSize() method.

    • Schema schema - This is the schema to be used to create a tuple and fill its values from the incoming data.

    Return Value: A valid tuple if data is available or null if a tuple could not be created.

  • public boolean fromTuple(Tuple tuple, ByteBuffer writeBuffer);

    Description: This method is called for each tuple that is received by the writer adapter. It is expected that the tuple will be transformed to bytes and added to the writeBuffer in any format needed. Please note that you must check the write buffer for available space before trying to write to it, if no space is available and you return false the tuple will be discarded.

    Variables

    • Tuple tuple - The current tuple that needs to be transformed and added to the writeBuffer.

    • ByteBuffer writeBuffer - This is the data that will be directly sent on the socket. The size of this buffer is set by the getBufferSize() method.

    Return Value:

    True if this tuple was successfully added to the write buffer.

    False if a problem occurred or no space is available. If false is returned the adapter will attempt to write data out to the socket and free space and call this method again one more time with the same tuple before discarding the tuple.

  • public int getBufferSize();

    Description: This method is called to determine the size of the read and write buffers for each client connection. This method is called once when a connection is made.

    Return Value: An integer value which specifies the size.

  • public String getName();

    Description: This method is called to get the name of the data transformer. (This method is not currently used and is for future use)

    Return Value: A string representing the name of the data transformer.