StreamBase C++ API  10.6.1.0
sb::StreamBaseClient Class Reference

The StreamBase client API. More...

#include <StreamBaseClient.hpp>

Public Types

enum  ListEntitiesFlags { FULLY_QUALIFIED_NAMES = 1, INCLUDE_MODULES = 2, ALL_CONTAINERS = 4 | FULLY_QUALIFIED_NAMES }
 Flags for the listEntities call. These must be consistent with the flags defined on the Java side in StreamBaseClient.java. More...
 

Public Member Functions

 StreamBaseClient (const StreamBaseURI &uri=StreamBaseURI::fromEnvironment(), int buffer_size=0, int flush_interval=DEFAULT_FLUSH_INTERVAL)
 Creates a StreamBase client and establishes a connection to a remote server. More...
 
 StreamBaseClient (const std::string &uris, int buffer_size=0, int flush_interval=DEFAULT_FLUSH_INTERVAL)
 
 StreamBaseClient (const std::vector< StreamBaseURI > &uris, int buffer_size=0, int flush_interval=DEFAULT_FLUSH_INTERVAL)
 
 StreamBaseClient (const std::vector< StreamBaseURI > &uris, sb::ClientSettings &settings, int buffer_size=0, int flush_interval=DEFAULT_FLUSH_INTERVAL)
 
 ~StreamBaseClient ()
 Destroys a session. More...
 
void listEntities (const std::string &type, int flags, std::vector< std::string > &names)
 Lists all entities of a particular type. More...
 
void listEntities (StreamBaseEntityType::Type type, int flags, std::vector< std::string > &names)
 Lists all entities of a particular type. More...
 
void listEntities (StreamBaseEntityType::Type type, std::vector< std::string > &names)
 Lists all entities of a particular type. More...
 
std::string describe (const std::string &entity)
 Returns an XML description of an entity, or an empty string if the method does not exist. More...
 
bool isStreamSubscribed (const std::string &entity)
 Returns true if this stream has been subscribed to. More...
 
StreamProperties getStreamProperties (const std::string &entity, StreamProperties::CaptureTransformStrategy strategy=StreamProperties::FLATTEN)
 Returns a description of a stream, throwing an exception if the stream does not exist. More...
 
StreamProperties getStreamPropertiesByHash (const std::string &hex)
 Returns a description of a stream, throwing an exception if the stream does not exist. More...
 
Schema getSchemaForStream (const std::string &entity, StreamProperties::CaptureTransformStrategy strategy=StreamProperties::FLATTEN)
 Returns the schema of a stream, throwing an exception if the stream does not exist. More...
 
void typecheck (const std::string &sbapp, std::map< std::string, Schema > &streams, bool full=false)
 Typechecks a potential modification to the application, outputting properties of all streams in the application. More...
 
StreamProperties subscribe (const std::string &stream_name)
 Subscribes to a stream. More...
 
StreamProperties subscribe (const StreamProperties &props)
 Subscribes to a stream. More...
 
StreamProperties subscribe (const std::string &stream_name, const std::string &logical_stream, const std::string &predicate)
 Subscribes to a stream with a predicate to apply to output tuples. More...
 
StreamProperties subscribe (const StreamProperties &props, const std::string &logical_stream, const std::string &predicate)
 Subscribes to a stream with a predicate to apply to output tuples. More...
 
StreamProperties resubscribe (const std::string &stream_name, const std::string &logical_stream, const std::string &predicate)
 Resubscribes to an already subscribed stream. More...
 
StreamProperties resubscribe (const StreamProperties &props, const std::string &logical_stream, const std::string &predicate)
 Resubscribes to an already subscribed stream. More...
 
void unsubscribe (const std::string &stream_name)
 Unsubscribes from a stream. More...
 
void unsubscribe (const StreamProperties &props)
 Unsubscribes from a stream. More...
 
void setDequeueResultInterceptor (DequeueResult::Interceptor *dri)
 Set the dequeue results interceptor for this client connection. More...
 
DequeueResult::Interceptor * getDequeueResultsInterceptor ()
 Get the current dequeue results interceptor, or null if no interceptor is set. More...
 
TupleList dequeue (StreamProperties &streamProperties)
 Dequeue tuples from any subscribed stream. More...
 
TupleList dequeue (std::string &stream_name)
 Dequeue tuples from any subscribed stream. More...
 
DequeueResult dequeue ()
 Dequeue tuples from any subscribed stream. More...
 
DequeueResult dequeue (int timeout_ms)
 Dequeue tuples from any subscribed stream. More...
 
void enqueue (const std::string &stream_name, Tuple &tuple)
 Enqueue tuples to a stream. More...
 
void enqueue (const StreamProperties &props, Tuple &tuple)
 Enqueue tuples to a stream. More...
 
void enqueue (const std::string &stream_name, TupleList &tuples)
 Enqueue tuples to a stream. More...
 
void enqueue (const StreamProperties &props, TupleList &tuples)
 Enqueue tuples to a stream. More...
 
Schema getSchemaByHash (const std::string &hash)
 Returns a schema with a particular hash. More...
 
Schema getSchemaByName (const std::string &name)
 Returns a schema by name. More...
 
void operatorStatus (const std::string &containerName, std::vector< std::string > &aStatus)
 Returns the status of Java operators and adapters. More...
 
void status (std::vector< std::string > &aStatus, bool verbose=false)
 Returns the status of the StreamBase daemons. More...
 
const StreamBaseURI & getURI () const
 Return the URI we're talking to. More...
 
const std::vector< StreamBaseURI > & getURIs () const
 
void close ()
 Terminate the client. More...
 
bool isClosed ()
 Return true if the client connection is closed. More...
 
bool canDequeue ()
 Return true if we can call dequeue without blocking. More...
 
void enableHeartbeating ()
 Enable heartbeating for this client. Generally this is only for enqueue-only clients. More...
 
bool isConnected ()
 Returns true if this TupleConnections has any live connections to a server. More...
 
void enableBuffering (int buffer_size, int flush_interval=DEFAULT_FLUSH_INTERVAL)
 Turn on buffering. The WakeAndFlushBuffer thread is only started if flush_interval > 0. More...
 
void flushAllBuffers ()
 Flush any pending enqueue buffers. More...
 
void flushBuffer (const StreamProperties &props)
 Flush any pending enqueue buffer for the StreamProperties provided. More...
 
void flushBuffer (const std::string &stream_name)
 Flush any pending enqueue buffer for the stream name provided. More...
 
bool getIsBuffering () const
 Return whether buffering is turned on or off. More...
 
int getBufferSize () const
 Return buffer size (in tuples) More...
 
std::string getConnectionID () const
 Return the Connection ID for this Client Connection. More...
 
Tuple getDynamicVariables (const std::string &modulePath)
 Get all the dynamic variables in the given module, and return them as a Tuple where the field names are the names of the dynamic variables, and the field values are the current values of the dynamic variables. More...
 
void setDynamicVariable (const std::string &dynamicVariablePath, const std::string &value)
 Set the dynamic variable at the given path to the given value, expressed as a string in CSV style. More...
 
std::string getConnectionIdAsHexString () const
 Return the Connection ID for this Client Connection. More...
 
TupleList readTable (const std::string &tablePath, int limit, const std::string &predicate="")
 Read rows out of the table at this path. More...
 
int setQuiescentLimit (int timeoutMS)
 Sets how many milliseconds a dequeueing client will tolerate not receiving a client heart beat from the StreamBase server that it is connected to. More...
 
std::string getVersion ()
 Returns the client version as a string. More...
 
sb::ClientSettings & getSettings ()
 Get the settings for the client. More...
 
long long getTupleEnqueueCount () const
 Returns the number of tuples this client has actually enqueued to the server. More...
 
long long getTupleDequeueCount () const
 Returns the number of tuples this client has dequeued from the server. More...
 

Static Public Attributes

static const unsigned int DEFAULT_FLUSH_INTERVAL = 250
 

Detailed Description

The StreamBase client API.

Connects to a StreamBase node over the network, via XML/RPC and the tuple wire protocol. With the exception of the close() method, the StreamBaseClient object is not thread safe, so access to a single object must be synchronized between threads. If multiple threads subscribe to streams, it is recommended that they use separate instances of this class.
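
One way to synchronize access to a single shared client is to route every call through a mutex. The following is a minimal sketch under stated assumptions, not part of the StreamBase API: FakeClient is a stand-in for the real client, and SynchronizedClient and its with() method are hypothetical names introduced here for illustration.

```cpp
#include <cassert>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Stand-in for a client that is not thread safe; NOT sb::StreamBaseClient.
struct FakeClient {
    std::vector<std::string> sent;
    void enqueue(const std::string &stream, const std::string &payload) {
        sent.push_back(stream + ":" + payload);
    }
};

// Hypothetical wrapper that serializes every call on one shared client.
template <typename Client>
class SynchronizedClient {
public:
    // Runs fn(client) while holding the lock and returns its result.
    template <typename Fn>
    auto with(Fn fn) -> decltype(fn(std::declval<Client &>())) {
        std::lock_guard<std::mutex> lock(mutex_);
        return fn(client_);
    }

private:
    std::mutex mutex_;
    Client client_;
};
```

A caller would write, for example, shared.with([](FakeClient &c) { c.enqueue("InputStream", "a"); }). Because close() is documented as callable from another thread, it is the one call that would not need to go through such a lock; threads that subscribe to streams are still better served by separate client instances, as recommended above.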

Member Enumeration Documentation

enum sb::StreamBaseClient::ListEntitiesFlags

Flags for the listEntities call. These must be consistent with the flags defined on the Java side in StreamBaseClient.java.

Enumerator
FULLY_QUALIFIED_NAMES 

Set FULLY_QUALIFIED_NAMES to include container names for all entities.

If not set, the returned names contain the module name (if any) and the name of the entity.

INCLUDE_MODULES 

Set INCLUDE_MODULES to include all modules in the output.

ALL_CONTAINERS 

Set ALL_CONTAINERS to include all user containers in the output.

This does not include the "system" container.
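
Because ALL_CONTAINERS is declared as 4 | FULLY_QUALIFIED_NAMES, it already carries the FULLY_QUALIFIED_NAMES bit. The sketch below copies the enumerator values from the declaration above into a standalone enum (a local copy for illustration, not the real header) and shows how flags combine into the int argument that listEntities expects. The helpers hasFlag and allContainersWithModules are hypothetical.

```cpp
#include <cassert>

// Local copy of the enumerator values listed above, for illustration only.
// Real code would use sb::StreamBaseClient::ListEntitiesFlags instead.
enum ListEntitiesFlags {
    FULLY_QUALIFIED_NAMES = 1,
    INCLUDE_MODULES = 2,
    ALL_CONTAINERS = 4 | FULLY_QUALIFIED_NAMES
};

// Hypothetical helper: true if every bit of `flag` is set in `flags`.
inline bool hasFlag(int flags, ListEntitiesFlags flag) {
    return (flags & flag) == flag;
}

// Flags are combined with bitwise OR to build the `flags` argument.
inline int allContainersWithModules() {
    return ALL_CONTAINERS | INCLUDE_MODULES;
}
```

With these values, ALL_CONTAINERS evaluates to 5, so requesting it alone also requests fully qualified names.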

Constructor & Destructor Documentation

sb::StreamBaseClient::StreamBaseClient ( const StreamBaseURI &  uri = StreamBaseURI::fromEnvironment(),
int  buffer_size = 0,
int  flush_interval = DEFAULT_FLUSH_INTERVAL 
)

Creates a StreamBase client and establishes a connection to a remote server.

The optional parameter buffer_size specifies the number of tuples to buffer before enqueueing. The optional parameter flush_interval specifies the interval in milliseconds between wakeups of the WakeAndFlushBuffer thread. The WakeAndFlushBuffer thread is only started if buffer_size > 0.
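
The effect of buffer_size can be pictured with a small stand-in. This is a sketch under assumptions, not the client's actual implementation: BufferedSender is a hypothetical class, tuples are plain ints, and flushing exactly when buffer_size tuples are pending is an assumed policy. The timed flush driven by flush_interval is left out.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Stand-in for the client's enqueue buffer; NOT the real implementation.
class BufferedSender {
public:
    explicit BufferedSender(std::size_t buffer_size)
        : buffer_size_(buffer_size), sent_count_(0) {}

    // Buffers the tuple and flushes once buffer_size tuples are pending.
    // With buffer_size == 0 every tuple is sent immediately (no buffering).
    void enqueue(int tuple) {
        pending_.push_back(tuple);
        if (pending_.size() >= buffer_size_) flush();
    }

    // Sends everything that is pending; a no-op when nothing is buffered.
    void flush() {
        sent_count_ += static_cast<long long>(pending_.size());
        pending_.clear();
    }

    // Counts only tuples actually sent, not those still buffered.
    long long sentCount() const { return sent_count_; }
    std::size_t pendingCount() const { return pending_.size(); }

private:
    std::size_t buffer_size_;
    long long sent_count_;
    std::vector<int> pending_;
};
```

The sentCount() accessor mirrors the behaviour described for getTupleEnqueueCount(), which excludes tuples that have been buffered but not yet enqueued to the server.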

sb::StreamBaseClient::~StreamBaseClient ( )

Destroys a session.

Member Function Documentation

bool sb::StreamBaseClient::canDequeue ( )

Return true if we can call dequeue without blocking.

This means that there is something to dequeue from the server. This dequeued item could be Tuples, a null (server shutdown) or an exception.

Returns
true if dequeue can be called without blocking
void sb::StreamBaseClient::close ( )

Terminate the client.

May be invoked from a different thread. StreamBaseClient memory, network, and thread resources are not released until close() is called.

Returns immediately.

TupleList sb::StreamBaseClient::dequeue ( StreamProperties &  streamProperties)

Dequeue tuples from any subscribed stream.

This method blocks until

  • at least one tuple is available on any subscribed stream, or
  • the node is shut down.
Deprecated:
Use the DequeueResult-based dequeue() instead.
Parameters
streamProperties  (output) set to the StreamProperties of the stream from which tuples have been dequeued
Returns
a list of the tuples dequeued; empty if the node has been shut down
TupleList sb::StreamBaseClient::dequeue ( std::string &  stream_name)

Dequeue tuples from any subscribed stream.

This method blocks until

  • at least one tuple is available on any subscribed stream, or
  • the node is shut down.
Deprecated:
Use the DequeueResult-based dequeue() instead.
Parameters
stream_name  (output) set to the name of the stream from which tuples have been dequeued. Note that this does not contain the container name.
Returns
a list of the tuples dequeued; empty if the node has been shut down
DequeueResult sb::StreamBaseClient::dequeue ( )

Dequeue tuples from any subscribed stream.

This method blocks until

  • At least one tuple is available on any subscribed stream (DequeueResult will have a status of GOOD)
  • The StreamBase server the client is connected to is shut down (DequeueResult will have a status of CLOSED)
  • A network error occurs (StreamBaseException will be thrown)
Returns
a DequeueResult containing the results of the operation
DequeueResult sb::StreamBaseClient::dequeue ( int  timeout_ms)

Dequeue tuples from any subscribed stream.

This method blocks until

  • At least one tuple is available on any subscribed stream (DequeueResult will have a status of GOOD)
  • The StreamBase server the client is connected to is shut down (DequeueResult will have a status of CLOSED)
  • The number of milliseconds specified has elapsed (DequeueResult will have a status of TIMEOUT)
  • A network error occurs (StreamBaseException will be thrown)

A timeout_ms of zero means no timeout: the call blocks indefinitely, or until a tuple arrives. Note that the actual dequeue timeout may vary based on normal thread scheduling, plus network interactions with the server.

Returns
a DequeueResult containing the results of the operation
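
The three statuses listed above suggest a simple dequeue loop. The sketch below uses stand-in types so that it is self-contained: Status, FakeResult, FakeClient, and drain are hypothetical names, and the real accessors on DequeueResult may differ. A network error would surface as an exception and propagate out of the loop.

```cpp
#include <cassert>
#include <deque>

// Stand-ins for illustration; the real types are sb::DequeueResult and
// sb::StreamBaseClient. Status names follow the list above.
enum Status { GOOD, CLOSED, TIMEOUT };

struct FakeResult {
    Status status;
    int tupleCount;
};

struct FakeClient {
    std::deque<FakeResult> script;  // canned results, returned in order

    FakeResult dequeue(int /*timeout_ms*/) {
        if (script.empty()) return FakeResult{CLOSED, 0};
        FakeResult r = script.front();
        script.pop_front();
        return r;
    }
};

// Dequeues until the server closes. Returns the number of tuples seen and
// reports how often the timeout elapsed without data in `timeouts`.
long drain(FakeClient &client, int timeout_ms, int &timeouts) {
    long total = 0;
    timeouts = 0;
    for (;;) {
        FakeResult r = client.dequeue(timeout_ms);
        switch (r.status) {
        case GOOD:    total += r.tupleCount; break;  // process the tuples
        case TIMEOUT: ++timeouts; break;             // idle: periodic work
        case CLOSED:  return total;                  // server shut down
        }
    }
}
```

Handling TIMEOUT separately gives the caller a regular opportunity to do housekeeping, such as checking a shutdown flag, without treating an idle stream as an error.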
std::string sb::StreamBaseClient::describe ( const std::string &  entity)

Returns an XML description of an entity, or an empty string if the method does not exist.

void sb::StreamBaseClient::enableBuffering ( int  buffer_size,
int  flush_interval = DEFAULT_FLUSH_INTERVAL 
)

Turn on buffering. The WakeAndFlushBuffer thread is only started if flush_interval > 0.

void sb::StreamBaseClient::enableHeartbeating ( )

Enable heartbeating for this client. Generally this is only for enqueue-only clients.

void sb::StreamBaseClient::enqueue ( const std::string &  stream_name,
Tuple &  tuple 
)

Enqueue tuples to a stream.

The stream must not be tied to the output of any operator in the application. This method can block depending on network, or StreamBase server, congestion.

Note: enqueue() will modify the tuple as it is enqueued. If you do not want the tuple modified you must make a copy of it before calling enqueue.

Deprecated:
Use the StreamProperties-based enqueue() instead.
Parameters
stream_name  the name of the stream to which to enqueue
tuple  a tuple to enqueue
void sb::StreamBaseClient::enqueue ( const StreamProperties &  props,
Tuple &  tuple 
)

Enqueue tuples to a stream.

The stream must not be tied to the output of any operator in the application. This method can block depending on network, or StreamBase server, congestion.

Note: enqueue() will modify the tuple as it is enqueued. If you do not want the tuple modified you must make a copy of it before calling enqueue.

Parameters
props  the StreamProperties to enqueue the tuple to
tuple  a tuple to enqueue
void sb::StreamBaseClient::enqueue ( const std::string &  stream_name,
TupleList &  tuples 
)

Enqueue tuples to a stream.

The stream must not be tied to the output of any operator in the application. This method can block depending on network, or StreamBase server, congestion.

Note: enqueue() will modify the tuples as they are enqueued. If you do not want the tuples modified you must make copies before calling enqueue.

Deprecated:
Use the StreamProperties-based enqueue() instead.
Parameters
stream_name  the name of the stream to which to enqueue
tuples  a list of tuples to enqueue
void sb::StreamBaseClient::enqueue ( const StreamProperties &  props,
TupleList &  tuples 
)

Enqueue tuples to a stream.

The stream must not be tied to the output of any operator in the application. This method can block depending on network, or StreamBase server, congestion.

Note: enqueue() will modify the tuples as they are enqueued. If you do not want the tuples modified you must make copies before calling enqueue.

Parameters
props  the StreamProperties to enqueue the tuples to
tuples  a list of tuples to enqueue
void sb::StreamBaseClient::flushAllBuffers ( )

Flush any pending enqueue buffers.

This operation has no effect if buffering is not enabled.

Exceptions
StreamBaseException  if there is an IO error while flushing the buffer
void sb::StreamBaseClient::flushBuffer ( const StreamProperties &  props)

Flush any pending enqueue buffer for the StreamProperties provided.

This operation has no effect if buffering is not enabled or there is no buffer to flush for the given stream.
Note: This will cause inter-stream ordering to be interrupted.

Parameters
props  the stream whose enqueue buffers to flush, if not empty
Exceptions
StreamBaseException  if there is an IO error while flushing the buffer
Deprecated:
Use flushAllBuffers() to preserve inter-stream ordering.
void sb::StreamBaseClient::flushBuffer ( const std::string &  stream_name)

Flush any pending enqueue buffer for the stream name provided.

This operation has no effect if buffering is not enabled or there is no buffer to flush for the given stream.
Note: This will cause inter-stream ordering to be interrupted.

Parameters
stream_name  the stream whose enqueue buffers to flush, if not empty
Exceptions
StreamBaseException  if there is an IO error while flushing the buffer
Deprecated:
Use the StreamProperties-based flushBuffer() instead of passing stream_name.
Use flushAllBuffers() to preserve inter-stream ordering.

int sb::StreamBaseClient::getBufferSize ( ) const

Return buffer size (in tuples)

std::string sb::StreamBaseClient::getConnectionID ( ) const

Return the Connection ID for this Client Connection.

Only valid once an enqueue or dequeue has been attempted.

Returns
connection ID in binary format
std::string sb::StreamBaseClient::getConnectionIdAsHexString ( ) const

Return the Connection ID for this Client Connection.

Only valid once an enqueue or dequeue has been attempted.

Returns
connection ID in hexadecimal format
DequeueResult::Interceptor* sb::StreamBaseClient::getDequeueResultsInterceptor ( )

Get the current dequeue results interceptor, or null if no interceptor is set.

Tuple sb::StreamBaseClient::getDynamicVariables ( const std::string &  modulePath)

Get all the dynamic variables in the given module, and return them as a Tuple where the field names are the names of the dynamic variables, and the field values are the current values of the dynamic variables.

Exceptions
StreamBaseException  on network or other errors, or if there are no dynamic variables in the named module
bool sb::StreamBaseClient::getIsBuffering ( ) const

Return whether buffering is turned on or off.

Schema sb::StreamBaseClient::getSchemaByHash ( const std::string &  hash)

Returns a schema with a particular hash.

Schema sb::StreamBaseClient::getSchemaByName ( const std::string &  name)

Returns a schema by name.

This will only succeed for named schemas; anonymous schemas assigned to a port should instead be looked up via getStreamProperties().getSchema().

Schema sb::StreamBaseClient::getSchemaForStream ( const std::string &  entity,
StreamProperties::CaptureTransformStrategy  strategy = StreamProperties::FLATTEN 
)

Returns the schema of a stream, throwing an exception if the stream does not exist.

sb::ClientSettings& sb::StreamBaseClient::getSettings ( )
inline

Get the settings for the client.

StreamProperties sb::StreamBaseClient::getStreamProperties ( const std::string &  entity,
StreamProperties::CaptureTransformStrategy  strategy = StreamProperties::FLATTEN 
)

Returns a description of a stream, throwing an exception if the stream does not exist.

Parameters
entity  the path of the stream
strategy  how to handle capture fields on the stream
StreamProperties sb::StreamBaseClient::getStreamPropertiesByHash ( const std::string &  hex)

Returns a description of a stream, throwing an exception if the stream does not exist.

long long sb::StreamBaseClient::getTupleDequeueCount ( ) const

Returns the number of tuples this client has dequeued from the server.

This number does not include tuples that may be dequeued as part of system services such as heartbeating. This number does however include tuples that have been dequeued but have been intercepted by the Interceptor.

long long sb::StreamBaseClient::getTupleEnqueueCount ( ) const

Returns the number of tuples this client has actually enqueued to the server.

Note that this number will not include any tuples that have been buffered but have not actually been enqueued to the server.

const StreamBaseURI& sb::StreamBaseClient::getURI ( ) const

Return the URI we're talking to.

std::string sb::StreamBaseClient::getVersion ( )
inline

Returns the client version as a string.

bool sb::StreamBaseClient::isClosed ( )

Return true if the client connection is closed.

The client connection can be closed by calling the close() method, or by a server shutdown, or a network error.

bool sb::StreamBaseClient::isConnected ( )

Returns true if this TupleConnections has any live connections to a server.

Note that a return of "false" doesn't necessarily indicate an error, since the state of the TupleConnections (e.g., lack of subscriptions) might mean no connections are needed.

Returns
true if connected
bool sb::StreamBaseClient::isStreamSubscribed ( const std::string &  entity)

Returns true if this stream has been subscribed to.

Parameters
entity  the path of the stream
void sb::StreamBaseClient::listEntities ( const std::string &  type,
int  flags,
std::vector< std::string > &  names 
)

Lists all entities of a particular type.

(One can then use "describe" to obtain a description of an entity.) names is cleared first. Use container.entity-type to resolve across containers. The given flags control which entities are listed.

void sb::StreamBaseClient::listEntities ( StreamBaseEntityType::Type  type,
int  flags,
std::vector< std::string > &  names 
)

Lists all entities of a particular type.

(One can then use "describe" to obtain a description of an entity.) names is cleared first. The given flags control which entities are listed.

void sb::StreamBaseClient::listEntities ( StreamBaseEntityType::Type  type,
std::vector< std::string > &  names 
)

Lists all entities of a particular type.

(One can then use "describe" to obtain a description of an entity.) names is cleared first.

void sb::StreamBaseClient::operatorStatus ( const std::string &  containerName,
std::vector< std::string > &  aStatus 
)

Returns the status of Java operators and adapters.

TupleList sb::StreamBaseClient::readTable ( const std::string &  tablePath,
int  limit,
const std::string &  predicate = "" 
)

Read rows out of the table at this path.

Limit to the specified number of rows returned, or -1 for no limit. Filter rows according to predicate, or return all rows if predicate is empty.

Returns
a list of matching rows.
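
The limit and predicate rules can be illustrated with a stand-in. This is a sketch under assumptions: selectRows is a hypothetical helper, rows are plain ints, and the predicate is a C++ callable, whereas the real readTable takes a predicate expression as a string. The behaviour for a limit of 0 is also an assumption.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

// Stand-in for the row-selection rules described above; NOT readTable().
// limit == -1 means no limit; an empty predicate matches every row.
std::vector<int> selectRows(const std::vector<int> &table, int limit,
                            const std::function<bool(int)> &predicate) {
    std::vector<int> out;
    for (std::size_t i = 0; i < table.size(); ++i) {
        // Stop once the requested number of rows has been collected.
        if (limit >= 0 && out.size() >= static_cast<std::size_t>(limit)) break;
        if (!predicate || predicate(table[i])) out.push_back(table[i]);
    }
    return out;
}
```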
StreamProperties sb::StreamBaseClient::resubscribe ( const std::string &  stream_name,
const std::string &  logical_stream,
const std::string &  predicate 
)

Resubscribes to an already subscribed stream.

Uses the default capture transform strategy of flatten. To specify capture strategy, use getStreamProperties(stream_name, StreamProperties::NEST) and subscribe using the resultant StreamProperties.

Returns
the StreamProperties for the stream
StreamProperties sb::StreamBaseClient::resubscribe ( const StreamProperties &  props,
const std::string &  logical_stream,
const std::string &  predicate 
)

Resubscribes to an already subscribed stream.

Returns
the StreamProperties for the stream
void sb::StreamBaseClient::setDequeueResultInterceptor ( DequeueResult::Interceptor *  dri)

Set the dequeue results interceptor for this client connection.

This interceptor replaces any existing interceptor. To disable pre-processing of results, set the interceptor to null.

This method cannot be safely called while another thread is calling dequeue().

NOTE: Once the DequeueResult::Interceptor is passed to StreamBaseClient, StreamBaseClient manages the lifecycle of the object. StreamBaseClient will delete the object when the interceptor is reset or when the StreamBaseClient is deleted. Do not delete the passed object yourself.
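
The ownership rule in the note can be sketched with stand-in classes. This is an illustration under assumptions, not the library's code: Interceptor and OwningClient are hypothetical types, and the counter exists only to make the deletions observable.

```cpp
#include <cassert>

// Stand-in interceptor that records its own destruction.
struct Interceptor {
    explicit Interceptor(int *deleted) : deleted_(deleted) {}
    virtual ~Interceptor() { ++*deleted_; }
    int *deleted_;
};

// Stand-in client that takes ownership of the raw pointer it is given.
class OwningClient {
public:
    OwningClient() : interceptor_(nullptr) {}
    ~OwningClient() { delete interceptor_; }  // deleted with the client

    OwningClient(const OwningClient &) = delete;
    OwningClient &operator=(const OwningClient &) = delete;

    // Replaces the current interceptor and deletes the previous one.
    // Passing nullptr disables interception.
    void setInterceptor(Interceptor *next) {
        if (next != interceptor_) delete interceptor_;
        interceptor_ = next;
    }

    Interceptor *getInterceptor() { return interceptor_; }

private:
    Interceptor *interceptor_;
};
```

Under this model the caller allocates the interceptor with new, hands it over, and never calls delete on it; doing so would cause a double deletion.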

void sb::StreamBaseClient::setDynamicVariable ( const std::string &  dynamicVariablePath,
const std::string &  value 
)

Set the dynamic variable at the given path to the given value, expressed as a string in CSV style.

Exceptions
StreamBaseException  if the dynamic variable does not exist, or the value is not appropriate for setting the dynamic variable
int sb::StreamBaseClient::setQuiescentLimit ( int  timeoutMS)

Sets how many milliseconds a dequeueing client will tolerate not receiving a client heart beat from the StreamBase server that it is connected to.

The default value is 120 seconds (120000). By default, StreamBase servers emit "client heart beats" every 10 seconds so StreamBase applications have no requirement to send data regularly.

Parameters
timeoutMS  the number of milliseconds that is allowed to pass without receiving a message from the StreamBase server client heart beat stream
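
To relate the two defaults mentioned above, the following sketch computes how many heart beat intervals fit inside a quiescent limit. The constant and function names are hypothetical, and the strict comparison in quiescentLimitExceeded is an assumption about how such a check might be written, not the client's actual logic.

```cpp
#include <cassert>

// Defaults quoted in the description above.
const int DEFAULT_QUIESCENT_LIMIT_MS = 120000;
const int DEFAULT_HEARTBEAT_INTERVAL_MS = 10000;

// Number of whole heart beat intervals that fit inside the limit.
inline int heartbeatsWithinLimit(int limit_ms, int interval_ms) {
    assert(interval_ms > 0);
    return limit_ms / interval_ms;
}

// Hypothetical check: has more than limit_ms passed since the last message?
inline bool quiescentLimitExceeded(long long last_message_ms,
                                   long long now_ms, int limit_ms) {
    return now_ms - last_message_ms > limit_ms;
}
```

With the defaults, twelve consecutive heart beats would have to be missed before the limit is reached, which is why a lower limit should stay comfortably above the server's heart beat interval.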
void sb::StreamBaseClient::status ( std::vector< std::string > &  aStatus,
bool  verbose = false 
)

Returns the status of the StreamBase daemons.

StreamProperties sb::StreamBaseClient::subscribe ( const std::string &  stream_name)

Subscribes to a stream.

Uses the default capture transform strategy of flatten. To specify capture strategy, use getStreamProperties(stream_name, StreamProperties::NEST) and subscribe using the resultant StreamProperties.

Returns
the StreamProperties for the stream
StreamProperties sb::StreamBaseClient::subscribe ( const StreamProperties &  props)

Subscribes to a stream.

Returns
the StreamProperties for the stream
StreamProperties sb::StreamBaseClient::subscribe ( const std::string &  stream_name,
const std::string &  logical_stream,
const std::string &  predicate 
)

Subscribes to a stream with a predicate to apply to output tuples.

The stream name of dequeued tuples is logical_stream. When unsubscribing, use logical_stream.

Uses the default capture transform strategy of flatten. To specify capture strategy, use getStreamProperties(stream_name, StreamProperties::NEST) and subscribe using the resultant StreamProperties.

Parameters
stream_name  the stream to subscribe to, error if empty
logical_stream  the name of the logical stream to associate with this predicate (if empty, defaults to stream_name)
predicate  a predicate to apply to subset the stream, error if empty
Returns
the StreamProperties for the stream
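
The parameter rules above (empty stream_name or predicate is an error, empty logical_stream falls back to the stream name) can be sketched as follows. Subscription and makeSubscription are hypothetical names, std::invalid_argument stands in for whatever error the real client reports, and the stream names in the usage note are made up.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// Stand-in record of what a predicate subscription refers to.
struct Subscription {
    std::string stream_name;
    std::string logical_stream;
    std::string predicate;
};

// Applies the documented parameter rules; NOT the real subscribe().
Subscription makeSubscription(const std::string &stream_name,
                              const std::string &logical_stream,
                              const std::string &predicate) {
    if (stream_name.empty()) throw std::invalid_argument("stream_name is empty");
    if (predicate.empty()) throw std::invalid_argument("predicate is empty");
    Subscription s;
    s.stream_name = stream_name;
    // An empty logical stream name defaults to the stream name.
    s.logical_stream = logical_stream.empty() ? stream_name : logical_stream;
    s.predicate = predicate;
    return s;
}
```

For example, makeSubscription("Trades", "BigTrades", "price > 100") yields a subscription whose dequeued tuples would be reported under BigTrades, which is also the name to pass when unsubscribing.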
StreamProperties sb::StreamBaseClient::subscribe ( const StreamProperties &  props,
const std::string &  logical_stream,
const std::string &  predicate 
)

Subscribes to a stream with a predicate to apply to output tuples.

The stream name of dequeued tuples is logical_stream. When unsubscribing, use logical_stream.

Parameters
props  the stream to subscribe to, error if empty
logical_stream  the name of the logical stream to associate with this predicate (if empty, defaults to the stream name)
predicate  a predicate to apply to subset the stream, error if empty
Returns
the StreamProperties for the stream
void sb::StreamBaseClient::typecheck ( const std::string &  sbapp,
std::map< std::string, Schema > &  streams,
bool  full = false 
)

Typechecks a potential modification to the application, outputting properties of all streams in the application.

streams is cleared first.

Parameters
sbapp  the application
streams  the schema defs of the streams
full  do a full typecheck
void sb::StreamBaseClient::unsubscribe ( const std::string &  stream_name)

Unsubscribes from a stream.

Note: Any tuples that are in-flight during an unsubscribe request will be dequeued until the stream is fully drained.

void sb::StreamBaseClient::unsubscribe ( const StreamProperties &  props)

Unsubscribes from a stream.

Note: Any tuples that are in-flight during an unsubscribe request will be dequeued until the stream is fully drained.

