Creating StreamBase C++ Clients

This topic describes how to create enqueue and dequeue C++ client applications for StreamBase applications.

Writing C++ Enqueue Clients

This section describes how to use the StreamBase C++ library to write a client application that enqueues data to a StreamBase Server.

Sample source code: streambase-install-dir/sample/compliance/preload.cpp

The basic procedure for enqueuing data into a StreamBase node in C++ is:

  1. Include StreamBaseClient.hpp

  2. Create an instance of the StreamBaseClient class. If needed, specify the String URI of the desired StreamBase node as an argument. For example:

    StreamBaseClient client("sb://localhost:10000/");

  3. For every tuple of data to enqueue:

    1. Retrieve a Schema object for the stream to which you want to enqueue. For example:

      Schema schema = client.getSchemaForStream("InputStream");

      ...where "InputStream" is the name of an input stream in your StreamBase application.

    2. Create a Tuple of the given schema; in this example, the schema of the same stream as above:

      Tuple tuple(schema);

    3. Set values for each of the tuple's fields. For example:

      tuple.setInt("myint",5);
      tuple.setString("mystring","hello");
      // ...
      // where "myint" and "mystring" are field names in the input stream
      // of your StreamBase application
      
    4. Enqueue the tuple onto the stream. For example:

      client.enqueue("InputStream",tuple);
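
The steps above can be put together as in the following sketch. It is an illustration under stated assumptions, not the sample shipped with StreamBase: the StubSchema, StubTuple, and StubClient types and the enqueueRows helper are hypothetical and exist only so that the code compiles without the StreamBase headers, and the code assumes a C++11-capable compiler. The schema is looked up once and reused for every tuple, on the assumption that the stream's schema does not change while the client runs.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins so the sketch compiles without the StreamBase SDK.
// In a real client, include "StreamBaseClient.hpp" and use Schema, Tuple,
// and StreamBaseClient in their place.
struct StubSchema { std::string stream; };

struct StubTuple {
    explicit StubTuple(const StubSchema&) {}
    void setInt(const std::string& field, int value) { ints[field] = value; }
    void setString(const std::string& field, const std::string& value) { strings[field] = value; }
    std::map<std::string, int> ints;
    std::map<std::string, std::string> strings;
};

struct StubClient {
    StubSchema getSchemaForStream(const std::string& stream) { return StubSchema{stream}; }
    void enqueue(const std::string& stream, const StubTuple& tuple) {
        assert(!closed);  // enqueuing after close() is a programming error
        sent.push_back(std::make_pair(stream, tuple));
    }
    void close() { closed = true; }
    std::vector<std::pair<std::string, StubTuple> > sent;
    bool closed = false;
};

typedef std::pair<int, std::string> Row;  // (myint, mystring)

// Steps 3.1 to 3.4: look up the schema, then build and enqueue one tuple per row.
// "myint" and "mystring" are the field names used in the procedure above.
template <typename TupleT, typename ClientT>
void enqueueRows(ClientT& client, const std::string& stream, const std::vector<Row>& rows) {
    const auto schema = client.getSchemaForStream(stream);  // looked up once, reused
    for (const Row& row : rows) {
        TupleT tuple(schema);
        tuple.setInt("myint", row.first);
        tuple.setString("mystring", row.second);
        client.enqueue(stream, tuple);
    }
}
```

With the StreamBase headers available, the same helper would presumably be called as enqueueRows<Tuple>(client, "InputStream", rows) on a StreamBaseClient, followed by client.close() before the program exits.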

Compiling a StreamBase C++ Client on Linux

On a supported Linux machine where StreamBase is installed, use the sb-config utility to set up the environment and define the compiler to use when compiling your program. For example:

CXX=`sb-config --cxx`
$CXX MyClient.cpp `sb-config --cflags` -c -o MyClient.o
$CXX MyClient.o `sb-config --libs` -o MyClient

Substitute the name of your client for MyClient.

Use the Makefiles in the following StreamBase samples as a guide to setting up your projects:

   /opt/tibco/sb-cep/n.m/sample/custom-aggregate-function
   /opt/tibco/sb-cep/n.m/sample/custom-simple-function

StreamBase requires G++ 3.4 through G++ 4.2 for building C++ clients on Linux. StreamBase does not support building clients with G++ 4.3 or 4.4, which are the default compilers on newer Linux distributions. On such distributions, install GCC and G++ 4.2, and set the CC and CXX environment variables before building StreamBase C++ code, including StreamBase samples that include C++ code. For example:

export CC=gcc-4.2
export CXX=g++-4.2

Compiling a StreamBase C++ Client on Windows

To build your C++ client application on Windows, you must configure Microsoft Visual Studio as described in Configuring Visual C++.

Buffering for C++ Enqueue Clients

For enqueue clients that use the StreamBase C++ Client library, you can enable tuple buffering and set the following parameters:

  • The buffer size (number of tuples per buffer)

  • The buffer's flush interval

You can also explicitly flush the buffer of a specified stream, or flush all buffers.

By enabling tuple buffering and experimenting with these parameters, you may be able to improve the efficiency and performance of your enqueue code. A single enqueue of a buffer containing (for example) 100 tuples should be more efficient than making 100 separate enqueue operations.

The examples in this section demonstrate how to use this feature. Here the buffer size is set to 100 tuples and the buffer flush interval is set to 1000 milliseconds (one second).

Notes: In the StreamBase Client library, buffering is turned on only if the value of the buf_size parameter is greater than zero. (The buf_size parameter specifies a number of tuples, not a byte limit.) When flush_interval is greater than zero, the buffer is also flushed at that regular interval, in milliseconds. If flush_interval is not greater than zero, the buffer is flushed only when it reaches capacity (that is, when it is full of tuples).
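
The rules in these notes can be restated as a small decision function. This is only a sketch of the described behavior: the BufferingMode enum and the bufferingMode function are hypothetical names and are not part of the StreamBase Client library.

```cpp
// Hypothetical names; this only restates the rules from the notes above.
enum BufferingMode {
    Unbuffered,           // buf_size <= 0: buffering is off
    FlushWhenFull,        // buf_size > 0, flush_interval <= 0
    FlushWhenFullOrTimer  // buf_size > 0, flush_interval > 0
};

// Maps the two buffering parameters to the resulting flush behavior.
BufferingMode bufferingMode(int buf_size, int flush_interval) {
    if (buf_size <= 0) return Unbuffered;
    return flush_interval > 0 ? FlushWhenFullOrTimer : FlushWhenFull;
}
```

With the settings used in this section (a buf_size of 100 and a flush_interval of 1000), the function returns FlushWhenFullOrTimer.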

There are two ways to turn on buffering in C++. They are shown in the two examples that follow.

Buffering Example 1

The first buffering method using the StreamBase C++ Client library is to specify the buffering parameters in the StreamBaseClient constructor. For example:

#include "StreamBase.hpp"
#include "StreamBaseClient.hpp"
#include "Tuple.hpp"

using namespace sb;
   .
   .
   .
        // grab the URI, either from the command line or the environment
        StreamBaseURI uri;
        uri = StreamBaseURI::fromEnvironment();

        // connect to the StreamBaseServer
        int buf_size = 100;
        int flush_interval = 1000;
        StreamBaseClient client(uri,buf_size,flush_interval);

Buffering Example 2

The second buffering alternative for C++ clients is to use the enableBuffering method. For example:

#include "StreamBase.hpp"
#include "StreamBaseClient.hpp"
#include "Tuple.hpp"

using namespace sb;
   .
   .
   .
        // grab the URI, either from the command line 
        // or the environment
        StreamBaseURI uri;
        uri = StreamBaseURI::fromEnvironment();

        // connect to the StreamBaseServer
        int buf_size = 100;
        int flush_interval = 1000;
        StreamBaseClient client(uri);
        client.enableBuffering(buf_size,flush_interval);

Note: If buffering is enabled and the flush_interval is set, the buffer will be flushed periodically based on the specified interval. However, to have more control over enqueuing buffered tuples, you can also use the flushBuffer(stream_name) method of the StreamBaseClient class to enqueue the contents of a buffer immediately. Or, you can use the flushAllBuffers() method to enqueue the contents of all buffers. These methods assume that the caller has a lock on _buffers_lock.

Generally, you flush a buffer when you are concerned that the tuples in it may become stale (staleness is relative to the particular application). Suppose, for example, that your buffer size is 1000 and your flush_interval is 0 (this value turns off periodic flushing). The buffer is then flushed automatically only when it is full, that is, when the 1000th tuple has been enqueued. If 300 tuples have just been enqueued and it is unclear when or whether more tuples will arrive to fill the buffer, it is a good idea to call flushBuffer() or flushAllBuffers().
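
The scenario above can be traced with a small model of a single stream's buffer. The EnqueueBufferModel class is hypothetical and is not the StreamBase implementation; it only mirrors the flush rules described in this section, and it leaves out periodic flushing, as if flush_interval were 0.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical model of one stream's enqueue buffer. It is NOT the StreamBase
// implementation; it only counts how many batches would be sent.
class EnqueueBufferModel {
public:
    explicit EnqueueBufferModel(std::size_t buf_size) : buf_size_(buf_size), sends_(0) {}

    // Adds one tuple; sends the whole batch once buf_size tuples are pending.
    // A buf_size of 0 means buffering is off, so every tuple is sent at once.
    void enqueue(const std::string& tuple) {
        pending_.push_back(tuple);
        if (buf_size_ == 0 || pending_.size() >= buf_size_) flushBuffer();
    }

    // Sends whatever is pending as one batch; does nothing if the buffer is empty.
    void flushBuffer() {
        if (pending_.empty()) return;
        ++sends_;
        pending_.clear();
    }

    std::size_t pending() const { return pending_.size(); }
    std::size_t sends() const { return sends_; }

private:
    std::size_t buf_size_;
    std::size_t sends_;
    std::vector<std::string> pending_;
};
```

In this model, a buffer of size 1000 that has received 300 tuples reports 300 pending tuples and no sends until flushBuffer() is called. In a real client, the corresponding calls are the flushBuffer(stream_name) and flushAllBuffers() methods of StreamBaseClient.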

Writing C++ Dequeue Clients

This section describes how to use the StreamBase C++ library to write a client application that dequeues data from a StreamBase Server.

Note: You can make your application dequeue a subset of the server output instead of all the output. For details, see Narrowing Dequeue Results with Filtered Subscribe.

Performance Note: Slow dequeue (consumer) clients may eventually be disconnected from the StreamBase Server process. To protect itself from a slow or hung dequeue client, the sbd process disconnects clients that try to allocate more memory than the limit set by the max-client-pages parameter in the server configuration file.

The basic procedure for dequeuing data from a StreamBase node in C++ is as follows:

  1. Include StreamBaseClient.hpp

  2. Create an instance of the StreamBaseClient class. If needed, specify the String URI of the desired StreamBase node as an argument. For example:

    StreamBaseClient client("sb://localhost:10000/");

  3. Subscribe to each stream that you want to dequeue. For example:

    client.subscribe("OutputStream");

    ...where OutputStream is the name of an output stream in your StreamBase application.

  4. Call the dequeue(String s) method on the client, which blocks until a tuple becomes available. The method returns a list of the Tuples dequeued from a stream to which you subscribed, and sets the string argument to the name of that stream. For example:

    string stream;
    const TupleList tuples = client.dequeue(stream);
    //stream is now "OutputStream"
    
  5. If the Tuple list is empty, the server (or your client, from another thread) is requesting that you close the connection; stop dequeuing and return from the function in which you call dequeue. For example:

    if (tuples.empty()) return;
    
  6. Otherwise, you can do the following:

    1. Loop over the Tuple list. For example:

      for (size_t i = 0; i < tuples.size(); ++i) {
      
    2. Retrieve each Tuple as a Tuple object. For example:

      const Tuple tuple = tuples[i];
      
    3. ...And use it to access the tuple's constituent fields. For example:

      int myint = tuple.getInt("myint");
      string mystring = tuple.getString("mystring");
           // ...
           // where "myint" and "mystring" are field names in the output stream
           // of your StreamBase application.
      
  7. If you wish to cancel the blocking dequeue call from any thread (for example, if your client is shutting down), call client.close(). In fact, always call client.close() before exiting, to flush the client network buffers.

    Important

    Note that the StreamBase Client library is not thread-safe. You cannot share a client connection across threads. For example, if you attempt to close a client from a thread other than the one in which it was created, a stack overflow error will occur. To run clients multi-threaded, you would need to establish a new client for each thread.
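
The dequeue steps above can be combined into a single loop, as in the following sketch. The StubOutTuple, StubTupleList, and StubDequeueClient types and the dequeueUntilClosed helper are hypothetical; the stand-ins exist only so that the code compiles without the StreamBase headers, and the code assumes a C++11-capable compiler. The helper subscribes, dequeues, and closes the client all on the calling thread.

```cpp
#include <cstddef>
#include <deque>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-ins so the sketch compiles without the StreamBase SDK.
// In a real client, use StreamBaseClient, TupleList, and Tuple instead.
struct StubOutTuple {
    int getInt(const std::string& field) const { return ints.at(field); }
    std::string getString(const std::string& field) const { return strings.at(field); }
    std::map<std::string, int> ints;
    std::map<std::string, std::string> strings;
};
typedef std::vector<StubOutTuple> StubTupleList;

struct StubDequeueClient {
    void subscribe(const std::string& stream) { subscribed = stream; }
    // Returns the next scripted batch and names its stream; an empty list
    // stands for "the connection is closing".
    StubTupleList dequeue(std::string& stream) {
        if (batches.empty()) return StubTupleList();
        stream = subscribed;
        StubTupleList next = batches.front();
        batches.pop_front();
        return next;
    }
    void close() { batches.clear(); }
    std::string subscribed;
    std::deque<StubTupleList> batches;
};

// Steps 3 to 7: subscribe, then dequeue until an empty list arrives.
template <typename ClientT, typename Handler>
void dequeueUntilClosed(ClientT& client, const std::string& stream_name, Handler onTuple) {
    client.subscribe(stream_name);
    std::string stream;  // set by dequeue() to the name of the source stream
    for (;;) {
        const auto tuples = client.dequeue(stream);  // blocks until data or close
        if (tuples.empty()) break;                   // close requested
        for (std::size_t i = 0; i < tuples.size(); ++i) {
            onTuple(stream, tuples[i]);
        }
    }
    client.close();  // flush the client network buffers before exiting
}
```

The handler receives the stream name and each tuple, and can read fields with calls such as tuple.getInt("myint") and tuple.getString("mystring"), as in step 6.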

Compiling C++ Dequeue Clients

The procedure for compiling dequeue clients is the same as described above for enqueue clients. Remember to always call client.close() to flush the client network buffers before exiting.

Deploying C++ Clients to Windows Systems Without StreamBase

C++ clients can be installed on systems that do not have StreamBase installed. However, you also must include certain files in the directory with your client executable, or in a directory on the PATH. The files to include vary according to the Visual C++ version you used to build the client.

Visual Studio 2015-2022

If you are deploying StreamBase C++ clients built with Visual Studio to systems without StreamBase installed, you must install the Microsoft Visual C++ Redistributables package, or the equivalent, on the system.

Visual Studio 2015-2022 Redistribution Packages

You can find the redistributable packages at the following web page. Note that StreamBase supports the Intel 64-bit (x64) architecture only.

https://docs.microsoft.com/en-US/cpp/windows/latest-supported-vc-redist