You can use the StreamBase Client wizard to create a Java client enqueuer or dequeuer program that will interact with a StreamBase application. The generated code has commented sections that you can later edit to add the functionality you want. When you want to deploy the client to use with your StreamBase applications, you can package your code into a JAR file and export it to your file system.
To open the wizard, click
> > . Edit these fields in the wizard:
- Type
-
Indicate the type of client you want to generate. Choose one of the following:
- Dequeuer
-
A program that dequeues data from StreamBase Server, sometimes called a consumer client. You might use dequeue clients as part of your testing procedure, or as an integral part of your deployed application.
- Enqueuer
-
A program that feeds data into StreamBase Server, sometimes called a producer client. An enqueuer might be part of your testing procedure (to simulate streaming data in a way that sbfeedsim cannot) or part of the implementation of your deployed application.
- Source folder
-
The path to the src/main/java subfolder of an existing project folder in your workspace.
- Package
-
While you could use the default package, it is recommended that you specify a different one, such as com.example.mysbapps.clients.
- Name
-
A name for your class, such as MyEnqueueClient.
When you are ready, click
to display the next page (either the enqueuer or dequeuer client page).
Use this dialog to enter options for your new dequeue client. A dequeue client gets results from one or more output streams in a StreamBase application.
-
Choose one of the stream options:
- All Streams
-
The generated code will retrieve tuples from all of the output streams of the running StreamBase application.
- Specified Streams
-
Allows you to specify which of the application's output streams to dequeue data from. For each stream:
-
Enter the stream name in the Add a Stream field, and click .
-
Optionally enter a predicate expression that restricts the tuples dequeued to those matching the expression. For example, PRICE > 50 dequeues only tuples whose PRICE field is greater than 50.
-
Specify the host name of the computer running the StreamBase application, and the TCP/IP port on which to access the output streams.
-
Optionally select the Use a background thread option. (For guidance, see the Note on this subject in the next section.)
-
Click
.
The wizard generates the Java file, and opens it in StreamBase Studio. All the required package and import statements are provided and the StreamBase Java Client library class and method definitions are started (based on your selections in the dialogs). Just edit the // TODO comments to complete the coding.
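To make the effect of a dequeue predicate concrete, the following is a minimal, self-contained sketch that models the filtering in plain Java. It does not use the StreamBase Java Client library: the class PredicateDequeueSketch, the method dequeueMatching, and the map-based tuples are hypothetical names chosen for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Illustrative model only; not part of the StreamBase Java Client library.
final class PredicateDequeueSketch {

    // Returns the tuples a dequeuer would see once the predicate is applied.
    static List<Map<String, Double>> dequeueMatching(
            List<Map<String, Double>> outputTuples,
            Predicate<Map<String, Double>> predicate) {
        List<Map<String, Double>> matched = new ArrayList<>();
        for (Map<String, Double> tuple : outputTuples) {
            if (predicate.test(tuple)) {
                matched.add(tuple);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        // Three hypothetical output tuples, each with a single PRICE field.
        List<Map<String, Double>> output = List.of(
                Map.of("PRICE", 40.0),
                Map.of("PRICE", 50.0),
                Map.of("PRICE", 75.0));

        // Java equivalent of the predicate expression PRICE > 50.
        List<Map<String, Double>> seen =
                dequeueMatching(output, t -> t.get("PRICE") > 50);
        System.out.println(seen);
    }
}
```

Because the comparison is strict, a tuple whose PRICE is exactly 50 is not delivered to the client in this model.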
Use the New StreamBase Client Enqueuer page to enter options for your new enqueuer client. An enqueuer client submits data to one or more input streams in a StreamBase application.
-
Choose one of the following options:
- All Streams
-
The generated code will retrieve a listing of all available input streams from the StreamBaseClient proxy instance once it has connected to the StreamBase application.
- Specified Streams
-
Allows you to specify which of the application's input streams the client will submit data to. Enter a stream name in the Add a Stream field, and click .
-
Specify the host name of the computer running the StreamBase application, and the TCP/IP port on which the application will receive input messages.
-
Optionally choose any of these options:
- Use a background thread
-
Note
Use threading with caution. You can create and use separate StreamBaseClient objects in separate threads, provided that no two threads use the same StreamBaseClient concurrently. Refer to the StreamBaseClient class in the Java Client library reference documentation for further information.
- Enable buffering
-
For details, see Buffering Options.
- Enable batching
-
Specify the batch size, that is, the number of tuples to batch and send at once. The default is 10.
-
Click
.
The wizard generates the Java file, and opens it in StreamBase Studio. All the required package and import statements are provided and the StreamBase Java Client library class and method definitions are started (based on your selections in the dialogs). Just edit the // TODO comments to complete the coding.
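The threading rule in the Note for the Use a background thread option (one client object per thread, never shared concurrently) can be sketched as follows. This is a self-contained model, not StreamBase code: StandInClient is a hypothetical placeholder for a client object that is unsafe to share, and PerThreadClients and runWorkers are likewise illustrative names.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model only; not part of the StreamBase Java Client library.
final class PerThreadClients {

    // Starts the given number of workers. Each worker creates and uses its
    // own client, so no client object is ever touched by two threads.
    static int[] runWorkers(int workers, int tuplesPerWorker) {
        int[] counts = new int[workers];
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < workers; i++) {
            final int id = i;
            Thread t = new Thread(() -> {
                StandInClient client = new StandInClient(); // owned by this thread
                for (int n = 0; n < tuplesPerWorker; n++) {
                    client.send("tuple-" + n);
                }
                counts[id] = client.sentCount(); // each worker writes its own slot
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join(); // wait for the worker and make its result visible
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] counts = runWorkers(4, 100);
        for (int c : counts) {
            System.out.println(c);
        }
    }
}

// Hypothetical stand-in for a client that is not safe for concurrent use.
final class StandInClient {
    private int sent = 0; // deliberately unsynchronized

    void send(String tuple) {
        sent++;
    }

    int sentCount() {
        return sent;
    }
}
```

Because every worker owns its client, the unsynchronized counter is never updated by two threads at once. Sharing a single instance across the workers would break that guarantee.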
For enqueue clients, the dialog presents buffering options to set:
-
The buffer size (number of tuples per buffer)
-
The buffer's flush interval
In the generated code, you can also explicitly flush the buffer of a specified stream, or flush all buffers.
By enabling tuple buffering and experimenting with these parameters, you may be able to improve the efficiency and performance of your enqueue code. A single enqueue of a buffer containing (for example) 100 tuples should be more efficient than making 100 separate enqueue operations.
The example below demonstrates how to use this feature. The buffer size is set to 100 tuples and the buffer flush interval is set to 1000 milliseconds (one second).
package com.mycompany.mysbapps.clients;

import java.util.*;
import com.streambase.sb.*;
import com.streambase.sb.client.*;
. . .
public class MyEnqueuerClient {
    private final static String SB_URI = "sb://localhost:10000";
    private static StreamBaseClient client = null;
    private String[] streamNames = null;
    private Collection tuples = null;
    private int buf_size = 100;
    private int flush_interval = 1000;
    . . .
    public MyEnqueuerClient(StreamBaseURI uri) throws StreamBaseException {
        System.out.println("Connecting to " + uri.toString() + "...");
        client = new StreamBaseClient(uri);
        System.out.println("Connected to " + uri.toString());

        // Turn on buffering. A WakeAndFlushBuffer thread is only
        // started if flush_interval > 0.
        client.enableBuffering(buf_size, flush_interval);
    }
In the StreamBase Client libraries, buffering is turned on only if the value of the buf_size parameter is greater than zero. (The buf_size parameter specifies a number of tuples, not a byte limit.) The buffer is also flushed at the regular interval specified by the flush_interval parameter. If flush_interval is not greater than zero, the buffer is flushed only when it reaches capacity (that is, when it is filled with tuples).
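The capacity rule can be illustrated with a small, self-contained model. This sketch is not the StreamBase implementation: CapacityBufferSketch and its methods are hypothetical, and the model covers only the case where flush_interval is not greater than zero, so no timer is involved. It counts how many send operations result from a run of enqueues.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model only; not part of the StreamBase Java Client library.
final class CapacityBufferSketch {
    private final int bufSize;                 // capacity in tuples, not bytes
    private final List<String> pending = new ArrayList<>();
    private int sends = 0;                     // number of send operations so far

    CapacityBufferSketch(int bufSize) {
        this.bufSize = bufSize;
    }

    void enqueue(String tuple) {
        if (bufSize <= 0) {
            sends++;                           // buffering off: one send per tuple
            return;
        }
        pending.add(tuple);
        if (pending.size() >= bufSize) {
            flush();                           // buffer full: send it as one unit
        }
    }

    // Sends whatever is pending as a single operation; no-op when empty.
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        sends++;
        pending.clear();
    }

    int sends() {
        return sends;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        CapacityBufferSketch buffer = new CapacityBufferSketch(100);
        for (int i = 0; i < 250; i++) {
            buffer.enqueue("tuple-" + i);
        }
        System.out.println("sends=" + buffer.sends()
                + " pending=" + buffer.pendingCount());
    }
}
```

In this model, 250 enqueues with a buffer size of 100 produce two sends and leave 50 tuples waiting. Those 50 tuples stay in the buffer until it fills or is flushed explicitly.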
If buffering is enabled and the flush_interval is set, the buffer will be flushed periodically based on the specified interval. However, to have more control over enqueuing buffered tuples, you can also use the flushBuffer(stream_name) method of the StreamBaseClient class to enqueue the contents of a buffer immediately. Or, you can use the flushAllBuffers() method to enqueue the contents of all buffers. These methods assume that the caller has a lock on _buffers_lock.
Generally, you want to flush a buffer when you are concerned that the tuples in the buffer may become stale (staleness is relative to the particular application). Suppose, for example, that your buffer size is 1000 and your flush_interval is 0 (this value turns off periodic flushing). The buffer is flushed automatically when it is filled, that is, when the 1000th tuple has been enqueued. If 300 tuples have just been enqueued and it is unclear when or if more tuples will arrive to fill the buffer, it is a good idea to call flushBuffer() or flushAllBuffers().
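One possible way to decide when to make such an explicit flush call is to track how long the client has been idle since its last enqueue. The sketch below is an assumption about how you might structure that decision, not something the wizard generates: IdleFlushSketch, recordEnqueue, and flushIfIdle are hypothetical names, and the flush itself is passed in as a Runnable so that the example runs without the StreamBase library.

```java
// Illustrative helper only; not part of the StreamBase Java Client library.
final class IdleFlushSketch {
    private final long maxIdleMillis;   // staleness threshold chosen by the application
    private final Runnable flushAction; // in a real client, would wrap the flush call
    private long lastEnqueueMillis;
    private boolean hasPending = false;

    IdleFlushSketch(long maxIdleMillis, Runnable flushAction) {
        this.maxIdleMillis = maxIdleMillis;
        this.flushAction = flushAction;
    }

    // Call after every enqueue, passing the current time in milliseconds.
    void recordEnqueue(long nowMillis) {
        lastEnqueueMillis = nowMillis;
        hasPending = true;
    }

    // Flushes if tuples may be waiting and none has arrived for maxIdleMillis.
    // Returns true when a flush was triggered.
    boolean flushIfIdle(long nowMillis) {
        if (hasPending && nowMillis - lastEnqueueMillis >= maxIdleMillis) {
            flushAction.run();
            hasPending = false;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        IdleFlushSketch flusher = new IdleFlushSketch(
                500, () -> System.out.println("flushing buffers"));
        flusher.recordEnqueue(1000);                   // last tuple arrives at t=1000 ms
        System.out.println(flusher.flushIfIdle(1200)); // idle for only 200 ms
        System.out.println(flusher.flushIfIdle(1500)); // idle for 500 ms
    }
}
```

In a real enqueue client, the Runnable would call flushAllBuffers() or flushBuffer() on the client object, and flushIfIdle would be invoked from whatever periodic check the application already performs. The helper does not know whether the buffer has already emptied itself by filling up, so it can occasionally trigger a flush when nothing is pending.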
-
For general information about using the StreamBase API for Java clients, start with the API Guide.