Contents
This sample demonstrates how to configure and call custom Java aggregate functions in an EventFlow module.
This sample's files are:
- nth.sbapp
  The sample EventFlow module file, placed in the package src/main/eventflow/com.example.sample.
- sbengine.conf
  The sbengine type configuration file, placed in src/main/configurations, defines two aliases for the same aggregate function we want to call. Custom aggregate functions in StreamBase always extend the StreamBase Client API's AggregateWindow class, and always call the accumulate method. When type=aggregate, the methodName property is optional; if used, it must be methodName="accumulate".
- Nth.java
  Java files containing our example custom classes, placed in src/main/java/com.streambase.sample. Notice that the package name for these Java files is not the same as for the EventFlow module; they are not required to be the same or related.
- TestCase.java
  An EventFlow JUnit test file, configured and ready to run, placed in src/test/java/com.tibco.ep.sample. Run this test by selecting this file, right-clicking, and from the context menu, selecting > .
- pom.xml
  The Maven object model file for this project, placed at the root of the project folder.
The nth.sbapp EventFlow module has one input stream that feeds two independent Aggregate operators.
- Aggregate Operator PlainAgg
  This case illustrates a simple, standard use of the Aggregate operator, PlainAgg. This operator specifies a single aggregate function, sum(), from the StreamBase expression language. The operator is configured with a single dimension of type Tuple with a window size of 3. As a result, the operator emits a single tuple for every three double values sent into IncomingDbl. The output tuple contains the sum of the three input values.
  The Aggregate operator in the top stream specifies a single aggregate function:
  sum(inputval)
- Aggregate Operator CalcNth
  This Aggregate operator calls the same sum(inputval) standard function, and then goes on to call a custom aggregate function class, com.streambase.sample.Nth, in three ways. The Nth class extends the StreamBase-provided AggregateWindow class, as described in the Javadoc for that class.
  The Nth class performs a very simple task: it emits the current input value. Thus, the difference between the top and bottom output streams is that the top one emits only the sum of tuples sent in, while the bottom stream does that and goes on to emit each value input into the aggregate window as well.
  The bottom Aggregate operator specifies the following five aggregate functions:
  - sum(inputval)
    Uses the StreamBase expression language sum() function.
  - firstval(inputval)
    Uses the expression language's firstval() function to return the first value in the aggregate window.
  - enth(2, intval)
    This project's sbengine.conf configuration defines an alias, enth, for this project's custom aggregate class. This row calls the first alias instead of using calljava().
  - oneline(3, intval)
    This project's sbengine.conf configuration defines a second alias, oneline, for the same aggregate class. This row calls the second alias.
  - calljava('com.streambase.sample.Nth', 4, inputval)
    Calls this project's custom aggregate class directly, but asks for input value 4, which lies outside the aggregate window of size 3. This call is expected to return null; it does so only because its second argument is 4, not because we used calljava().
To keep the example simple, the custom function does not validate its input at runtime.
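The behavior described for the enth, oneline, and calljava rows can be illustrated with a standalone Java sketch. It is not the sample's Nth.java and does not extend the real AggregateWindow class, so it compiles without the StreamBase libraries. Only the accumulate method name comes from the text above; the NthSketch class name, the init() and calculate() method names, the nthOf helper, and the bounds check that yields null are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical stand-in for an "nth value in the window" aggregate.
 * Assumption: a window lifecycle of init -> accumulate (per tuple) -> calculate.
 */
class NthSketch {
    private final List<Double> seen = new ArrayList<>();
    private int n;

    /** Assumed hook: reset state when a new window opens. */
    void init() {
        seen.clear();
        n = 0;
    }

    /** Called once per input tuple with the function's two arguments. */
    void accumulate(int n, double value) {
        this.n = n;
        seen.add(value);
    }

    /** Assumed hook: the nth value seen (1-based), or null if the window never held n values. */
    Double calculate() {
        if (n < 1 || n > seen.size()) {
            return null;
        }
        return seen.get(n - 1);
    }

    /** Hypothetical helper: runs one complete window through the lifecycle. */
    static Double nthOf(int n, double... window) {
        NthSketch agg = new NthSketch();
        agg.init();
        for (double v : window) {
            agg.accumulate(n, v);
        }
        return agg.calculate();
    }

    public static void main(String[] args) {
        System.out.println("second=" + nthOf(2, 1.0, 2.0, 3.0)); // prints second=2.0
        System.out.println("bogus=" + nthOf(4, 1.0, 2.0, 3.0));  // prints bogus=null
    }
}
```

In this sketch, asking for position 4 in a three-value window returns null regardless of how the function is invoked, which mirrors the point made for the calljava() row.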
In StreamBase Studio, import this sample with the following steps:
- From the top-level menu, click > .
- Enter custom agg to narrow the list of options.
- Select the Custom Java Aggregate Function sample from the Extending StreamBase category.
- Click .
StreamBase Studio creates a project for the sample.
- In the Project Explorer view, open this sample's folder.
  Keep an eye on the bottom right status bar of the Studio window. Make sure any Updating, Downloading, Building, or Rebuild project messages finish before you proceed.
- Open the src/main/eventflow/packageName folder.
- Double-click to open the nth.sbapp application. Make sure the application is the currently active tab in the EventFlow Editor.
- Click the Run button. This opens the SB Test/Debug perspective and starts the module.
- Wait for the Waiting for fragment to initialize message to clear.
- Enter data for the top stream:
  - In the Manual Input view, select the IncomingDbl input stream.
  - In the inputval field, enter 10 and press three times.
  - Observe the tuples in the Output Streams view. No tuples are output until three input tuples are sent, because the aggregate's window size is set to 3. The output you should observe on the OutPlain stream is:
    SumEveryThree=30.0
    On the OutWithCustom stream, expect an output tuple like the following:
    SumEveryThree=30.0, first=10.0, second=10.0, third=10.0, bogus=null
- Send in more input values, which can be the same three times, or three different numbers, pressing after each one.
- When done, press F9 or click the Terminate EventFlow Fragment button.
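The windowing behavior observed in these steps (nothing is emitted until the third tuple arrives, then a single sum appears) can be modeled in plain Java. This is a minimal sketch under the assumption that the window closes and restarts after every three tuples, which matches the description of one output per three inputs. The TupleWindowSum class and its accept and run methods are hypothetical names, not part of the StreamBase API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of a tuple-based window that sums its inputs. */
class TupleWindowSum {
    private final int windowSize;
    private int count = 0;
    private double sum = 0.0;

    TupleWindowSum(int windowSize) {
        this.windowSize = windowSize;
    }

    /** Adds one value; returns the sum when the window closes, otherwise null. */
    Double accept(double inputval) {
        sum += inputval;
        count++;
        if (count < windowSize) {
            return null; // window still open, nothing is emitted yet
        }
        double result = sum;
        sum = 0.0;   // start the next window from scratch
        count = 0;
        return result;
    }

    /** Feeds the values through a fresh window and collects every emitted sum. */
    static List<Double> run(int windowSize, double... values) {
        TupleWindowSum window = new TupleWindowSum(windowSize);
        List<Double> emitted = new ArrayList<>();
        for (double v : values) {
            Double out = window.accept(v);
            if (out != null) {
                emitted.add(out);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(3, 10.0, 10.0, 10.0)); // prints [30.0]
        System.out.println(run(3, 10.0, 10.0));       // prints []
    }
}
```

Sending only two values produces no output in this model, just as the Output Streams view stays empty until the third tuple is sent.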
When you load the sample into StreamBase Studio, Studio copies the sample project's files to your Studio workspace, which is normally part of your home directory, with full access rights.
Important
Load this sample in StreamBase Studio, and thereafter use the Studio workspace copy of the sample to run and test it, even when running from the command prompt.
Using the workspace copy of the sample avoids permission problems. The default workspace location for this sample is:
studio-workspace/sample_custom-java-aggregate
See Default Installation Directories for the default location of studio-workspace on your system.