This topic describes how to create enqueue and dequeue client applications for StreamBase applications using the StreamBase Client library for Python. The Python client libraries are shipped for Linux installations of StreamBase. Windows and macOS are not supported.
Important
The StreamBase Client library is not guaranteed to be thread-safe: multiple threads must not access the same streambase.Client object concurrently. You can create and use separate Client objects in separate threads, even if all those Client objects are connected to the same server. You can also guard a single Client object with a mutex to ensure that only one thread at a time has access to it.
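The mutex approach can be sketched as a thin wrapper around one shared client. This is a minimal sketch, not part of the StreamBase library: the LockedClient class is a hypothetical name, and it assumes only that the wrapped object exposes the enqueue() and close() methods described in this topic.

```python
import threading


class LockedClient(object):
    """Hypothetical wrapper that serializes calls on one shared client."""

    def __init__(self, client):
        self._client = client          # e.g. a streambase.Client instance
        self._lock = threading.Lock()  # the mutex guarding the client

    def enqueue(self, stream, data):
        # Only one thread at a time may be inside the wrapped client.
        with self._lock:
            return self._client.enqueue(stream, data)

    def close(self):
        with self._lock:
            return self._client.close()
```

The wrapper deliberately leaves out dequeue(): holding the lock across a blocking dequeue would prevent another thread from calling close() to cancel it.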
On Linux platforms, the Python Client library is built only for Python version 2.7. A streambase.py module is installed in /opt/tibco/sb-cep/11.1/lib64/python2.7.
Set the PYTHONPATH environment variable to point to the location of the appropriate client library for the Python version on your system. For example, on 64-bit Red Hat Enterprise Linux 6, where the default Python version is 2.6, use a setting like the following:
export PYTHONPATH=/opt/tibco/str/11.1/lib64/python2.7:$PYTHONPATH
You can use the sb-config --pypath command to determine the correct Python library version to use. For example:
export PYTHONPATH=`sb-config --pypath`:$PYTHONPATH
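One way to confirm that the PYTHONPATH setting took effect is to try the import from the interpreter you plan to use. The following is a minimal sketch; the module_available helper is a hypothetical name, and it relies only on standard Python import behavior.

```python
import sys


def module_available(name):
    """Return True if `name` can be imported with the current sys.path."""
    try:
        __import__(name)
        return True
    except ImportError:
        return False


if __name__ == "__main__":
    if module_available("streambase"):
        sys.stdout.write("streambase module found\n")
    else:
        # Show where Python actually looked, to compare with PYTHONPATH.
        sys.stdout.write("streambase not importable; sys.path is:\n")
        for entry in sys.path:
            sys.stdout.write("  " + entry + "\n")
```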
This section describes how to use the StreamBase Python library to write a client application that enqueues data to StreamBase Server. A sample file, SimpleEnqueuer.py, is provided as part of the client sample included in the StreamBase base kit in $STREAMBASE_HOME/sample/client.
The basic procedure for enqueuing data into a StreamBase node from Python is:
- Import the StreamBase Python module:
import streambase as sb
- Create an instance of the streambase.Client class. If needed, specify the URI of the desired StreamBase node as a string argument. For example:
client = sb.Client("sb://localhost:10000/")
- Enqueue tuples and terminate.
Your Python program can take advantage of the correspondence between StreamBase tuples and Python dictionaries, and between StreamBase lists and Python lists. For example, to enqueue tuples having the schema {list bids {double Price, int Volume}}, your Python client code would be:
import streambase as sb
client = sb.Client("sb://localhost")
client.enqueue("BidsIn", {'bids': [
    {'Price': 10.0, 'Volume': 100},
    {'Price': 5.0, 'Volume': 50}]})
client.close()
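When the bid data arrives as plain rows, a small helper can build the nested dictionary before it is enqueued. This is only a sketch: make_bids_tuple is a hypothetical helper, and the field names come from the example schema above. It does not import streambase, so it runs without a server.

```python
def make_bids_tuple(rows):
    """Build a dict shaped like {list bids {double Price, int Volume}}.

    rows is an iterable of (price, volume) pairs.
    """
    return {'bids': [{'Price': float(price), 'Volume': int(volume)}
                     for price, volume in rows]}


payload = make_bids_tuple([(10.0, 100), (5.0, 50)])
# With a connected client, the payload could then be sent with:
# client.enqueue("BidsIn", payload)
```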
You can also:
- Enqueue the tuple onto the stream directly, as a Python tuple or dictionary:
client.enqueue("InputStream", (5, "hello"))
client.enqueue("InputStream", {'myint':5, 'mystring':"hello"})
- Manually build a streambase.Tuple object and enqueue it:
  - Retrieve a streambase.Schema object for each stream to which you want to enqueue. For example:
schema = client.getSchemaForStream("InputStream")
  - Create a streambase.Tuple object of the specified schema. In this example, the schema of the above stream:
sbtuple = sb.Tuple(schema)
  - Set values for each of the tuple's fields, specifying them by name or position. For example:
sbtuple['myint'] = 5
sbtuple['mystring'] = "hello"
# ...is equivalent to...
sbtuple[0] = 5
sbtuple[1] = "hello"
# ...is equivalent to...
sbtuple.setInt('myint', 5)
sbtuple.setString('mystring', "hello")
  - Enqueue the streambase.Tuple onto the stream, which you can reference by name. For example:
client.enqueue("InputStream", sbtuple)
- For details on Python dictionaries, refer to chapter 3 of the Python Language Reference.
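The enqueue calls above can be combined with a guaranteed close() so that the client network buffers are flushed even if an enqueue raises. The following is a minimal sketch, assuming only the enqueue() and close() methods described in this topic; enqueue_all is a hypothetical helper name.

```python
from contextlib import closing


def enqueue_all(client, stream, rows):
    """Enqueue every row, then close the client even if an enqueue fails."""
    count = 0
    with closing(client):  # calls client.close() when the block exits
        for row in rows:
            client.enqueue(stream, row)
            count += 1
    return count


# Possible usage with a real connection (requires the streambase module):
# import streambase as sb
# enqueue_all(sb.Client("sb://localhost:10000/"), "InputStream",
#             [(5, "hello"), {'myint': 6, 'mystring': "world"}])
```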
- On a supported Linux machine where the StreamBase client library and Python library are installed, set the PYTHONPATH environment variable as described in Initial Setup for Linux.
- Run your StreamBase client in the Python interpreter. For example:
python mysbclient.py
- If the Python interpreter fails with a library import error, make sure the StreamBase client library is available to your linker by adding its path to your LD_LIBRARY_PATH environment variable. For example, on a 64-bit Linux system using Bash and with StreamBase installed in its default location, use the following command:
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/opt/tibco/sfire-sfds/11.1/lib64
This section describes how to use the StreamBase Python library to write a client application that dequeues data from StreamBase Server. A sample file, SimpleDequeuer.py, is provided as part of the client sample included in the StreamBase base kit in $STREAMBASE_HOME/sample/client.
Performance Note
Dequeue (consumer) clients that are slow may eventually be disconnected from the EventFlow engine. Use a configuration file of type com.tibco.ep.streambase.configuration.sbclientapilistener to specify the pagePool > maxClientPages property. EventFlow engines disconnect clients that try to allocate more memory than the limit set by this property, which is designed to protect engines from a slow or hung dequeue client. For more information, see the pagePool object in StreamBase Client API Listener Configuration.
The basic procedure for dequeuing data from a StreamBase node to Python is as follows:
- Import the StreamBase Python module with a line like the following:
import streambase as sb
- Create an instance of the streambase.Client class. If needed, specify the URI of the desired StreamBase node as a string argument. For example:
client = sb.Client("sb://localhost:10000/")
- Subscribe to each output stream from which you want to dequeue. For example:
client.subscribe("OutputStream")
- Call the dequeue() method on the client, which blocks until a tuple becomes available. This method returns a streambase.DequeueResult object, which contains a status flag that indicates whether tuples were received.
Warning
As a best practice, set a timeout on the dequeue and loop until the status flag is GOOD. Otherwise, you cannot use Ctrl+C to exit your Python program while it is waiting for tuples to dequeue, and you must kill it by some other means. The dequeue timeout can be large: it simply sets the maximum latency between the user pressing Ctrl+C and the program terminating.
DEFAULT_TIMEOUT = 500 # milliseconds
result = sb.DequeueResult()
while result.getStatus() != sb.DequeueResult.GOOD:
    result = client.dequeue(DEFAULT_TIMEOUT)
- Once dequeue() has received tuples, you can get from the DequeueResult a streambase.TupleList object containing the dequeued streambase.Tuple objects. The TupleList behaves as a Python list. For example:
tuples = result.getTuples()
print "Got " + str(len(tuples)) + " tuple" + ((len(tuples) != 1) and "s" or "")
print "First tuple is: " + str(tuples[0])
- If the result of dequeue().getStatus() is streambase.DequeueResult.CLOSED, the server (or your client from another thread) has closed the connection, and your program should respond appropriately.
- Having received tuples, you can access them in any of several ways. You can:
  - Loop over the TupleList. For example:
for t in tuples:
    print t
  - Access the Tuple's fields by name or index. For example:
print "myint: " + str(t[0]) + ", mystring: " + t[1]
print "myint: " + str(t['myint']) + ", mystring: " + t['mystring']
- If you wish to cancel the blocking dequeue() call (for example, if your client is shutting down), call the close() method on the Client object. In fact, before exiting, always call the streambase.Client.close() method to flush the client network buffers.
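The dequeue steps above can be pulled together into one loop that uses a timeout, stops on a CLOSED status, and always closes the client. This is a sketch under stated assumptions, not the SimpleDequeuer.py sample: drain is a hypothetical helper, and it relies only on the dequeue(timeout), getStatus(), getTuples(), and close() calls shown in this topic. The status values are passed in as arguments so that the function can be exercised without the streambase module.

```python
def drain(client, good, closed, handle, timeout_ms=500):
    """Dequeue until the connection is CLOSED, passing each tuple to handle().

    good and closed are the status values to compare against, for example
    sb.DequeueResult.GOOD and sb.DequeueResult.CLOSED.
    """
    try:
        while True:
            # The timeout keeps the loop responsive to Ctrl+C.
            result = client.dequeue(timeout_ms)
            status = result.getStatus()
            if status == closed:
                break
            if status == good:
                for t in result.getTuples():
                    handle(t)
            # Any other status (such as a timeout) simply loops again.
    finally:
        # Runs on normal exit, on errors, and on KeyboardInterrupt.
        client.close()
```

With a subscribed client, a call might look like drain(client, sb.DequeueResult.GOOD, sb.DequeueResult.CLOSED, my_handler), where my_handler is any function that accepts one tuple.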
The procedure for running Python dequeue clients is the same as described above for enqueue clients.
Remember to always call streambase.Client.close() to flush the client network buffers before exiting.