The Round Robin Dequeuer sample is a set of advanced StreamBase modules that presumes you are familiar with:
- StreamSQL programming
- StreamBase containers and the system container
- Adding applications to containers in a running server
- Using output predicates on output streams
- Editing StreamBase Server configuration files
The application modules in this sample demonstrate the use of round robin dequeuing. Round robin dequeuer applications control the destination of output tuples based on the connected clients, so that each output message is delivered to a single, subscribed client.
Note
Round robin dequeuing may lose messages as client programs connect and disconnect.
You can configure a StreamBase application to work as a round robin dequeuer by adding the following features:
- Subscriptions table

  A subscriptions table receives an event whenever a client subscribes or unsubscribes from a stream. The table keeps track of the currently connected dequeuers, so that we can round-robin them. The subscriptions table is maintained in the system container. Its schema must contain at least the following fields:

  - connectionid (int), the client connection identifier. Each connected client has a connection ID, which is a random hash value. To see connection IDs at run time, enter the command sbadmin listConnections.
  - path (string), the path of the stream.
  - logicalPath (string), the logical network address of the connected client. This is the same as the path, unless filtered subscribe is used (logical stream names are described in Narrowing Dequeue Results with Filtered Subscribe in the API Guide).

- Output processing module

  Your dequeuer application must include an output processing module that tracks the current subscription set based on the Subscriptions table.

- Output stream destination control

  Connect the processing module to an output stream in your application. In the output stream, set a predicate to control which clients will receive its tuples. The predicate can reference per-client fields, such as connectionid. A tuple is delivered only to clients for which the predicate is true. If the identifier is null, then the tuple is delivered to all connected clients, as usual.

- Server configuration

  Edit the server configuration file to identify the location of the processing module. For example, in this sample, the processing module is in the same location as the server configuration file:

      <global>
        <module-search directory="."/>
        ...
      </global>
With these features in place, you can write your own round robin logic to determine where to send output messages.
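The delivery rule for an output predicate on connectionid can be modeled outside StreamBase. The following Python sketch is not StreamBase code and does not use any StreamBase API; the function name recipients and the connection IDs are hypothetical, chosen only to illustrate the rule stated above (a null identifier reaches every client, otherwise only the matching client).

```python
from typing import Optional


def recipients(connected: list[str], target: Optional[str]) -> list[str]:
    """Return the connection IDs that would receive one output tuple.

    Illustrative model only: a null (None) target means the tuple goes to
    every connected client; otherwise only the client whose connection ID
    equals the target receives it.
    """
    if target is None:
        return list(connected)
    return [cid for cid in connected if cid == target]


clients = ["c1", "c2", "c3"]  # hypothetical connection IDs
print(recipients(clients, "c2"))  # ['c2']
print(recipients(clients, None))  # ['c1', 'c2', 'c3']
```

Round robin logic then reduces to choosing a different target for each successive tuple.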
This sample includes several applications and supporting files:
- roundrobin.sbapp: EventFlow version of the roundrobin example.
- roundrobin.ssql: StreamSQL version of the roundrobin example.
- roundrobinwide.sbapp: EventFlow version of the roundrobin example with many fields.
- roundrobinwide.ssql: StreamSQL version of the roundrobin example with many fields.
- systemtables.sbapp: EventFlow version of the output processing module used by the roundrobin modules to maintain a subscriptions table.
- systemtables.ssql: StreamSQL version of the output processing module.
This section uses the StreamSQL version of the roundrobin sample to describe how the application works. The EventFlow versions follow the same logic. The purpose of the output processing module is to process subscribe and unsubscribe events to maintain its table of current subscriptions.
The following discussion examines the output processing module, which is defined in systemtables.ssql. Bracketed numbers on the right are not part of the code, but instead link to lines of commentary below.
    create input stream subscriptions(connectionid string, path string,      [1]
        logicalPath string, subscribe boolean, time timestamp);

    create output memory table subscriptionstab (id long,                    [2]
        connectionid string, path string, logicalPath string, time timestamp,
        primary key(connectionid, logicalPath) using btree);

    create index subidindex on subscriptionstab using btree (id);            [3]

    create memory table serialid(id int, serialnum long,
        primary key(id) using hash);                                         [4]

    create stream subscriptionsinsert as select * from subscriptions         [5]
        where subscribe == true;

    create stream subscriptionswithid;                                       [6]

    insert into serialid (id, serialnum) select 1 as id,                     [7]
        long(1) as serialnum from subscriptionsinsert
        on duplicate key update serialnum = serialid.serialnum+1
        returning subscriptionsinsert.*, serialid.serialnum as subscriptionid
        INTO subscriptionswithid;

    insert into subscriptionstab select subscriptionid,                      [8]
        connectionid, path, logicalPath, time from subscriptionswithid;

    create stream subscriptionsdelete as select * from subscriptions         [9]
        where subscribe == false;

    delete from subscriptionstab using subscriptionsdelete                   [10]
        where subscriptionstab.connectionid = subscriptionsdelete.connectionid and
        subscriptionstab.logicalPath = subscriptionsdelete.logicalPath;
The following annotations describe points of interest in the systemtables module:
- Create input stream for subscriptions. The stream schema includes these fields:
  - The required connectionid, path, and logicalPath fields.
  - subscribe: A flag indicating either a subscribe or an unsubscribe event.
  - time: The timestamp of the event.
- Create the shared subscriptions table. The table schema adds an id field for the subscription sequence number.
- Create table for the serial number for the subscriptions table.
- Create an intermediate stream so that we don't access the query table on each insert.
- Insert the next sequence if no rows, otherwise do an update returning the serial numbers and subscriptions.
- Create an intermediate stream to handle deleted subscriptions.
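The bookkeeping performed by the systemtables module can be pictured with a small Python model. This is only a sketch of the behavior described above, not StreamBase code: the class name SubscriptionsTable and the method on_event are hypothetical, and the handling of a repeated subscribe for the same key (the row is overwritten) is an assumption that the source does not confirm.

```python
class SubscriptionsTable:
    """Illustrative model of subscriptionstab plus the serialid counter."""

    def __init__(self):
        self.serial = 0  # plays the role of serialid.serialnum
        self.rows = {}   # (connectionid, logicalPath) -> row

    def on_event(self, connectionid, path, logical_path, subscribe, time):
        key = (connectionid, logical_path)  # mirrors the table's primary key
        if subscribe:
            # Each subscribe event takes the next serial number as its id.
            self.serial += 1
            # Assumption: a repeated subscribe simply overwrites the row.
            self.rows[key] = {
                "id": self.serial,
                "connectionid": connectionid,
                "path": path,
                "logicalPath": logical_path,
                "time": time,
            }
        else:
            # An unsubscribe event deletes the matching row, if any.
            self.rows.pop(key, None)


table = SubscriptionsTable()
table.on_event("c1", "rr.out", "rr.out", True, 1)
table.on_event("c2", "rr.out", "rr.out", True, 2)
table.on_event("c1", "rr.out", "rr.out", False, 3)
table.on_event("c3", "rr.out", "rr.out", True, 4)
print(sorted((r["id"], r["connectionid"]) for r in table.rows.values()))
# [(2, 'c2'), (3, 'c3')]
```

Because ids only ever increase, ordering the remaining rows by id gives a stable rotation order even after clients disconnect.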
This section examines the StreamSQL version of the dequeuer application itself, roundrobin.ssql, which contains the systemtables module.
    create input stream subscriptions(connectionid string, path string,      [1]
        logicalPath string, subscribe boolean, time timestamp);

    create table subscriptionstab;                                           [2]

    APPLY MODULE "systemtables.ssql" FROM subscriptions=subscriptions        [3]
        INTO subscriptionstab=subscriptionstab;

    create input stream in(a int);                                           [4]

    create output stream out(a int, nextsubid long, nextconnectionid string) [5]
        with output predicate system.connectionid = nextconnectionid;

    declare lastsubid long default 0                                         [6]
        update from (select nextsubid from out);

    create output stream nextid AS                                           [7]
        select in.*, subscriptionstab.id as nextsubid,
            subscriptionstab.connectionid as nextconnectionid
        from in outer join subscriptionstab
        where subscriptionstab.id > lastsubid and path = getPath('out')
        order by id limit 1;

    select nextid.a,                                                         [8]
        coalesce(nextid.nextsubid, subscriptionstab.id) as nextsubid,
        coalesce(nextid.nextconnectionid, subscriptionstab.connectionid)
            as nextconnectionid
        from nextid, subscriptionstab
        where path = getPath('out')
        order by id limit 1
        into out;
The following annotations describe points of interest in the roundrobin.ssql application:
- Note that this schema matches that of the output processing module's input stream.
- This is a placeholder table for the shared table in the systemtables module.
- The input stream for the actual data, in this example a simple int field.
- Create the output stream now so that we can use it to define a dynamic variable later. Output fields include:
  - a: The application data.
  - nextsubid: The next subscriber client that we want to send output to.
  - nextconnectionid: The subscriber's connection ID.

  Finally, declare a predicate. connectionid is one of several special fields, described in Defining Output Streams in the Authoring Guide, that can be used in output stream predicates to control dequeuing. It specifies that output is to be sent only to a particular connection.
- Declare the lastsubid dynamic variable, which will contain the ID of the last client that received a subscription event from the output stream.
- Create an intermediate output stream that contains the result of a lookup for the nextid in the subscriptions table. That is, the next subscription whose ID is greater than the last subscription we used. The lookup returns null if there are no subscriptions, or if we have sent a tuple to the last subscription in the table.

  The getPath function returns the full path of the out stream, ensuring that the right container is used at run time, as described below.
- If the preceding lookup returns null, then look for the ID from the start of the table. As in standard SQL, the coalesce function here returns the first non-null value in the list of values.
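The selection logic in the annotations above (take the next subscription whose id is greater than lastsubid, otherwise wrap to the start of the table) can be traced with a short Python model. This is a sketch under stated assumptions, not StreamBase code: the names next_subscription and dispatch, the row dictionaries, and the path string "rr.out" are hypothetical, and returning None when there are no subscriptions is an assumption about a case the source does not spell out.

```python
def next_subscription(rows, last_sub_id, out_path):
    """Pick the subscription that should receive the next tuple."""
    # Only subscriptions to the output stream's path are candidates.
    candidates = sorted(
        (r for r in rows if r["path"] == out_path), key=lambda r: r["id"]
    )
    if not candidates:
        return None  # assumption: nothing to select without subscribers
    for row in candidates:
        if row["id"] > last_sub_id:
            return row  # first lookup: next id after the last one used
    return candidates[0]  # fallback (the coalesce step): wrap to the start


def dispatch(values, rows, out_path):
    """Assign each input value to a subscriber in round robin order."""
    last_sub_id = 0  # mirrors "declare lastsubid long default 0"
    out = []
    for a in values:
        row = next_subscription(rows, last_sub_id, out_path)
        if row is None:
            out.append((a, None, None))
            continue
        last_sub_id = row["id"]  # updated from the emitted nextsubid
        out.append((a, row["id"], row["connectionid"]))
    return out


rows = [
    {"id": 2, "connectionid": "c2", "path": "rr.out"},
    {"id": 3, "connectionid": "c3", "path": "rr.out"},
]
for tup in dispatch([10, 11, 12, 13], rows, "rr.out"):
    print(tup)
# (10, 2, 'c2')
# (11, 3, 'c3')
# (12, 2, 'c2')
# (13, 3, 'c3')
```

Each emitted tuple carries the connection ID it is meant for, which is the value the output predicate compares against.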
This section describes how to run the sample in UNIX terminal windows or Windows command prompt windows. On Windows, be sure to use the StreamBase Command Prompt from the Start menu as described in the Test/Debug Guide, not the default command prompt.
- Open four terminal windows on UNIX, or four StreamBase Command Prompts on Windows. In each window, navigate to the directory where the sample is installed, or to your workspace copy of the sample, as described above.
- In window 1, start StreamBase Server without an application, but specify the configuration file that is installed with the sample. For example, use this command:

      sbd -f sbd.sbconf

- In window 2, add the primary module to a named container, making sure to connect the system.subscriptions stream with the application's subscriptions stream:

      sbadmin addContainer rr roundrobin.sbapp rr.subscriptions=system.subscriptions

  where rr is a name you choose for the new container. (At this point, the server may return a warning message in window 1, which can be safely ignored for purposes of this sample.)
- In both windows 2 and 3, start dequeuers against the output stream with the same command in both windows:

      sbc -u sb://localhost/rr dequeue rr.out

  You can add more windows to see more dequeuers.
- In window 4, start a feed simulation:

      sbfeedsim -u sb://localhost/rr

  Observe data emitted by each of the dequeuers in windows 2 and 3, printing in round robin fashion with one dequeued line going to window 2, the next to window 3, the next to window 2, and so on.
- To shut down the sbfeedsim operation in window 4, press Ctrl+C.
- To terminate the server and dequeuers, enter this command in window 4:

      sbadmin shutdown
In StreamBase Studio, import this sample with the following steps:
- From the top menu, select → .
- Select Round robin dequeuing from the Applications category.
- Click OK.
StreamBase Studio creates a single project containing this sample's files.
When you load the sample into StreamBase Studio, Studio copies the sample project's files to your Studio workspace, which is normally part of your home directory, with full access rights.
Important
Load this sample in StreamBase Studio, and thereafter use the Studio workspace copy of the sample to run and test it, even when running from the command prompt.
Using the workspace copy of the sample avoids the permission problems that can occur when trying to work with the initially installed location of the sample. The default workspace location for this sample is:
    studio-workspace/sample_roundrobin

See Default Installation Directories for the location of studio-workspace on your system.
In the default TIBCO StreamBase installation, this sample's files are initially installed in:
    streambase-install-dir/sample/roundrobin

See Default Installation Directories for the default location of streambase-install-dir on your system.