Contents
This topic describes how to create enqueue and dequeue C++ client applications for StreamBase applications.
This section describes how to use the StreamBase C++ library to write a client application that enqueues data to a StreamBase Server.
Sample source code: streambase-install-dir
/sample/compliance/preload.cpp
The basic procedure for enqueuing data into a StreamBase node in C++ is:
-
Include
StreamBaseClient.hpp
-
Create an instance of the
StreamBaseClient
class. If needed, specify the String URI of the desired StreamBase node as an argument. For example:StreamBaseClient client("sb://localhost:10000/");
-
For every tuple of data to enqueue:
-
Retrieve a
Schema
object for each stream you want to enqueue. For example:Schema schema = client.getSchemaForStream("InputStream");
...where "
InputStream
" is the name of an input stream in your StreamBase application. -
Create a
Tuple
of the given schema; in this example, the schema of the same stream as above:Tuple tuple(schema);
-
Set values for each of the tuple's fields. For example:
tuple.setInt("myint",5); tuple.setString("mystring","hello"); // ... // where "myint" and "mystring" are field names in the input stream // of your StreamBase application
-
Enqueue the tuple onto the stream. For example:
client.enqueue("InputStream",tuple);
-
On a supported Linux machine
where StreamBase is installed, use the sb-config
utility
to set up the environment and define the compiler to use when compiling your program.
For example:
CXX=`sb-config --cxx` $CXX MyClient.cpp `sb-config --cflags` -c -oMyClient
.o $CXX MyClient.o `sb-config --libs` -oMyClient
Substitute the name of your client for MyClient
.
Use the Makefiles in the following StreamBase samples as a guide to setting up your projects:
/opt/tibco/sfire-sfds/11.1/sample/custom-aggregate-function
/opt/tibco/sfire-sfds/11.1/sample/custom-simple-function
StreamBase requires G++ 8.5 for building C++ clients on Linux. StreamBase does not support building clients with G++ versions newer than 8.5, which are the default compilers on new Linux distributions.
To build your C++ client application on Windows, you must configure Microsoft Visual Studio as described in Configuring Visual C++.
For enqueue clients that use the StreamBase C++ Client library, you can enable tuple buffering and set the following parameters:
-
The buffer size (number of tuples per buffer)
-
The buffer's flush interval
You can also explicitly flush the buffer of a specified stream, or flush all buffers.
By enabling tuple buffering and experimenting with these parameters, you may be able to improve the efficiency and performance of your enqueue code. A single enqueue of a buffer containing (for example) 100 tuples should be more efficient than making 100 separate enqueue operations.
The examples in this section demonstrate how to use this feature. Here the buffer
size is set to 100
tuples and the buffer flush interval
is set to 1000
milliseconds (one second).
Notes: In the StreamBase Client library,
buffering is only turned on if the value for the buf_size
parameter is greater than zero. (The buf_size
parameter specifies the number of tuples, not a byte
limit.) Also the buffer is flushed at a regular interval specified by the
flush_interval
parameter. If the flush_interval
is not greater than zero, the buffer will only be
flushed when it reaches capacity (it is filled with tuples).
There are two ways to turn on buffering in C++. They are shown in the two examples that follow.
The first buffering method using the StreamBase C++ Client library is to specify
the buffering parameters in the StreamBaseClient
constructor. For example:
#include "StreamBase.hpp" #include "StreamBaseClient.hpp" #include "Tuple.hpp" using namespace sb; . . . // grab the URI, either from the command line or the environment StreamBaseURI uri; uri = StreamBaseURI::fromEnvironment(); // connect to the StreamBaseServer int buf_size = 100; int flush_interval = 1000; StreamBaseClient client(uri,buf_size,flush_interval);
The second buffering alternative for C++ clients is to use the enableBuffering
method. For example:
#include "StreamBase.hpp" #include "StreamBaseClient.hpp" #include "Tuple.hpp" using namespace sb; . . . // grab the URI, either from the command line // or the environment StreamBaseURI uri; uri = StreamBaseURI::fromEnvironment(); // connect to the StreamBaseServer int buf_size = 100; int flush_interval = 1000; StreamBaseClient client(uri); client.enableBuffering(buf_size,flush_interval);
Note: If buffering is enabled and the
flush_interval
is set, the buffer will be flushed
periodically based on the specified interval. However, to have more control over
enqueuing buffered tuples, you can also use the flushBuffer(stream_name)
method of the StreamBaseClient
class to enqueue the contents of a buffer
immediately. Or, you can use the flushAllBuffers()
method to enqueue the contents of all buffers. These methods assume that the caller
has a lock on _buffers_lock
.
Generally you want to "flush" a buffer when you are concerned that the tuples in
the buffer may become stale (staleness is relative to the particular application).
Let's say, for example, your buffer size is 1000 and your flush_interval
is 0 (this value turns off periodic flushing). The
buffer will automatically be flushed when it is filled. That is, when the 1000th
tuple has been enqueued. If 300 tuples have just been enqueued and it is unclear
when or if more tuples will arrive to fill the buffer, it would be a good idea to
call flushBuffer()
or flushAllBuffers()
.
This section describes how to use the StreamBase C++ library to write a client application that dequeues data from a StreamBase Server.
Note: You can make your application dequeue a subset of the server output instead of all the output. For details, see Narrowing Dequeue Results with Filtered Subscribe.
Performance Note: Dequeue (producer)
clients that are slow may eventually get disconnected from
the StreamBase engine. The StreamBase engine disconnects clients that try to allocate
more memory than the limit set by the max-client-pages
parameter in the server configuration file, which is designed to protect StreamBase
Server from a slow or hung dequeue client.
The basic procedure for dequeuing data from a StreamBase node in C++ is as follows:
-
Include
StreamBaseClient.hpp
-
Create an instance of the
StreamBaseClient
class. If needed, specify the String URI of the desired StreamBase node as an argument. For example:StreamBaseClient client("sb://localhost:10000/");
-
Subscribe to each stream that you want to dequeue. For example:
client.subscribe("OutputStream");
...where
OutputStream
is the name of an output stream in your StreamBase application. -
Call the
dequeue(String s)
method on the client, which blocks until a tuple becomes available. This method returns a list of Tuples dequeued from each of the streams to which you subscribed, and whose name you set with the string argument. For example:string stream; const TupleList tuples = client.dequeue(stream); //stream is now "OutputStream"
-
If the Tuple list is empty, the server (or your client from another thread) is requesting that you close the connection; you should exit the dequeue method. For example:
if (tuples.empty()) return;
-
Otherwise, you can do the following:
-
Loop over the Tuple list. For example:
for (size_t i = 0; i < tuples.size(); ++i) {
-
Retrieve each Tuple as a
Tuple
object. For example:const Tuple tuple = tuples[i];
-
...And use it to access the tuple's constituent fields. For example:
int myint = tuple.getInt("myint"); string mystring = tuple.getString("mystring"); // ... // where "myint" and "mystring" are field names in the output stream // of your StreamBase application.
-
-
If you wish to cancel the blocking
dequeue
call from any thread, (for example if your client is shutting down), callclient.close();
. In fact, before exiting, always call theclient.close()
method to flush the client network buffers.Important
Note that the StreamBase Client library is not thread-safe. You cannot share a client connection across threads. For example, if you attempt to close a client from a thread other than the one in which it was created, a stack overflow error will occur. To run clients multi-threaded, you would need to establish a new client for each thread.
The procedure for compiling dequeue clients is the same as described above for enqueue clients. Remember to
always call client.close()
to flush the client network buffers before
exiting.
C++ clients can be installed on systems that do not have StreamBase installed. However, you also must include certain files in the directory with your client executable, or in a directory on the PATH. The files to include vary according to the Visual C++ version you used to build the client.
- Visual Studio 2015-2022
-
If you are deploying StreamBase C++ clients built with Visual Studio to systems without StreamBase installed, you must install the Microsoft Visual C++ Redistributables package, or the equivalent, on the system.
You can find them at the following web page. Note that StreamBase supports the Intel 64-bit (x64) architecture only.
https://docs.microsoft.com/en-US/cpp/windows/latest-supported-vc-redist
-
Creating Java Clients in the API Guide