Creating StreamBase Python Clients

This topic describes how to create enqueue and dequeue client applications for StreamBase applications using the StreamBase Client library for Python. The Python client libraries are shipped for Windows and Linux installations of StreamBase. macOS is not supported.

Important

The StreamBase Client library is not guaranteed to be thread-safe: multiple threads must not concurrently access the same streambase.Client object. You can create and use separate Client objects in separate threads, even if all those Client objects are connected to the same server. You can also guard a single Client object with a mutex to ensure that only one thread at a time has access to it.
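
The mutex approach can be sketched as a thin wrapper around the client. This is a minimal sketch, not part of the StreamBase library: LockedClient is a hypothetical name, and the code assumes only that the wrapped object has the enqueue() method described later in this topic.

```python
import threading


class LockedClient(object):
    """Hypothetical wrapper (not part of the streambase module) that lets
    several threads share one client by serializing calls to enqueue()."""

    def __init__(self, client):
        self._client = client          # for example, a streambase.Client
        self._lock = threading.Lock()  # the mutex guarding the client

    def enqueue(self, stream, data):
        # Only one thread at a time may be inside the underlying client.
        self._lock.acquire()
        try:
            return self._client.enqueue(stream, data)
        finally:
            self._lock.release()
```

Only enqueue() is wrapped here. Guarding a blocking dequeue() with the same lock would also keep another thread from calling close() until the dequeue returns, so a separate Client object per thread is often the simpler choice.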

Initial Setup for Linux

On Linux platforms, the Python Client library is built for four Python versions: 2.4, 2.5, 2.6, and 2.7. A streambase.py module for each Python version is installed in /opt/tibco/sb-cep/n.m/lib64/pythonx.x, where x.x is the Python version number.

Set the PYTHONPATH environment variable to point to the location of the appropriate client library for the Python version on your system. For example, on 64-bit Red Hat Enterprise Linux 6, where the default Python version is 2.6, use a setting like the following:

export PYTHONPATH=/opt/tibco/sb-cep/n.m/lib64/python2.6:$PYTHONPATH

You can use the sb-config --pypath command to determine the correct Python library version to use. For example:

export PYTHONPATH=`sb-config --pypath`:$PYTHONPATH
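
To see which pythonx.x directory corresponds to the interpreter you are running, the naming rule described above can be reproduced in a few lines. This is only an illustrative sketch: client_lib_dir is a hypothetical helper, the installation root is passed in because the n.m release number varies, and sb-config --pypath remains the way to obtain the real path.

```python
import posixpath
import sys


def client_lib_dir(install_root, version_info=None):
    """Return install_root/lib64/pythonX.Y for the given (or running) Python.

    Hypothetical helper: it only mirrors the directory naming described
    above and does not check that the directory exists.
    """
    if version_info is None:
        version_info = sys.version_info
    major, minor = version_info[0], version_info[1]
    return posixpath.join(install_root, "lib64", "python%d.%d" % (major, minor))
```

Because the library is built only for the Python versions listed above, the returned path is meaningful only when the interpreter is one of those versions.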

Initial Setup for Windows

The StreamBase Python Client library supports two Python distributions on Windows platforms:

  • The 64-bit Windows binary installer for Python 2.7.2 or later from python.org.

  • The 64-bit ActivePython release 2.7.2 or later from ActiveState.

Install Python 2.7 into C:\Python27.

In a StreamBase Command Prompt, set the PYTHONPATH environment variable to point to the location of the Python Client library:

set PYTHONPATH=%STREAMBASE_HOME%\lib64\python2.7

Writing Python Enqueue Clients

This section describes how to use the StreamBase Python library to write a client application that enqueues data to StreamBase Server. A sample file, SimpleEnqueuer.py, is provided as part of the client sample shipped with the StreamBase base kit in $STREAMBASE_HOME/sample/client.

The basic procedure for enqueuing data into a StreamBase node from Python is:

  1. Import the StreamBase Python module:

    import streambase as sb

  2. Create an instance of the streambase.Client class. If needed, specify the URI of the desired StreamBase node as a string argument. For example:

    client = sb.Client("sb://localhost:10000/")

  3. Enqueue tuples and terminate.

Your Python program can take advantage of the correspondence between StreamBase tuples and Python dictionaries, and between StreamBase lists and Python lists. For example, to enqueue tuples having the schema {list bids {double Price, int Volume}}, your Python client code would be:

import streambase as sb

client = sb.Client("sb://localhost")
client.enqueue("BidsIn", {'bids': [ {'Price': 10.0, 'Volume': 100}, {'Price': 5.0, 'Volume': 50}]})
client.close()
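
When the bid data arrives in some other shape, the nested dictionary can be built programmatically before it is passed to enqueue(). The following is a minimal sketch for the example schema above; make_bids_tuple is a hypothetical helper name, and the input is assumed to be a sequence of (price, volume) pairs.

```python
def make_bids_tuple(price_volume_pairs):
    """Build a dictionary matching {list bids {double Price, int Volume}}."""
    bids = []
    for price, volume in price_volume_pairs:
        # Each element of the StreamBase list is itself a tuple,
        # so it is represented as a Python dictionary.
        bids.append({'Price': float(price), 'Volume': int(volume)})
    return {'bids': bids}
```

Calling make_bids_tuple([(10.0, 100), (5.0, 50)]) produces the same dictionary that is passed to client.enqueue("BidsIn", ...) in the example above.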

You can also:

  1. Enqueue the tuple onto the stream directly, as a Python tuple or dictionary:

    client.enqueue("InputStream", (5, "hello"))
    client.enqueue("InputStream", {'myint':5, 'mystring':"hello"})
  2. Manually build a streambase.Tuple object and enqueue it:

    1. Retrieve a streambase.Schema object for each stream to which you want to enqueue. For example:

      schema = client.getSchemaForStream("InputStream")
    2. Create a streambase.Tuple object of the specified schema. In this example, the schema of the above stream:

      sbtuple = sb.Tuple(schema)
    3. Set values for each of the tuple's fields, specifying them by name or position. For example:

      sbtuple['myint'] = 5
      sbtuple['mystring'] = "hello"
      
      #  ...is equivalent to...
      
      sbtuple[0] = 5
      sbtuple[1] = "hello"
      
      #  ...is equivalent to...
      
      sbtuple.setInt('myint', 5)
      sbtuple.setString('mystring', "hello")  
    4. Enqueue the streambase.Tuple onto the stream, which you can reference by name. For example:

      client.enqueue("InputStream", sbtuple)

For details on Python Dictionaries, refer to chapter 3 of the Python Language Reference.
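
The enqueue procedure above can be wrapped so that the client is closed even when an enqueue fails. This is a sketch under stated assumptions: enqueue_all is a hypothetical helper, and it relies only on the enqueue() and close() methods described in this topic.

```python
def enqueue_all(client, stream_name, records):
    """Enqueue every record to stream_name, then close the client.

    records may hold Python tuples, dictionaries, or streambase.Tuple
    objects, as described above. Returns the number of records enqueued.
    """
    count = 0
    try:
        for record in records:
            client.enqueue(stream_name, record)
            count += 1
    finally:
        # Close even if an enqueue raises, so buffered data is flushed.
        client.close()
    return count
```

For example, enqueue_all(sb.Client("sb://localhost"), "InputStream", [(5, "hello"), {'myint': 6, 'mystring': "bye"}]) would send two tuples and then close the connection.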

Running a StreamBase Python Client

  1. On a supported Linux machine where the StreamBase client library and Python library are installed, set the PYTHONPATH environment variable as described in Initial Setup for Linux.

    For Windows, follow the steps in Initial Setup for Windows.

  2. Run your StreamBase client in the Python interpreter. For example:

    python mysbclient.py

  3. If the Python interpreter fails with an error such as ImportError: libsbclient.so.9: cannot open shared object file: No such file or directory, make sure the StreamBase client library is available to your linker by adding its path to your LD_LIBRARY_PATH environment variable. For example, on a 64-bit Linux system using Bash and with StreamBase installed in its default location, use the following command:

    export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/opt/tibco/sb-cep/n.m/lib64
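
A client can also catch the import failure itself and print a reminder about the two environment variables. The following is a minimal sketch; try_import is a hypothetical helper, and the hint text is illustrative rather than output produced by StreamBase.

```python
import sys


def try_import(module_name):
    """Return (module, None) on success or (None, hint) on ImportError."""
    try:
        return __import__(module_name), None
    except ImportError:
        error = sys.exc_info()[1]  # works on old and new Python versions
        hint = ("Could not import %s: %s\n"
                "Check PYTHONPATH, and on Linux check that LD_LIBRARY_PATH "
                "includes the StreamBase lib64 directory."
                % (module_name, error))
        return None, hint
```

A client might call try_import("streambase") at startup and exit with the returned hint when the module is None, instead of showing a bare traceback.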

Writing Python Dequeue Clients

This section describes how to use the StreamBase Python library to write a client application that dequeues data from StreamBase Server. A sample file, SimpleDequeuer.py, is provided as part of the client sample shipped with the StreamBase base kit in $STREAMBASE_HOME/sample/client.

Performance Note

Dequeue (consumer) clients that are slow may eventually be disconnected from the StreamBase Server process, sbd. The server configuration file includes the max-client-pages attribute of the <page-pool> element. The sbd process disconnects clients that try to allocate more memory than the limit set by this attribute; the limit is designed to protect sbd from a slow or hung dequeue client. For more information, see the <page-pool> element in the StreamBase Server Configuration XML topic in StreamBase References.

The basic procedure for dequeuing data from a StreamBase node to Python is as follows:

  1. Import the StreamBase Python module with a line like the following:

    import streambase as sb
  2. Create an instance of the streambase.Client class. If needed, specify the URI of the desired StreamBase node as a string argument. For example:

    client = sb.Client("sb://localhost:10000/")
  3. Subscribe to each output stream from which you want to dequeue. For example:

    client.subscribe("OutputStream")

  4. Call the dequeue() method on the client, which blocks until a tuple becomes available. This method returns a streambase.DequeueResult object, which contains a status flag that indicates whether tuples were received.

    Warning

    TIBCO strongly recommends that you set a timeout on the dequeue and loop waiting for a GOOD status flag. Otherwise you will not be able to use Ctrl+C to exit your Python program while it is waiting for tuples to dequeue, and you will have to kill it with some other method. The dequeue timeout can be large: you are simply setting the maximum latency between the user issuing a Ctrl+C and the program terminating.

    DEFAULT_TIMEOUT = 500       # milliseconds
    result = sb.DequeueResult()
    while result.getStatus() != sb.DequeueResult.GOOD:
        result = client.dequeue(DEFAULT_TIMEOUT)
  5. Once dequeue() has received tuples, you can get from the DequeueResult a streambase.TupleList object containing the streambase.Tuples dequeued, which behaves as a Python list. For example:

    tuples = result.getTuples()
    print "Got " + str(len(tuples)) + " tuple" + ((len(tuples) != 1) and "s" or "")
    print "First tuple is: " + str(tuples[0])
  6. If the result of dequeue().getStatus() is streambase.DequeueResult.CLOSED, the server (or your client from another thread) has closed the connection, and your program should respond appropriately.

  7. Having received tuples, you can access them in any of several ways. You can:

    1. Loop over the TupleList. For example:

      for t in tuples:
          print t
    2. Access the Tuple's fields by name or index. For example:

      print "myint: " + str(t[0]) + ", mystring: " + t[1]
      print "myint: " + str(t['myint']) + ", mystring: " + t['mystring']
  8. If you wish to cancel the blocking dequeue() call (for example, if your client is shutting down), call the close() method on the Client object. In fact, before exiting, always call the streambase.Client.close() method to flush the client network buffers.
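
The dequeue steps above can be combined into a single loop. This is a sketch under stated assumptions, not the shipped SimpleDequeuer.py sample: dequeue_until_closed and the handle callback are hypothetical names, the status values are passed in (for example sb.DequeueResult.GOOD and sb.DequeueResult.CLOSED), and any status other than those two is assumed to mean that the timeout expired with nothing received.

```python
def dequeue_until_closed(client, good, closed, handle, timeout_ms=500):
    """Dequeue with a timeout until the connection closes or Ctrl+C.

    good and closed are the status values to compare against. handle is a
    hypothetical callback invoked once per tuple. Returns the tuple count.
    """
    count = 0
    try:
        try:
            while True:
                result = client.dequeue(timeout_ms)
                status = result.getStatus()
                if status == closed:
                    break       # server or another thread closed the connection
                if status != good:
                    continue    # assumed: timeout expired, nothing received
                for t in result.getTuples():
                    handle(t)
                    count += 1
        except KeyboardInterrupt:
            pass                # Ctrl+C: fall through to close()
    finally:
        client.close()          # always flush the client network buffers
    return count
```

The client is expected to have subscribed to its output streams before this function is called. The nested try blocks keep the sketch usable on older Python 2 releases that do not accept except and finally in the same statement.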

Running Python Dequeue Clients

The procedure for running Python dequeue clients is the same as described above for enqueue clients. Always call streambase.Client.close() to flush the client network buffers before exiting.