public abstract class Operator extends Object implements Serializable, LocatedItem
Operator subclasses must have a public default constructor.
An Operator is notified of state changes through callbacks. The StreamBase
runtime 'calls back' an Operator when it changes the runtime state of the
Operator. These callbacks include Operator.resume()
, Operator.resumed()
,
Operator.suspend()
, Operator.suspended()
and . Operator.shutdown()
.
Operator provides "managed threads", which are threads that run concurrently
with the application, but which can synchronize with its overall state
changes. These threads are started, suspended, resumed, and shut down with
the application. Managed threads are registered with an Operator using the
method registerRunnable
. This is particularly useful for input
adapters, which typically have to respond to external events asynchronously
with the application. Managed threads can call sendOutput
at any
time.
If an Operator registers one or more managed threads, and all of its managed
threads exit their run()
methods, then the Operator itself will
shut down.
The StreamBase runtime blocks while it waits for an Operator's managed
threads to respond to a state change. This can be problematic if a managed
thread is blocked on some event. However, the StreamBase runtime can be
configured to interrupt a thread when it needs to change its state. This is
accomplished by setting the flag shouldInterrupt
to true when
registering the thread with registerRunnable
.
It may be that an Operator's managed thread does not respond to a state
change even after it has been interrupted. If the Operator's thread does not
respond to the state within a given time interval then it is considered to be
in failure and it is shut down. This time interval is specified by the server
configuration parameter operator-state-change-timeout-ms
.
For sharing state amongst instances, Operators may use the services provided by
the Operator.SharedObjectManager
accessible via Operator.getRuntimeEnvironment()
Since 7.2.12, if your operator simply passes through Tuples without creating
new Tuple instances, ensure you override Operator.isPassthruOperator()
for
maximum performance.
Special Studio Considerations:
Operator.setPortHints(int, int)
in your constructor to set the
initial number of ports when presented in an EventFlow editorOperator.setDisplayName(String)
in your constructor to set a
user-friendly name for your Operator. This name is used in the Operator Name
field in the Properties view, and as the details text in the Palette view's
Details mode.Operator.setShortDisplayName(String)
in your constructor to set a
short user-friendly name for your Operator. This name is used as the name of
the operator in the Palette view and above the operator in the EventFlow
canvas.Operator.getPortCounts()
if you change port counts based on user
properties changingOperator.getIconResource(com.streambase.sb.operator.Operator.IconKind)
if you
wish to be displayed using your own iconsOperator.typecheck()
, subclasses should consider throwing
Operator.PropertyTypecheckException
instead of TypecheckException
in order to allow Studio to decorate UI widgets that
caused the typecheck error. See Operator.PropertyTypecheckException
and Operator.getLocation(String)
for details.UIHints
and Parameterizable
for additional control over property display in StudioOperator.getSearchKeywords()
to provide user assistance when
users in Studio are filtering the list of available operators and adapters.Operator.supportsArtifacts()
and Operator.ArtifactProperties
Note: Serializations of instances of this class that are created (e.g., by using
ObjectOutputStream
) in one version of StreamBase in general will not be
deserializable in any other version of StreamBase.
Parameterizable
,
Serialized FormModifier and Type | Class and Description |
---|---|
static class |
Operator.ArtifactProperties
For operators supporting Artifacts, this class is used to communicate properties
about them during development in Studio, and is reported to the operator
immediately prior to
Operator.typecheck() along with all other setters. |
static interface |
Operator.ConfigurationAccessor
Provides access to configuration information for use by operators and adapters, available
from the server's configuration file.
|
static class |
Operator.IconKind
An enumeration for the different kinds of icons that StreamBase Studio
may request when displaying Operators and Adapters.
|
static class |
Operator.LogLevel
Included values are, in decreasing order of chattiness:
Operator.LogLevel.ALL , Operator.LogLevel.TRACE , Operator.LogLevel.DEBUG , Operator.LogLevel.INFO ,
Operator.LogLevel.WARN , Operator.LogLevel.ERROR , Operator.LogLevel.OFF . |
static class |
Operator.OperatorStates
The set of runtime states that an Operator can be in.
|
class |
Operator.OperatorThread
The thread that wraps an operator runnable
|
class |
Operator.PropertyTypecheckException
A typecheck exception associated with an Operator property (or parameter) by name.
|
static interface |
Operator.RuntimeEnvironment
This interface is used to gain access to StreamBase Server information and
facilities.
|
static interface |
Operator.SharedObject
A SharedObject is an object that can be shared between Operators within
a Container.
|
static interface |
Operator.SharedObjectManager
The manager for SharedObjects within a container.
|
static class |
Operator.SuspendBehaviorStates
Suspend behavior defines how an Operator handles tuples when it is
suspended; meaning when it is in the
SUSPENDED state. |
Modifier and Type | Field and Description |
---|---|
static int |
DEFAULT_STATE_CHANGE_TIMEOUT
Default value for the timeout for Operator state changes, in
milliseconds: 10000
|
static Operator.ConfigurationAccessor |
OP_CONFIG_MGR_NO_CONF
A configuration manager that provides no information
|
Modifier | Constructor and Description |
---|---|
protected |
Operator()
Constructs an operator.
|
Modifier and Type | Method and Description |
---|---|
boolean |
allowsConcurrency()
Override to indicate whether concurrency may be configured in Studio against
this operator.
|
String |
evaluate(String st,
Tuple input)
evaluate the given string expression in the context of the running module
If an error occurs during the evaluation, the message return value
will be a string containing the error message
|
OperatorArtifactManager |
getArtifactManager()
Returns an ArtifactManager instance for managing the operator's artifacts.
|
int[] |
getAsyncInputPorts()
Override to indicate to Studio which ports (0-based) might process data
asynchronously, this provides different rendering on the EventFlow canvas
to assist users in understanding their data flow
|
CaptureTransformStrategy |
getCaptureStrategy()
Returns the capture transform strategy that this Operator will use.
|
String |
getContainerName()
Return the name of this operator's container.
|
Path |
getDataDirectory()
Get data directory.
|
String |
getDisplayDescription()
Provides a description for this type of Operator.
|
String |
getDisplayName()
Return the display name of this Operator.
|
Tuple |
getDynamicVariables()
Retrieves a read-only tuple describing the current value of all dynamic variables in the module
this operator is contained in.
|
Schema |
getDynamicVariablesSchema()
Retrieves a Schema containing information about every dynamic variable available
to the module this operator is contained in.
|
String |
getFullyQualifiedName()
Return the fully qualified name of this operator.
|
URL |
getIconResource(Operator.IconKind iconType)
Clients should override to support custom icons.
|
int |
getInputPortCount()
Return the number of input ports.
|
Schema |
getInputSchema(int port)
Returns the schema of an input port.
|
Location |
getLocation()
Return the Operator-wide location, useful for error reporting not associated
to a particular property.
|
Location |
getLocation(String property)
Return a new location within this Operator, associated with the given property name.
|
Lock |
getLock()
Return the guard for module state.
|
protected org.slf4j.Logger |
getLogger()
Retrieves a
Logger suitable for logging messages and exceptions
for this Operator. |
Serializable |
getManagedState(String key)
Get a value from transactional memory.
|
String |
getName()
Return the name of this Operator.
|
Schema |
getNamedSchema(String name) |
Operator.ConfigurationAccessor |
getOperatorConfigurationAccessor()
Provides access to operator and adapter configuration information from the server's configuration file.
|
int |
getOutputPortCount()
Returns the number of output ports.
|
Schema |
getOutputSchema(int outputPort)
Return the output schema for the given output port (zero-based).
|
Parameterizable |
getParameters()
Returns the Java Bean that holds the user-configurable parameters visible
in the StreamBase Studio Properties View of this operator.
|
PortCounts |
getPortCounts()
An optional method that subclasses can override to dynamically tell
Studio the number of input and output ports.
|
Schema[] |
getProposedInputSchemas(String mainName)
Override to provide schemas that Studio will offer to users to import into their projects.
|
Schema[] |
getProposedOutputSchemas(String mainName)
Override to provide schemas that Studio will offer to users to import into their projects.
|
InputStream |
getResourceContents(String name)
Returns an open input stream on the contents of the named resource file.
|
File |
getResourceFile(String name)
Returns a
File pointing to the regular file or directory
if it exists. |
boolean |
getReuseTuple()
Get the state of the input Tuple reuse flag.
|
Operator.RuntimeEnvironment |
getRuntimeEnvironment()
Return the
Operator.RuntimeEnvironment for this StreamBase Server. |
Schema |
getRuntimeInputSchema(int port)
Returns the schema of an input port at runtime.
|
Schema |
getRuntimeOutputSchema(int outputPort)
Return the output schema that should actually use to
Operator.sendOutput(int, Tuple) or Operator.sendOutputAsync(int, Tuple) . |
Schema |
getSchemaForCapture(String captureName,
int depth)
Finds the schema for the given capture name in the context that this operator
is running under.
|
String[] |
getSearchKeywords()
Override to declare strings that assist the user in filtering for this operator
when in Studio.
|
String |
getShortDisplayName()
Return the short display name of this Operator.
|
int |
getStateChangeTimeout() |
StorageMethod |
getStorageMethod()
Get operator's state's storage method.
|
TableAccessor |
getTableAccessor(String name)
Get a TableAccessor for a table in the local module by name of that table.
|
TransactionIsolationLevel |
getTransactionIsolationLevel()
Get current transaction isolation level.
|
TupleCaptureTransformer |
getTupleCaptureTransformer(Schema s)
Get a TupleCaptureTransformer capable of translating tuples with the given schema to the equivalent
schema with all the capture fields expanded out, and translating expanded tuples back into tuples with
the given schema
This method may only be called at runtime; the exact schemas of any capture fields are not fully determined
at typecheck time.
|
Schema |
getTypecheckInputSchema(int port)
Returns the schema of an input port that was set at application typecheck
time.
|
Schema |
getTypecheckOutputSchema(int outputPort)
Return the output schema as set by typecheck.
|
boolean |
hasNotYetStarted()
Returns true if this Operator has not yet started running.
|
void |
init()
After the Operator has typechecked successfully, but before the
application or any managed threads start, the StreamBase server will call
the init method.
|
boolean |
isConcurrentByDefault()
Indicate whether an operator should be in a concurrent region by default
|
boolean |
isDroppingTuples()
Returns true if the Operator will drop any tuples it receives when it is
suspended.
|
boolean |
isPassthruOperator()
Allows an operator to declare itself to be a "pass-thru" operator -- one that calls its
sendOutput method with the unmodified tuples received by its processTuple method.
|
boolean |
isProcessingTuples()
Returns true if the Operator will process any tuples it receives when it
is suspended.
|
boolean |
isRunning()
Returns true if this Operator is currently running, false otherwise.
|
boolean |
isRuntime() |
boolean |
isShutdown()
Returns true if this Operator is currently shut down, false otherwise.
|
boolean |
isSuspended()
Returns true if this Operator is currently suspended, false otherwise.
|
void |
partitionActive(String name)
Called when a partition transitions into Active state.
|
void |
partitionNotActive(String name)
Called when a partition transitions into any other state from Active state.
|
void |
postShutdown()
postShutdown is called by the StreamBase runtime just after shutting down
this Operator.
|
abstract void |
processTuple(int inputPort,
Tuple tuple)
This method will be called by the StreamBase server for each Tuple given
to the Operator to process.
|
void |
registerRunnable(Runnable operatorRunnable)
Deprecated.
As of StreamBase version 3.7, replaced by
Operator.registerRunnable(Runnable, boolean) |
void |
registerRunnable(Runnable operatorRunnable,
boolean shouldInterrupt)
Register a Runnable object to be managed by this Operator.
|
void |
registerRunnable(String runnableName,
Runnable operatorRunnable,
boolean shouldInterrupt)
Register a Runnable object to be managed by this Operator.
|
void |
registerRunnable(String runnableName,
Runnable operatorRunnable,
boolean shouldInterrupt,
boolean synchronizedShutdown)
Register a Runnable object to be managed by this Operator.
|
void |
remoteNodeActive(String remoteNodeName)
Called when a remote node has become active in the cluster.
|
void |
remoteNodeUnavailable(String remoteNodeName)
Called when a remote node has become unavailable.
|
void |
remoteQuorumLost(String availabilityZoneName,
String reason)
Called after deactivating all partitions when loss of quorum is detected.
|
protected void |
requireInputPortCount(int numPorts)
Throws a PortMismatchException if the number of ports is not numPorts.
|
void |
resume()
resume() is called when an operator starts or resumes execution, before
any registered runnables are started or resumed.
|
void |
resumed()
resumed() is called after all registered runnables of the operator have
started or resumed.
|
Cancellable |
scheduleRunnable(Date when,
Runnable runner)
Schedules a Runnable object to be run at some future point as part of
the same parallel region as this Operator.
|
void |
sendErrorOutput(String message)
Send an error tuple from this operator's error port.
|
void |
sendErrorOutput(Throwable t)
Send an exception via the error output port
|
void |
sendErrorOutput(Throwable t,
int port,
Tuple errorTuple)
Send an exception via the error output port
|
void |
sendOutput(int port,
List<Tuple> tuples)
Enqueue a List of Tuples to be sent synchronously to downstream operators.
|
void |
sendOutput(int port,
Tuple tuple)
Enqueue a Tuple to be sent synchronously to downstream operators.
|
void |
sendOutputAsync(int port,
List<Tuple> tuples)
Enqueue a List of Tuples to be sent asynchronously to downstream operators.
|
void |
sendOutputAsync(int port,
Tuple tuple)
Enqueue a Tuple to be sent asynchronously to downstream operators.
|
void |
setCaptureStrategy(CaptureTransformStrategy cts)
Set the capture transform strategy to use for the inputs and outputs
of this Operator.
|
protected void |
setDisplayDescription(String description)
Set a description for this type of Operator.
|
void |
setDisplayName(String dn)
The display name is the String that's shown in the Operator Name field of
the Properties view, and in the detailed text for this operator in the
Palette view's Details mode.
|
void |
setDynamicVariable(String name,
Object value)
Sets the value of this module's dynamic variable.
|
static void |
setLogLevel(org.slf4j.Logger logger,
Operator.LogLevel level)
Sets the log level of the given logger.
|
static void |
setLogLevel(org.slf4j.Logger logger,
String level)
Sets the log level of the given logger.
|
boolean |
setManagedState(String key,
Serializable value)
Store a value in transactional memory
|
Schema |
setOutputSchema(int port,
Schema outputSchema)
Sets the output schema for the given output port (port #'s are zero
based).
|
void |
setParameters(Parameterizable params)
Sets the Java Bean that holds the user-configurable parameters visible in
the StreamBase Studio Properties View of this operator.
|
void |
setPortHints(int numInputPorts,
int numOutputPorts)
Used to tell StreamBase Studio how many ports to draw on the Java
Operator when the operator is drawn in the IDE's canvas.
|
protected void |
setReuseTuple(boolean reuse)
Allow/disallow the runtime to reuse tuples on operator input.
|
void |
setShortDisplayName(String sn)
The short display name is the String displayed by Studio as the name of
this operator in the Palette view, and above the operator in the
EventFlow canvas.
|
void |
setSuspendBehavior(int suspendBehavior)
Set the suspend behavior of this Operator.
|
boolean |
shouldRun()
Return whether or not calling operator thread is enabled and should
continue running.
|
void |
shutdown()
shutdown is called by the StreamBase runtime just prior to shutting down
this Operator.
|
int |
size()
Return the "size" of this operator.
|
boolean |
supportsArtifacts()
Indicates whether or not the operator supports the use of artifacts
The default implementation of this methods returns false.
|
boolean |
supportsTransactionalMemory()
Indicate whether an operator optionally supports transactional memory
|
void |
suspend()
suspend() will be called when an operator suspends, before any registered
runnables are suspended.
|
void |
suspended()
suspended() will be called after all registered runnables of the operator
have suspended.
|
abstract void |
typecheck()
The typecheck method is called by Studio and the StreamBase server to ensure that
all the parameters for this operator are correct.
|
public static final Operator.ConfigurationAccessor OP_CONFIG_MGR_NO_CONF
public static final int DEFAULT_STATE_CHANGE_TIMEOUT
An Operator can receive state change events from the StreamBase runtime. In particular, when the runtime starts, suspends, resumes, or shuts down, it, in turn, applies the same state change to any Operators it holds.
When the server applies a state change to an Operator, it blocks until the Operator acknowledges that it has transitioned to the new state.
If an Operator fails to make such an acknowledgement, the StreamBase runtime might wait indefinitely. StateChangeTimeout limits the amount of time the server will wait for an Operator to respond to a requested state change.
If the Operator fails to respond within the timeout, the server will shut down the Operator and continue with its state change.
protected Operator()
All Operator subclasses must have a public default constructor, otherwise StreamBase Studio and the StreamBase sbd process will not be able to instantiate the Operator subclass. Nothing substantial should be done in the constructor, except for setting port hints and setting the parameters object.
public URL getIconResource(Operator.IconKind iconType)
iconType
- the kind of icon being requested, as an Operator.IconKind
enumeration valueOperator.IconKind
public final String getDisplayName()
public final String getDisplayDescription()
protected final void setDisplayDescription(String description)
description
- description for this type of Operator, never nullOperator.getDisplayDescription()
public final String getShortDisplayName()
By default, this returns the simple class name of this Operator.
public String getName()
public String getContainerName()
public String getFullyQualifiedName()
public final void setDisplayName(String dn)
dn
- Display namepublic final void setShortDisplayName(String sn)
sn
- Short display namepublic void setPortHints(int numInputPorts, int numOutputPorts)
This method is only effective when called from the Java Operator's constructor. The typecheck method should call requireInputPortCount instead.
To specify the ports at runtime requireInputPortCount and setOutputSchema should be called from the typecheck method.
To specify the ports dynamically within studio, see getPortCounts.
numInputPorts
- number of input ports. Must be non-negative.numOutputPorts
- number of output ports. Must be non-negative.IllegalArgumentException
- if an argument is less than zeroOperator.requireInputPortCount(int)
,
Operator.setOutputSchema(int, Schema)
public abstract void typecheck() throws Operator.PropertyTypecheckException, TypecheckException
Operator.setOutputSchema(int, Schema)
method if there are any
output ports.
If the parameters are not correct, or the input port Schemas are not
correct, a Operator.PropertyTypecheckException
or TypecheckException
should be thrown.
The former should always be used if the exception is related to a particular parameter.
The method requireInputPortCount() should be used to verify that the required input ports are set.
If the Operator changes the number of input ports, this method must call requireInputPortCount.
Operator.PropertyTypecheckException
- when a parameter value is unexpected.TypecheckException
- when parameters or input Schemas are not satisfied.Operator.requireInputPortCount(int)
,
Operator.setOutputSchema(int, Schema)
,
Operator.PropertyTypecheckException
public void init() throws StreamBaseException
If this operator wishes to register threads, it should call
Operator.registerRunnable(Runnable)
from this method.
StreamBaseException
- Prevents the application from starting.public boolean hasNotYetStarted()
Operator.OperatorStates.INITIALIZED
public boolean isDroppingTuples()
Operator.SuspendBehaviorStates.DROPPING_TUPLES
public boolean isProcessingTuples()
Operator.SuspendBehaviorStates.PROCESSING_TUPLES
public boolean isRunning()
Operator.OperatorStates.STARTED
public boolean isShutdown()
Operator.OperatorStates.SHUTDOWN
public boolean isSuspended()
Operator.OperatorStates.SUSPENDED
public abstract void processTuple(int inputPort, Tuple tuple) throws StreamBaseException
By default the Tuple passed in is safe for storing and mutating after the
return of processTuple. The details of this Tuple's life cycle can be
controlled via Operator.setReuseTuple(boolean)
.
inputPort
- the input port that the tuple is from (ports are zero based)tuple
- the tuple from the given input portStreamBaseException
- Terminates the application.public void resume() throws Exception
Operator.shutdown()
will be called.
resume() is a callback that is called by the StreamBase runtime.
Exceptions thrown during calls to Operator.resume()
will be processed
as error tuples, and trigger a shutdown of the operator.Exception
- Error processing callbackpublic void resumed() throws Exception
Operator.shouldRun()
has unblocked and
returned true in all registered runnables.
resumed() is a callback that is called by the StreamBase runtime.
Exceptions thrown during calls to Operator.resumed()
will be
processed as error tuples, and trigger
a shutdown of the operator.Exception
- Error processing callbackpublic int size()
Override this method to display the size of an important data structure in this Operator. For example the size of a queue. Returns 0 by default.
This method must return promptly and never block, as this is called from a system statistics monitoring thread.
public final boolean shouldRun()
This method returns false if the operator has not yet started or has shut down. It blocks if the operator is suspended. It returns true if the operator is running.
This method links managed operator threads with the StreamBase runtime. Every registered runnable object must repeatedly call this during its entire lifetime in order to synchronize with the main StreamBase application.
An operator (and, thus, its containing application) is not considered started until all registered runnables have called this method. Likewise, an operator is not considered suspended until all registered runnables are blocked in this method. Finally, an operator is not considered shutdown until all registered runnables have exited their run() method (either on their own or in response to this method returning false).
UnsupportedOperationException
- If this was not called from an operator thread.public void shutdown()
public void postShutdown()
public void suspend() throws Exception
Operator.suspend()
will be processed as error tuples.Exception
- Error processing callbackpublic void setSuspendBehavior(int suspendBehavior)
setSuspendBehavior()
might be called in the Operator's constructor or in init()
.suspendBehavior
- The suspend behavior to set, either PROCESSING_TUPLES or
DROPPING_TUPLES.Operator.SuspendBehaviorStates.PROCESSING_TUPLES
,
Operator.SuspendBehaviorStates.DROPPING_TUPLES
public void suspended() throws Exception
Operator.shouldRun()
.
suspended() is a callback that is called by the StreamBase runtime.
Exceptions thrown during calls to Operator.suspended()
will be processed as error tuples.Exception
- Error processing callbackpublic void remoteNodeActive(String remoteNodeName)
remoteNodeName
- the remote node namepublic void remoteNodeUnavailable(String remoteNodeName)
remoteNodeName
- the remote node namepublic void remoteQuorumLost(String availabilityZoneName, String reason)
availabilityZoneName
- name of the availability zone that owns the associated quorumreason
- string containing a reason why loss was detected.public void partitionActive(String name)
Called when a partition transitions into Active state.
In the Active state, partition is running on the active node for the partition.
name
- name of the partition that becomes activepublic void partitionNotActive(String name)
Called when a partition transitions into any other state from Active state.
name
- name of the partition that becomes not Activepublic Parameterizable getParameters()
public void setParameters(Parameterizable params)
params
- the Java Bean that holds the user-configurable parameters
visible in the StreamBase Studio Properties View of this
operator.public InputStream getResourceContents(String name) throws ResourceNotFoundException, StreamBaseException
Resource names must be a valid Java resource name which is a string consisting of a sequence of substrings, delimited by slashes (/) The default project resource directories are src/main/resources and src/test/resources (test phase only).
For example, a resource file stored in src/main/resources/data.csv is accessed using data.csv or /data.csv, while a resource file stored in src/main/resources/com/examples/datafiles/data.csv is accessed using com/examples/datafiles/data.csv or /com/examples/datafiles/data.csv.
Call this, or Operator.getResourceFile(String)
, during
typecheck (as opposed to waiting for Operator.init()
or similar runtime methods) in
order to be able to surface to the authoring environment any failures
locating or reading the resource.
name
- resource nameResourceNotFoundException
- resource cannot be foundStreamBaseException
- resource found, but cannot be openedClassLoader.getResource(String)
,
Operator.getResourceFile(String)
,
Operator.getDataDirectory()
public File getResourceFile(String name) throws ResourceNotFoundException, StreamBaseException
File
pointing to the regular file or directory
if it exists. If this returns a regular file, Operator.getResourceContents(String)
is guaranteed not to fail.
Resource names must be a valid Java resource name which is a string consisting of a sequence of substrings, delimited by slashes (/). The default project resource directories are src/main/resources and src/test/resources (test phase only).
For example, a resource file stored in src/main/resources/data.csv is accessed using data.csv or /data.csv, while a resource file stored in src/main/resources/com/examples/datafiles/data.csv is accessed using com/examples/datafiles/data.csv or /com/examples/datafiles/data.csv.
Call this, or Operator.getResourceContents(String)
, during
typecheck (as opposed to waiting for Operator.init()
or similar runtime methods) in
order to be able to surface to the authoring environment any failures
locating or reading the resource.
name
- resource nameFile
pointing to the named resource, or null if one cannot be
associated with the named resource (for example, if storage is
not available on a file system for this resource)ResourceNotFoundException
- resource cannot be foundStreamBaseException
- resource found, but cannot be openedClassLoader.getResource(String)
,
Operator.getResourceContents(String)
,
Operator.getDataDirectory()
public final void registerRunnable(String runnableName, Runnable operatorRunnable, boolean shouldInterrupt, boolean synchronizedShutdown) throws StreamBaseException
The body of this Runnable must hook into StreamBase's thread
management by repeatedly calling the Operator.shouldRun()
method during
its entire lifetime. This allows managed threads to be started, stopped,
suspended, and resumed along with the rest of the application.
This method should be called from the operator's Operator.init()
method.
runnableName
- The name of this runnable (may be null). Names are reported on
the statistics stream, and displayed by sbmonitor and
sbmanager.operatorRunnable
- The Runnable to register.shouldInterrupt
- Whether this thread should be interrupted when the operator
undergoes a state change. Typically true if the thread blocks
on some event, input for example.synchronizedShutdown
- Whether to wait for this thread to exit before postShutdown is called.StreamBaseException
- If it is too late in the Operator's life cycle to register a
Runnable.public final void registerRunnable(String runnableName, Runnable operatorRunnable, boolean shouldInterrupt) throws StreamBaseException
The body of this Runnable must hook into StreamBase's thread
management by repeatedly calling the Operator.shouldRun()
method during
its entire lifetime. This allows managed threads to be started, stopped,
suspended, and resumed along with the rest of the application.
This method should be called from the operator's Operator.init()
method.
runnableName
- The name of this runnable (may be null). Names are reported on
the statistics stream, and displayed by sbmonitor and
sbmanager.operatorRunnable
- The Runnable to register.shouldInterrupt
- Whether this thread should be interrupted when the operator
undergoes a state change. Typically true if the thread blocks
on some event, input for example.StreamBaseException
- If it is too late in the Operator's life cycle to register a
Runnable.public final void registerRunnable(Runnable operatorRunnable, boolean shouldInterrupt) throws StreamBaseException
The body of this Runnable must hook into StreamBase's thread
management by repeatedly calling the Operator.shouldRun()
method during
its entire lifetime. This allows managed threads to be started, stopped,
suspended, and resumed along with the rest of the application.
This method should be called from the operator's Operator.init()
method.
operatorRunnable
- The Runnable to register.shouldInterrupt
- Whether this thread should be interrupted when the operator
undergoes a state change. Typically true if the thread blocks
on some event, input for example. -StreamBaseException
- If it is too late in the Operator's life cycle to register a
Runnable.public final void registerRunnable(Runnable operatorRunnable) throws StreamBaseException
Operator.registerRunnable(Runnable, boolean)
The body of this Runnable must hook into StreamBase's thread
management by repeatedly calling the Operator.shouldRun()
method during
its entire lifetime. This allows managed threads to be started, shut
down, suspended, and resumed along with the rest of the application.
This method should be called from the adapter's Operator.init()
method.
operatorRunnable
- The Runnable to register.StreamBaseException
- If it is too late in the Operator's life cycle to register a
Runnable.public Cancellable scheduleRunnable(Date when, Runnable runner)
when
- Time to run the runner. If it is in the past, it is run immediatelyrunner
- The Runnable to scheduleprotected void requireInputPortCount(int numPorts)
This method should be called by the typecheck method.
numPorts
- the number of portsPortMismatchException
- When the number of input ports is incorrect.public PortCounts getPortCounts() throws TypecheckException
typecheck
. This method will not be called if Studio
encountered any exception while calling each setter with the property
values just entered by the user.
Note: This is for studio purposes only and should not be used to set any fields that you may want to use in other parts of the Operator.
TypecheckException
- Type check exceptionpublic int[] getAsyncInputPorts()
public final int getInputPortCount()
public final int getOutputPortCount()
Operator.setOutputSchema(int, Schema)
method.Operator.setOutputSchema(int, Schema)
public void sendOutput(int port, Tuple tuple) throws StreamBaseException
All calls to Operator.sendOutput(int, Tuple)
are well ordered with
respect to other calls to Operator.sendOutput(int, Tuple)
. Meaning
that downstream logic will receive the tuples in the order they are sent.
Calls to Operator.sendOutputAsync(int, Tuple)
and Operator.sendOutput(int, List)
are not well ordered with respect to each other. Meaning calls from the
two methods can be received downstream in any order.
A note about reusing/caching the sent tuple.
If the given tuple is a tuple that this operator received from a processTuple() call, then the tuple can be reused if and only if getReuseTuple() is FALSE.
If the given tuple is created by this operator then it can be reused when the call to sendOutput() returns. StreamBase is done with the tuple once control returns to this operator.
Operator.sendOutput(int, Tuple)
uses this call's transaction, if one is
active, otherwise a new transaction is started to send the Tuple to downstream
operators.
port
- The output port the Tuple is enqueued upon (ports are zero
based)tuple
- The Tuple to enqueueStreamBaseException
- if the port is invalid or the tuple argument doesn't match
the schema of the output port.public void sendOutput(int port, List<Tuple> tuples) throws StreamBaseException
All calls to Operator.sendOutput(int, Tuple)
are well ordered with
respect to other calls to Operator.sendOutput(int, Tuple)
. Meaning
that downstream logic will receive the tuples in the order they are sent.
Calls to Operator.sendOutputAsync(int, Tuple)
and Operator.sendOutput(int, List)
are not well ordered with respect to each other. Meaning calls from the
two methods can be received downstream in any order.
A note about reusing/caching the sent tuples.
If the tuples in the given list are tuples that this operator received from processTuple() calls, then these tuples can be reused if and only if getReuseTuple() is FALSE.
If the given list of tuples is created by this operator, then the tuples in this list can be reused when the call to sendOutput() returns. StreamBase is done with the tuples once control returns to this operator.
Operator.sendOutput(int, List)
uses this call's transaction, if one is
active, otherwise a new transaction is started to send the Tuple to downstream
operators.
port
- The output port the Tuple is enqueued upon (ports are zero
based)tuples
- The List of Tuple objects to enqueueStreamBaseException
- if the port is invalid or any tuple in the tuples argument
doesn't match the schema of the output port.public void sendOutputAsync(int port, Tuple tuple) throws StreamBaseException
All calls to Operator.sendOutputAsync(int, Tuple)
are well ordered with
respect to other calls to Operator.sendOutputAsync(int, Tuple)
. Meaning
that downstream logic will receive the tuples in the order they are sent.
Calls to Operator.sendOutputAsync(int, Tuple)
and Operator.sendOutput(int, List)
are not well ordered with respect to each other. Meaning calls from the
two methods can be received downstream in any order.
Note: The Tuple will be queued into the application some time in the future.
The asynchronous nature of this call could cause subtle race conditions in applications.
The performance differential (if any) between sendOutputAsync
and sendOutput
is highly dependent upon the application and hardware. Benchmarking
is the only way to ensure the fastest possible implementation.
The tuple can be reused once sendOutputAsync returns.
A new transaction is always started when the Tuple is read from the asynchronous queue for processing.
port
- The output port the Tuple is enqueued upon (ports are zero
based)tuple
- A tuple to enqueue asynchronouslyStreamBaseException
- if the port is invalid or the tuple argument doesn't match
the schema of the output port.public void sendOutputAsync(int port, List<Tuple> tuples) throws StreamBaseException
All calls to Operator.sendOutputAsync(int, Tuple)
are well ordered with
respect to other calls to Operator.sendOutputAsync(int, Tuple)
. Meaning
that downstream logic will receive the tuples in the order they are sent.
Calls to Operator.sendOutputAsync(int, Tuple)
and Operator.sendOutput(int, List)
are not well ordered with respect to each other. Meaning calls from the
two methods can be received downstream in any order.
Note: The Tuples will be queued into the application some time in the future.
The asynchronous nature of this call could cause subtle race conditions in applications.
The performance differential (if any) between sendOutputAsync
and sendOutput
is highly dependent upon the application and hardware. Benchmarking
is the only way to ensure the fastest possible implementation.
The tuples (and the List<Tuple>) can be reused once sendOutputAsync returns.
A new transaction is always started when the Tuple is read from the asynchronous queue for processing.
port
- The output port the Tuple is enqueued upon (ports are zero
based)tuples
- A List<Tuple> to enqueue asynchronouslyStreamBaseException
- if the port is invalid or any tuple in the tuples argument
doesn't match the schema of the output port.public final Schema getInputSchema(int port)
Operator.getTypecheckInputSchema(int)
. After typecheck time, behaves like
Operator.getRuntimeInputSchema(int)
.port
- the port to return the schema for (ports are zero based)IndexOutOfBoundsException
- if port not in rangepublic final Schema getRuntimeInputSchema(int port)
Operator.setCaptureStrategy(CaptureTransformStrategy)
. This will be the actual schema of tuples passed to
Operator.processTuple(int, Tuple)
port
- the port to return the schema for (ports are zero based)IndexOutOfBoundsException
- if port not in rangepublic final Schema getTypecheckInputSchema(int port)
Operator.processTuple(int, Tuple)
method. For the actual schema of
those tuples, see Operator.getRuntimeInputSchema(int)
.port
- the port to return the schema for (ports are zero based)IndexOutOfBoundsException
- if port not in rangepublic final Schema getOutputSchema(int outputPort)
Operator.getTypecheckOutputSchema(int)
. Otherwise, if called from Operator.init()
or later, it will act
as Operator.getRuntimeOutputSchema(int)
outputPort
- the port to return the schema for (ports are zero based)IndexOutOfBoundsException
- if port not in rangepublic String evaluate(String st, Tuple input) throws StreamBaseException
st
- the expression to be evaluatedinput
- the input tupleStreamBaseException
- Could not perform evaluationpublic void setDynamicVariable(String name, Object value) throws StreamBaseException
name
- name of the dynamic variable to be setvalue
- the new value (may be null)StreamBaseException
- if an error occurspublic Schema getNamedSchema(String name)
name
- of the named schemapublic Schema getDynamicVariablesSchema()
public Tuple getDynamicVariables() throws TupleException
TupleException
- Error creating return tuplepublic final boolean setManagedState(String key, Serializable value)
This method can only be called when an operator is configured to use transactional memory for its state. There must be an active transaction when this method is called.
key
- Key value. Must be cluster unique if state is replicated.value
- Value to storetrue
if successful, false otherwiseUnsupportedOperationException
- Operator state not in transactional memory or no active
transactionpublic final Serializable getManagedState(String key)
There must be an active transaction when this method is called
key
- Key valueUnsupportedOperationException
- Operator state not in transactional memory or no active
transactionpublic final StorageMethod getStorageMethod()
May return null if information is not available, e.g. during typecheck.
public final TransactionIsolationLevel getTransactionIsolationLevel()
May return null if information is not available, e.g. during typecheck.
public boolean isConcurrentByDefault()
The default value is false.
The value of this method is used at design-time to indicate whether an operator should be put into a concurrent region when added to an EventFlow.
public boolean supportsTransactionalMemory()
Override this method to return true to indicate that an operator can optionally use transactional memory as storage. The default value is false.
Using Operator.setManagedState(java.lang.String, java.io.Serializable)
and Operator.getManagedState(java.lang.String)
are valid only if this
method returns true and
Operator.getStorageMethod()
returns transactional memory
public final Path getDataDirectory() throws IOException
name = "data-area-configuration"
version = "1.0.0"
type = "com.tibco.ep.streambase.configuration.sbengine"
configuration =
{
StreamBaseEngine =
{
streamBase =
{
dataAreaPath = "v1-data-area"
}
}
}
Any files created in this directory are the responsibility
of the operator that created them. Any clean up must be
explicitly done by the operator.
The data directory is global to an executing fragment, so
all operators running in a fragment can write files into
this directory. It is recommended that sub-directories be used
to isolate files.
WARNING: It is illegal to call this method during typecheckIOException
- Could not create the data directorypublic final Schema getRuntimeOutputSchema(int outputPort)
Operator.sendOutput(int, Tuple)
or Operator.sendOutputAsync(int, Tuple)
.
Note that this schema will sometimes be different than the output schema set at typecheck
time by the transformation determined by Operator.setCaptureStrategy(CaptureTransformStrategy)
. It will never contain
capture fields.
This method may not be called until after typecheck is finished; to do so will cause an IllegalStateExceptionoutputPort
- the port to return the schema for (ports are zero based)IndexOutOfBoundsException
- if port not in rangepublic final Schema getTypecheckOutputSchema(int outputPort)
Operator.sendOutput(int, Tuple)
at runtime.outputPort
- the port to return the schema for (ports are zero based)IndexOutOfBoundsException
- if port not in rangepublic final CaptureTransformStrategy getCaptureStrategy()
CaptureTransformStrategy
to usepublic final void setCaptureStrategy(CaptureTransformStrategy cts)
CaptureTransformStrategy.FLATTEN
and CaptureTransformStrategy.NEST
. The default is CaptureTransformStrategy.FLATTEN
.cts
- the CaptureTransformStrategy
to use.public final Schema setOutputSchema(int port, Schema outputSchema) throws TypecheckException
Note that the actual Schema object to create tuples at runtime (for use by Operator.sendOutput(int, Tuple)
or Operator.sendOutputAsync(int, Tuple)
)
should be retrieved using Operator.getRuntimeOutputSchema(int)
, as that will take into account any capture
field transformations determined by Operator.getCaptureStrategy()
.
port
- the port to set the given schema to (ports are zero based)outputSchema
- the schema to set the given port toTypecheckException
- Typecheck errorSchema
, 7.2 The returned schema should not be the Schema used to create output tuples. For the
Schema to use to create output tuples, call Operator.getRuntimeOutputSchema(int)
in
the Operator.init()
method or later.Operator.getOutputPortCount()
public void sendErrorOutput(String message)
This method will also record the message in the debug log.
message
- A description of the error, must not be nullIllegalArgumentException
- if message is nullpublic void sendErrorOutput(Throwable t)
This method will also record the message in the debug log.
t
- the exception to reportIllegalArgumentException
- if exception is nullpublic void sendErrorOutput(Throwable t, int port, Tuple errorTuple)
This method will also record the message in the debug log.
t
- the exception to reportport
- the input/output port that the error is related toerrorTuple
- a tuple that caused or is related to the errorIllegalArgumentException
- if exception is nullpublic TableAccessor getTableAccessor(String name) throws StreamBaseException
Warning: This interface is provisional, and will likely change in upcoming versions of StreamBase.
name
- the table nameStreamBaseException
- Error accessing TableAccessorprotected org.slf4j.Logger getLogger()
Logger
suitable for logging messages and exceptions
for this Operator. Equivalent to LoggerFactory.getLogger(Class)
with arguments this.getClass()
.Logger
instance for this Operatorprotected void setReuseTuple(boolean reuse)
Note: that if you plan to store the input Tuple as state in your operator you must copy the Tuple before you store it, if you have called setReuseTuple(true).
reuse
- allow/disallow tuple reusepublic boolean getReuseTuple()
public int getStateChangeTimeout()
Operator.DEFAULT_STATE_CHANGE_TIMEOUT
public Location getLocation()
Operator.getLocation(String)
insteadgetLocation
in interface LocatedItem
public Location getLocation(String property)
Operator.getLocation()
for StreamSQL-sourced Operators. Using this
as an argument to TypecheckException
constructors will cause Studio to display
an error indicated alongside the UI widget corresponding to the passed in property.property
- expected to be the bean name of an Operator propertypublic static void setLogLevel(org.slf4j.Logger logger, String level) throws StreamBaseException
logger
- the Logger
object on which to set the level.level
- the level at which to set the Logger instance.StreamBaseException
- the log level could not be set, most likely because
the current logging implementation is not Logback.public static void setLogLevel(org.slf4j.Logger logger, Operator.LogLevel level) throws StreamBaseException
logger
- the Logger
object on which to set the level.level
- the Operator.LogLevel
at which to set the Logger instance.StreamBaseException
- the log level could not be set, most likely because
the current logging implementation is not Logback.public Operator.RuntimeEnvironment getRuntimeEnvironment()
Operator.RuntimeEnvironment
for this StreamBase Server.
NOTE: Not valid during typecheckpublic boolean isRuntime()
public Operator.ConfigurationAccessor getOperatorConfigurationAccessor()
Operator.ConfigurationAccessor
to provide access to the aforementionedpublic Schema getSchemaForCapture(String captureName, int depth)
(f int, c @A)
, then you might call
getSchemaForCapture("A", 0)
to get (g int, c @B)
as the schema bound to A. Then you might call
getSchemaForCapture("B", 1)
to get (h int)
as the schema bound to B.
Note that the "nesting depth" here has nothing to do with capture fields found in lists or tuples -- the only containment that affects the nesting depth is a capture within another capture.
captureName
- the name of the bound capture typedepth
- the depth of nesting at which to look up the schema of this capture binding.public TupleCaptureTransformer getTupleCaptureTransformer(Schema s) throws StreamBaseException
s
- the schema with capture fieldsStreamBaseException
- if this method is called at the wrong time, or
if the transformations requested are not possible. This may occur, for example,
if there is a name conflict between a captured field and a field already in the schema.public boolean isPassthruOperator()
public String[] getSearchKeywords()
You may return composite words (such as "Thomson Reuters") as a single string; it has no effect on the user's filtering abilities.
public boolean allowsConcurrency()
true
.
This has no effect on the actual runtime environment for operator instances.
public Schema[] getProposedInputSchemas(String mainName)
mainName
- the name the user has requested be the prefix for the top level schema, and this name should be used throughout any sub-field schemaspublic Schema[] getProposedOutputSchemas(String mainName)
mainName
- the name the user has requested be the prefix for the top level schema, and this name should be used throughout any sub-field schemaspublic boolean supportsArtifacts()
public OperatorArtifactManager getArtifactManager()
public Lock getLock()
Copyright © 2015–2019 Cloud Software Group, Inc.. All rights reserved.