Contents
This sample demonstrates how to configure and call custom Java aggregate functions in an EventFlow module.
This sample's file's are:
- nth.sbapp
-
The sample EventFlow module file, placed in the package
src/main/eventflow/com.example.sample. - sbengine.conf
-
The
sbenginetype configuration file, placed insrc/main/configurations, that defines an alias for the aggregate function we want to call. Custom aggregate functions in StreamBase always extend the StreamBase Client API'sAggregateWindowclass. To define an alias for such a function, the configuration file must alias that class'saccumulatemethod. This file illustrates the use of the methodName property to allow more than one alias to the same method. - Nth.java
-
Java files containing our example custom classes, placed in
src/main/java/com.streambase.sample. Notice that the package name for these Java files is the not the same here as for the EventFlow module; they are not required to be the same or related. - TestCase.java
-
An EventFlow JUnit test file, configured and ready to run, placed in
src/test/java/com.tibco.ep.sample. Run this test by selecting this file, right-clicking, and from the context menu, selecting >. - pom.xml
-
The Maven object model file for this project, placed at the root of the project folder.
The nth.sbapp EventFlow module contains two independent streams.
- Input stream InPlain
-
The top stream illustrates a simple, standard use of the Aggregate operator,
PlainAgg. This operator specifies a single aggregate function,sum(), from the StreamBase expression language. The operator is configured with a single dimension of typeTuplewith a window size of 3. The result of this design is a single tuple emitted for every three double values sent into InPlain. The output tuple contains the sum of the three input tuples.The Aggregate operator in the top stream specifies a single aggregate function:
sum(inputval) - Input stream InWithCustom
-
The bottom stream is structured the same as the top stream, with a single Aggregate operator. By contrast with the top stream, this operator calls the the same
sum(inputval)standard function, and then goes on to call a custom aggregate function classcom.streambase.sample.Nthin two ways. TheNthclass extends the StreamBase-providedAggregateWindowsclass, as described in the Javadoc for that class.The
Nthclass performs a very simple task: it emits the current input value. Thus, the difference between the top and bottom streams is that the top stream emits only the sum of tuples sent in, while the bottom stream does that and goes on to emit each value input into the aggregate window as well.The Aggregate operator in the bottom stream specifies the following five aggregate functions:
sum(inputval)Uses the StreamBase expression language sum()function.firstval(inputval)Uses the expression language's firstval()function to return the first value in the aggregate window.enth(2, intval)This project's sbengine.confconfiguration defines an alias,enth, for this project's custom aggregate class. This row calls the first alias instead of usingcalljava().online(3, intval)This project's sbengine.confconfiguration defines a second alias,oneline, for the same aggregate class. This row calls the second alias.calljava('com.streambase.sample.Nth', 4, inputval)Calls this project's custom aggregate class, but asking for an input value, 4, which is outside of the aggregate window size 3. This condition is expected to return null.
To keep the example simple, the custom function does not validate its input at runtime.
In StreamBase Studio, import this sample with the following steps:
-
From the top-level menu, click >.
-
Enter
customto narrow the list of options. -
Select the Custom Java Aggregate Function sample from the Extending StreamBase category.
-
Click .
StreamBase Studio creates a project for the sample.
-
In the Project Explorer view, open this sample's folder.
Keep an eye on the bottom right status bar of the Studio window. Make sure any
Updating,Downloading,Building, orRebuild projectmessages finish before you proceed. -
Open the
src/main/eventflow/folder.packageName -
Double-click to open the
nth.sbappapplication. Make sure the application is the currently active tab in the EventFlow Editor. -
Click the
Run button. This opens the SB Test/Debug perspective and starts the module.
-
Wait for the Waiting for fragment to initialize message to clear.
-
Enter data for the top stream:
-
In the Manual Input view, select the
InPlaininput stream. -
In the
inputvalfield, enter10and press three times. -
Observe the tuples in the Output Streams view. No tuples are output until three input tuples are sent, because the aggregate's window size is set to 3. The output you should observe is:
SumEveryThree=30.0 -
Change the
inputvalfield to20and press three more times. -
Look for the emitted value:
SumEveryThree=60.0
-
-
Enter data for the bottom stream:
-
In the Manual Input view, select the
InWithCustominput stream. -
As above, enter
10and press Send Data three times. -
Expect an output tuple like the following:
SumEveryThree=30.0, first=10.0, second=10.0, third=10.0, bogus=null
-
Enter
20and press Send Data three times. -
Look for:
SumEveryThree=60.0, first=20.0, second=20.0, third=20.0, bogus=null
-
-
Experiment further with any set of input values to either stream.
-
When done, press F9 or click the
Terminate EventFlow Fragment button.
When you load the sample into StreamBase Studio, Studio copies the sample project's files to your Studio workspace, which is normally part of your home directory, with full access rights.
Important
Load this sample in StreamBase Studio, and thereafter use the Studio workspace copy of the sample to run and test it, even when running from the command prompt.
Using the workspace copy of the sample avoids permission problems. The default workspace location for this sample is:
studio-workspace/sample_custom-java-aggregateSee Default Installation Directories for the default location of studio-workspace on your system.
