The TIBCO StreamBase® Bi-Directional Socket Reader Adapter was created to allow for two-way socket communication using a pair of adapters, one for reading, and one for writing. The pair of adapters work together by sharing the connections that are made. As the name implies, the reader allows for data to be read in from one or more sockets and then converted and sent downstream as a tuple.
This adapter allows you to create your own data transformations, or use pre-built ones to transform the data on the wire into a tuple. There are currently four pre-built data transformations available: CSV, JSON, BLOB, and serialized tuple. The data transformers convert bytes read from the socket into a tuple of the provided schema.
This section describes the properties you can set for this adapter, using the various tabs of the Properties view in StreamBase Studio.
Name: Use this field to specify or change the component's name, which must be unique in the application. The name must contain only alphabetic characters, numbers, and underscores, and no hyphens or other special characters. The first character must be alphabetic or an underscore.
Adapter: A read-only field that shows the formal name of the adapter.
Class: A field that shows the fully qualified class name that implements the functionality of this adapter. Use this class name when loading the adapter in StreamSQL programs with the APPLY JAVA statement. You can right-click this field and select Copy from the context menu to place the full class name in the system clipboard.
Start with application: If this field is set to Yes or to a module parameter that evaluates to true, an instance of this adapter starts as part of the containing StreamBase Server. If this field is set to No or to a module parameter that evaluates to false, the adapter is loaded with the server, but does not start until you send an sbadmin resume command, or until you start the component with StreamBase Manager. With this option set to No or false, the adapter does not start even if the application as a whole is suspended and later resumed. The recommended setting is selected by default.
Enable Error Output Port: Select this check box to add an Error Port to this component. In the EventFlow canvas, the Error Port shows as a red output port, always the last port for the component. See Using Error Ports and Error Streams to learn about Error Ports.
Description: Optionally enter text to briefly describe the component's purpose and function. In the EventFlow canvas, you can see the description by pressing Ctrl while the component's tooltip is displayed.
Property | Description |
---|---|
Port (int) | The port to use. When in server mode, this is the port to listen on. When in client mode, this is the default port to use when making new connections. |
Server Mode (check box) | This option determines whether this adapter runs as a client (check box cleared) or as a server (check box selected). |
Enable Control Port (check box) | This option determines whether this control port is available. The control port allows for commands to be sent to the adapter to start or stop in server mode, or to connect or disconnect in client mode. |
Enable Status (check box) | This option enables a status port with status information about the state of the adapter during run time. Some common status tuples include connection and disconnection information. |
Socket Identifier Field Name (string) |
This required field determines the name of the field added to all outgoing
data tuples. Each tuple emitted on the data output port will contain this
field with a socket identifier for
the connection in the format host:port:socketidentifier , or if no socket identifier is
specified for the connection, then just host:port .
|
Log Level | Controls the level of verbosity the adapter uses to send notifications to the console. This setting can be higher than the containing application's log level. If set lower, the system log level is used. Available values, in increasing order of verbosity, are: OFF, ERROR, WARN, INFO, DEBUG, TRACE, and ALL. |
Property | Description |
---|---|
Max Connection Count (int) | The maximum number of incoming connections that this adapter allows at any given time. If a client tries to connect when the maximum number of connections is already reached, the connection is rejected and a status tuple is emitted. |
Start Server At Startup (check box) | Determines whether the adapter starts the server when the application starts. If disabled, the end-user must enable the control port and emit a command tuple to start and stop the server. |
Property | Description |
---|---|
Default Host Name (string) | The default host name to connect to. This value is used when the Connect at startup value is selected or when a command tuple is received that has null for the host value. |
Connect at startup (check box) | Determines whether the adapter connects to the default host and port when the adapter starts. |
Reconnect Count (int) | Determines how many times this adapter will retry to connect to a host. |
Timeout Period (int) | The number of seconds to wait to try to connect after failing to connect. |
Property | Description |
---|---|
Data Transformer Class (string) | The fully qualified path to the data transformer class that will be used to convert data from bytes into a tuple and from tuples to bytes. |
Data Transformer Settings (key value pair of strings) | These settings are supplied to the data transformer. The settings are a key value pair of strings. |
This schema is used to define how the data is translated from byte information into a tuple.
-
Use the drop-down list to select the name of a named schema previously defined in or imported into this module. The drop-down list is empty unless you have defined or imported at least one named schema for the current module.
When you select a named schema, its fields are loaded into the schema grid, overriding any schema fields already present. Once you import a named schema, the schema grid is dimmed and can no longer be edited. To restore the ability to edit the schema grid, re-select
Private Schema
from the drop-down list.
Use the Concurrency tab to specify parallel regions for this instance of this component, or multiplicity options, or both. The Concurrency tab settings are described in Concurrency Options, and dispatch styles are described in Dispatch Styles.
Caution
Concurrency settings are not suitable for every application, and using these settings requires a thorough analysis of your application. For details, see Execution Order and Concurrency, which includes important guidelines for using the concurrency options.
Use the control port to send action commands to the adapter. The adapter can be run either in server mode (where it listens for connections) or client mode (where it makes outbound connections), and each mode has its own set of commands. Tuples enqueued on this port cause the adapter to start or stop a server, or to connect or disconnect clients.
-
Command, string, the command to send to the adapter.
If the adapter is in server mode, the values are:
-
START -- This tells the adapter to start the server and listen on the specified or default host and port. Any new connection established has a
host:port
key value which outputs on each data tuple. -
STOP -- This tells the adapter to stop the server and close all client connections.
-
DISCONNECT - This tells the adapter to disconnect a client from the server. The
socketidentifer
must be included and match a current connection.
If the adapter is in client mode, the values are:
-
CONNECT -- This tells the adapter to make a new connection to the specified or default host and port. The associated values
host:port:socketidentifer
are used to identify this connection for further actions; those values are output as the socket identifier on all emitted data tuples. -
DISCONNECT - This tells the adapter to disconnect from the specified or default host and port. The
host:port:socketidentifer
combination must match the values used to initially make the connection.
-
-
Host, string. The host to connect to. This value is only valid when sending commands to adapters in client mode. If the value is null, the default value of the adapter is used.
-
Port, int. The port to use when making connections. If the value is null the default value of the adapter is used. If the adapter is in server mode, the port value determines which port to listen on. If the adapter is in client mode, the port is used for outgoing connections.
-
SocketIdentifier (Optional), string. Only used when the adapter is in client mode. If specified, this value is added to the socket identifier value emitted on the status port as the object field with the format
Host:Port:SocketIdentifier
.
The status port is used to send status information tuples downstream to inform the user of changes.
-
type, string. The type of status information emitted on this port. Status types are:
-
Connection -- Indicates this message is about a connection.
-
Server -- Indicates this message is about the listening server.
-
-
action, string.
-
Connected -- This type of action occurs when a new connection is made either as a client or server. The socket identifier will be outputted as the object of this message. For a server the socket identifier will be host:port and for clients it will be 'host:port:(optional) inputted socket identifier'
-
Disconnected -- This type of action occurs when a connection disconnects either as a client or server.
-
Failed -- A failure occurred and the failure message will be included.
-
Limit -- Used when in server mode, this message will be emitted when the max number of connections to the server has been reached.
-
Stopped -- Used when in server mode, sent when the server has been stopped.
-
Started -- Used when in server mode, sent when the server has been started.
-
-
object, string. This value may be null. If it is not null, it contains a value relevant to the status message. On new connections, this value contains the socket identifier of the new connection.
-
message, string. This is a formatted human readable message that explains the status message.
-
inputTuple. Not used for this adapter.
Data transformers are used to convert data in and out of the system.
com.streambase.sb.adapter.bidirectionalsocket.transform.CSVDataTransformer
The CSV data transformer will convert incoming bytes into tuples of the supplied schema. This transformer has the same properties as the CSVSocketReader adapter, some properties are for the reader and some are for the writer. If a setting is not present the default is used.
Setting | Applies to Reader/Writer | Allowed Values | Default | Description |
---|---|---|---|---|
UseDefaultCharset | Reader/Writer | true, false | true | If specified, specifies whether the Java platform default character set is to be used. |
Charset | Reader/Writer | string | System Charset | The name of the character set encoding that the adapter is to use to read input or write output. |
CaptureTransformStrategy | Reader/Writer | Flatten, Nest | Flatten | The strategy to use when transforming capture fields for this operator. |
FieldDelimiter | Reader/Writer | Single character | , |
The delimiter used to separate tokens, defaults to a comma. Control
characters can be entered as &#ddd; where ddd is the character's ASCII value.
|
NullValueRepresentation | Writer | string | null | String to write when a field is null. |
StringQuote | Writer | QuoteIfNecessary, AlwaysQuote, NeverQuote | QuoteIfNecessary | Determines when string fields are quoted in the CSV file: Quote if necessary, Always quote, or Never quote. |
StringQuoteCharacter | Reader/Writer | Single character | " | Matching pairs of the quote character used to delimit string constants. |
TimestampFormat | Reader | String | yyyy-MM-dd hh:mm:ss.SSSZ |
The format used to parse timestamp fields extracted from the input
tuples. This should be in the form expected by java.text.SimpleDateFormat class described in
Java Platform SE reference documentation.
If a timestamp value is read that does not match the specified format
string, the entire record is discarded and a WARN message appears on
the console that includes the text |
LenientTimestampParsing | Reader | true, false | true | Set this to true if you would like to parse timestamp values that do not conform to the specified format using default formats. |
IncompleteRecords | Reader | Discard, PopulateWithNulls | Discard |
Specifies what should be done when the adapter receives a record with less than the required number of fields.
|
DiscardEmptyRecords | Reader | true, false | true |
This is a special case to handle empty lines. If rows with some fields must send output, but not empty lines, leave this true. Set to false to send empty tuples for empty lines. |
LogWarning | Reader | true, false | false |
Set this to true if warning messages should be logged when incomplete records are encountered. If false, no warning messages will be logged for records with less than required number of fields. |
BufferSize | Reader/Writer | int | 10240 | The size of the read or write buffer used to hold data in transition from the socket to the data transformer. If this value is too small the application will emit error messages because of a full buffer. |
com.streambase.sb.adapter.bidirectionalsocket.transform.JSONDataTransformer
The JSON data transformer will convert incoming bytes into tuples of the supplied schema. Outgoing tuples are converted into escaped JSON strings with a separator character to delimit tuples. If a setting is not present the default is used.
Setting | Applies to Reader/Writer | Allowed Values | Default | Description |
---|---|---|---|---|
IncludeNullValues | Writer | true, false | true | Include fields that contain null values. |
EncodeSubType | Writer | list, map | list | The type of transformation that should be used when there are sub tuples to process. |
Separator | Reader/Writer | Single character | \n | The separator character used to determine the end of a JSON value. Hex values starting with 0x are allowed such as 0x03. |
ByteOrder | Reader/Writer | big-endian, little-endian | native order | The byte order to use when converting data. |
BufferSize | Reader/Writer | int | 10240 | The size of the read or write buffer used to hold data in transition from the socket to the data transformer. If this value is too small the application will emit error messages because of a full buffer. |
com.streambase.sb.adapter.bidirectionalsocket.transform.BLOBDataTransformer
The BLOB data transformer is one of the most basic data transformers. For incoming data it will take each packet of byte information received from a socket and output a tuple with a blob field containing the data. For outgoing data a specified field of type blob is read and the data from that field is sent directly out on the socket. If a setting is not present the default is used.
Setting | Applies to Reader/Writer | Allowed Values | Default | Description |
---|---|---|---|---|
BlobFieldName | Reader/Writer | string | BlobField | For the reader this value is the name of the field that will be filled for each outgoing tuple with the byte data. For the writer this is the field that transformer will read and send the contains of the blob to the socket. |
BufferSize | Reader/Writer | int | 10240 | The size of the read or write buffer used to hold data in transition from the socket to the data transformer. If this value is too small the application will emit error messages because of a full buffer. |
com.streambase.sb.adapter.bidirectionalsocket.transform.SerializedTupleDataTransformer
The serialized tuple data transformer transforms tuple going into the writer adapter into a byte array of data with a single int value as the header to determine size. On the reader side the adapter will read in a single integer and then read that many bytes of data to create a new tuple based on the supplied schema. If a setting is not present the default is used.
Setting | Applies to Reader/Writer | Allowed Values | Default | Description |
---|---|---|---|---|
ByteOrder | Reader/Writer | big-endian, little-endian | native order | The byte order to use when converting data. |
BufferSize | Reader/Writer | int | 10240 | The size of the read or write buffer used to hold data in transition from the socket to the data transformer. If this value is too small the application will emit error messages because of a full buffer. |
Custom data transformers allow the end user to build Java code to handle the input and output of the Bi-Directional Adapters. In this section we will cover how the end user can build their own data transformers. There is a single interface that needs to be implemented in order for a class to work with the adapters, that interface is ISocketDataTransformer which we will cover in the next section. Each socket connection established by the adapter creates a new copy of the given data transformer which allows for data to be stored between socket reads and writes for an individual socket connection.
The Bi-Direction socket adapter samples includes two files CustomDataTransformer.java and CustomDataTransformer.sbapp that together demonstrate how to build your own data transformations and use them in an application.
To use the com.streambase.sb.adapter.bidirectionalsocket.transform.ISocketDataTransformer interface you must make sure that the file streambase/lib/ext/bi-directional-socket-interface.jar is present on your build path.
Now make sure this class is available to the adapter at runtime. The most expedient way of accomplishing this is to put your class in a JAR file and place this JAR file in StreamBase's lib/adapter/ext directory.
The interface make use of java.nio.ByteBuffer and assumes that the end user of this interface has some familiarity with this class.
Most of the complexities of java.nio.ByteBuffer are handled by the adapter but the end user should know how to deal with the buffers position, limit, and capacity methods. Calls to toTuple and fromTuple assume that the position of the buffer will be updated to reflect the amount of data that was read from or written to a buffer.
Example:
Five characters are read into a string during toTuple:
String testData = new String(readBuffer.array(), readBuffer.position(), 5);
then the buffers position should be updates to reflect the new position:
buffer.position(buffer.position() + 5);
NOTE: Some methods of the ByteBuffer update the position for you like buffer.getInt(); Please read the documentation for java.nio.ByteBuffer for further details.
The com.streambase.sb.adapter.bidirectionalsocket.transform.ISocketDataTransformer interface is required to be implemented for your data transformer to work with the adapter. The interface has the following methods that need to be completed:
-
public void init(Map<String, String> settings, Logger logger, String host, int port, String socketIdentifier, boolean isReader);
Description: This method is called when a socket connection is made and the data transformer needs to be initialized.
Variables
-
Map<String, String> settings - The settings from the adapter "Data Transformer" tab which the user has supplied.
-
Logger logger - The adapter's logger to allow your transformer to log out information.
-
String host - The host of the socket connection.
-
int port - The port of the socket connection.
-
String socketIdentifier - The optional identifier that may be supplied by the client.
-
boolean isReader - A flag to let the developer know if this init call is for the reader or writer adapter.
Return Value: void
-
-
public Map<String, String> getDefaultSettings();
Description: This method is called by the adapter to try and verify information about the adapter, in future versions of StreamBase it will also be used to try and pre-populate the settings list when a data transformer is selected.
Return Value: Map<String, String> The default settings of the adapter.
-
public Set<String> validateSettings(Map<String, String> settings, Schema schema, boolean isReader);
Description: This method is called when the adapter needs to validate all settings. This method gives the developer the chance to inform the end user that some information is required or invalid.
Variables
-
Map<String, String> settings - The settings from the adapter "Data Transformer" tab which the user has supplied.
-
Schema schema - This is the schema that will be used to create tuples for outgoing data.
-
boolean isReader - A flag to let the developer know if this validation call is for the reader or writer adapter.
Return Value: A set of exception messages to display to the end user, or null or empty set if no errors were detected.
-
-
public Tuple toTuple(ByteBuffer readBuffer, Schema schema);
Description: This method is called for each read from the socket. The readBuffer contains the current data read from the socket including any data that was not used from any previous call to this method. It is expected that the position value of the readBuffer will be updated after reading data. If the position value is updated and a null values is returned the position will be reset back to what it was before entering this method. This method will be called repeatedly until a null value is returned or the buffer has zero bytes remaining.
Variables
-
ByteBuffer readBuffer - The current data to be used to create a tuple. The size of this buffer is set by the getBufferSize() method.
-
Schema schema - This is the schema to be used to create a tuple and fill its values from the incoming data.
Return Value: A valid tuple if data is available or null if a tuple could not be created.
-
-
public boolean fromTuple(Tuple tuple, ByteBuffer writeBuffer);
Description: This method is called for each tuple that is received by the writer adapter. It is expected that the tuple will be transformed to bytes and added to the writeBuffer in any format needed. Please note that you must check the write buffer for available space before trying to write to it, if no space is available and you return false the tuple will be discarded.
Variables
-
Tuple tuple - The current tuple that needs to be transformed and added to the writeBuffer.
-
ByteBuffer writeBuffer - This is the data that will be directly sent on the socket. The size of this buffer is set by the getBufferSize() method.
Return Value:
True if this tuple was successfully added to the write buffer.
False if a problem occurred or no space is available. If false is returned the adapter will attempt to write data out to the socket and free space and call this method again one more time with the same tuple before discarding the tuple.
-
-
public int getBufferSize();
Description: This method is called to determine the size of the read and write buffers for each client connection. This method is called once when a connection is made.
Return Value: An integer value which specifies the size.
-
public String getName();
Description: This method is called to get the name of the data transformer. (This method is not currently used and is for future use)
Return Value: A string representing the name of the data transformer.