Class Operator

  • All Implemented Interfaces:
    LocatedItem, Serializable
    Direct Known Subclasses:
    InputAdapter, OutputAdapter

    public abstract class Operator
    extends Object
    implements Serializable, LocatedItem
    Abstract base class for User code that is used as a Java Operator or an embedded Adapter in a StreamBase application. One instance will be created for each Java Operator in a StreamBase application. StreamBase Studio may operate on several StreamBase applications at a time, so Operator subclass instances may be in different applications.

    Operator subclasses must have a public default constructor.

    An Operator is notified of state changes through callbacks. The StreamBase runtime 'calls back' an Operator when it changes the runtime state of the Operator. These callbacks include resume(), resumed(), suspend(), suspended() and . shutdown().

    Operator provides "managed threads", which are threads that run concurrently with the application, but which can synchronize with its overall state changes. These threads are started, suspended, resumed, and shut down with the application. Managed threads are registered with an Operator using the method registerRunnable. This is particularly useful for input adapters, which typically have to respond to external events asynchronously with the application. Managed threads can call sendOutput at any time.

    If an Operator registers one or more managed threads, and all of its managed threads exit their run() methods, then the Operator itself will shut down.

    The StreamBase runtime blocks while it waits for an Operator's managed threads to respond to a state change. This can be problematic if a managed thread is blocked on some event. However, the StreamBase runtime can be configured to interrupt a thread when it needs to change its state. This is accomplished by setting the flag shouldInterrupt to true when registering the thread with registerRunnable.

    It may be that an Operator's managed thread does not respond to a state change even after it has been interrupted. If the Operator's thread does not respond to the state within a given time interval then it is considered to be in failure and it is shut down. This time interval is specified by the server configuration parameter operator-state-change-timeout-ms.

    For sharing state amongst instances, Operators may use the services provided by the Operator.SharedObjectManager accessible via getRuntimeEnvironment()

    Since 7.2.12, if your operator simply passes through Tuples without creating new Tuple instances, ensure you override isPassthruOperator() for maximum performance.

    Special Studio Considerations:

    Note: Serializations of instances of this class that are created (e.g., by using ObjectOutputStream) in one version of StreamBase in general will not be deserializable in any other version of StreamBase.

    See Also:
    Parameterizable, Serialized Form
    • Field Detail

      • OP_CONFIG_MGR_NO_CONF

        public static final Operator.ConfigurationAccessor OP_CONFIG_MGR_NO_CONF
        A configuration manager that provides no information
      • DEFAULT_STATE_CHANGE_TIMEOUT

        public static final int DEFAULT_STATE_CHANGE_TIMEOUT
        Default value for the timeout for Operator state changes, in milliseconds: 10000

        An Operator can receive state change events from the StreamBase runtime. In particular, when the runtime starts, suspends, resumes, or shuts down, it, in turn, applies the same state change to any Operators it holds.

        When the server applies a state change to an Operator, it blocks until the Operator acknowledges that it has transitioned to the new state.

        If an Operator fails to make such an acknowledgement, the StreamBase runtime might wait indefinitely. StateChangeTimeout limits the amount of time the server will wait for an Operator to respond to a requested state change.

        If the Operator fails to respond within the timeout, the server will shut down the Operator and continue with its state change.

        See Also:
        Constant Field Values
    • Constructor Detail

      • Operator

        protected Operator()
        Constructs an operator.

        All Operator subclasses must have a public default constructor, otherwise StreamBase Studio and the StreamBase sbd process will not be able to instantiate the Operator subclass. Nothing substantial should be done in the constructor, except for setting port hints and setting the parameters object.

        See Also:
        setPortHints(int, int), setParameters(com.streambase.sb.operator.Parameterizable)
    • Method Detail

      • getIconResource

        public URL getIconResource​(Operator.IconKind iconType)
        Clients should override to support custom icons. The default implementation returns null (meaning no icon) for any kind requested.
        Parameters:
        iconType - the kind of icon being requested, as an Operator.IconKind enumeration value
        Returns:
        a URL pointing to an image resource corresponding to the type requested, or null
        See Also:
        Operator.IconKind
      • getDisplayName

        public final String getDisplayName()
        Return the display name of this Operator. The display name is the String that's shown in the list of available Operators.
        Returns:
        The display name of the Operator.
      • getDisplayDescription

        public final String getDisplayDescription()
        Provides a description for this type of Operator. This is used by the user interface to assist the user in selecting an Operator during authoring. The expected usage is to return a three or four line description, without embedded new lines, describing the intent, or technical details that can assist a user in ensuring they are selecting the Operator or Adapter they need.
        Returns:
        The description for this type of Operator, never null
        Since:
        7.6 initial release
      • setDisplayDescription

        protected final void setDisplayDescription​(String description)
        Set a description for this type of Operator. This method is to be called only during the zero-argument constructor for this Operator.
        Parameters:
        description - description for this type of Operator, never null
        Since:
        7.6 initial release
        See Also:
        getDisplayDescription()
      • getShortDisplayName

        public final String getShortDisplayName()
        Return the short display name of this Operator. This short display name is the String displayed by Studio in the palette, and used to name new instances of this Operator to an EventFlow application.

        By default, this returns the simple class name of this Operator.

        Returns:
        The short display name of the Operator.
        Since:
        6.1.2 initial release, 6.5 uses this for new instances added to an EventFlow application
      • getName

        public String getName()
        Return the name of this Operator. Operators are named and can be managed by name. This may return null during typecheck and init phases.
        Returns:
        The name of the operator during runtime, or empty string during typecheck and init
      • getContainerName

        public String getContainerName()
        Return the name of this operator's container. This may return null during typecheck and init phases.
        Returns:
        The name of this operator's container. This may return an empty string during typecheck and init phases.
      • getFullyQualifiedName

        public String getFullyQualifiedName()
        Return the fully qualified name of this operator. This contains the container name, any module names and the name of the operator. This may return null during typecheck and init phases.
        Returns:
        The fully qualified name of this operator. This may return an empty string during typecheck and init phases.
      • setDisplayName

        public final void setDisplayName​(String dn)
        The display name is the String that's shown in the Operator Name field of the Properties view, and in the detailed text for this operator in the Palette view's Details mode. If the subclass doesn't override it, use the class name.
        Parameters:
        dn - Display name
      • setShortDisplayName

        public final void setShortDisplayName​(String sn)
        The short display name is the String displayed by Studio as the name of this operator in the Palette view, and above the operator in the EventFlow canvas. If the subclass doesn't override it, use the class simple name.
        Parameters:
        sn - Short display name
        Since:
        6.1.2 initial release
      • setPortHints

        public void setPortHints​(int numInputPorts,
                                 int numOutputPorts)
        Used to tell StreamBase Studio how many ports to draw on the Java Operator when the operator is drawn in the IDE's canvas. The default is 1 input port, 0 output ports.

        This method is only effective when called from the Java Operator's constructor. The typecheck method should call requireInputPortCount instead.

        To specify the ports at runtime requireInputPortCount and setOutputSchema should be called from the typecheck method.

        To specify the ports dynamically within studio, see getPortCounts.

        Parameters:
        numInputPorts - number of input ports. Must be non-negative.
        numOutputPorts - number of output ports. Must be non-negative.
        Throws:
        IllegalArgumentException - if an argument is less than zero
        See Also:
        requireInputPortCount(int), setOutputSchema(int, Schema)
      • init

        public void init()
                  throws StreamBaseException
        After the Operator has typechecked successfully, but before the application or any managed threads start, the StreamBase server will call the init method. Operators should override this method to perform custom initialization.

        If this operator wishes to register threads, it should call registerRunnable(Runnable) from this method.

        Throws:
        StreamBaseException - Prevents the application from starting.
      • hasNotYetStarted

        public boolean hasNotYetStarted()
        Returns true if this Operator has not yet started running. hasNotYetStarted() returns true if the Operator is in the runtime state of NOT_YET_STARTED, which is the initial runtime state of an Operator.
        Returns:
        true if not started, false otherwise
        See Also:
        Operator.OperatorStates.INITIALIZED
      • isDroppingTuples

        public boolean isDroppingTuples()
        Returns true if the Operator will drop any tuples it receives when it is suspended.
        Returns:
        true if tuples are dropped when operator suspended, false otherwise
        See Also:
        Operator.SuspendBehaviorStates.DROPPING_TUPLES
      • isProcessingTuples

        public boolean isProcessingTuples()
        Returns true if the Operator will process any tuples it receives when it is suspended.
        Returns:
        true if tuples are processed when suspended, false otherwise
        See Also:
        Operator.SuspendBehaviorStates.PROCESSING_TUPLES
      • isRunning

        public boolean isRunning()
        Returns true if this Operator is currently running, false otherwise. isRunning() returns true if the Operator is in a runtime state of STARTED.
        Returns:
        true if operator running, false otherwise
        See Also:
        Operator.OperatorStates.STARTED
      • isShutdown

        public boolean isShutdown()
        Returns true if this Operator is currently shut down, false otherwise. An Operator is shut down when it is in the runtime state of SHUTDOWN, which is the terminal state for Operators.
        Returns:
        true if operator shutdown, false otherwise
        See Also:
        Operator.OperatorStates.SHUTDOWN
      • isSuspended

        public boolean isSuspended()
        Returns true if this Operator is currently suspended, false otherwise. An Operator is suspended when it is in the runtime state of SUSPENDED.
        Returns:
        true if operator suspended, false otherwise
        See Also:
        Operator.OperatorStates.SUSPENDED
      • processTuple

        public abstract void processTuple​(int inputPort,
                                          Tuple tuple)
                                   throws StreamBaseException
        This method will be called by the StreamBase server for each Tuple given to the Operator to process.

        By default the Tuple passed in is safe for storing and mutating after the return of processTuple. The details of this Tuple's life cycle can be controlled via setReuseTuple(boolean).

        Parameters:
        inputPort - the input port that the tuple is from (ports are zero based)
        tuple - the tuple from the given input port
        Throws:
        StreamBaseException - Terminates the application.
      • resume

        public void resume()
                    throws Exception
        resume() is called when an operator starts or resumes execution, before any registered runnables are started or resumed. Note that if the application is shutdown directly from a suspended state, this will not be called; instead shutdown() will be called. resume() is a callback that is called by the StreamBase runtime. Exceptions thrown during calls to resume() will be processed as error tuples, and trigger a shutdown of the operator.
        Throws:
        Exception - Error processing callback
      • resumed

        public void resumed()
                     throws Exception
        resumed() is called after all registered runnables of the operator have started or resumed. That is, once shouldRun() has unblocked and returned true in all registered runnables. resumed() is a callback that is called by the StreamBase runtime. Exceptions thrown during calls to resumed() will be processed as error tuples, and trigger a shutdown of the operator.
        Throws:
        Exception - Error processing callback
      • size

        public int size()
        Return the "size" of this operator. This size value will be displayed in sbmonitor's "size" column.

        Override this method to display the size of an important data structure in this Operator. For example the size of a queue. Returns 0 by default.

        This method must return promptly and never block, as this is called from a system statistics monitoring thread.

        Returns:
        size of this operator.
      • shouldRun

        public final boolean shouldRun()
        Return whether or not calling operator thread is enabled and should continue running.

        This method returns false if the operator has not yet started or has shut down. It blocks if the operator is suspended. It returns true if the operator is running.

        This method links managed operator threads with the StreamBase runtime. Every registered runnable object must repeatedly call this during its entire lifetime in order to synchronize with the main StreamBase application.

        An operator (and, thus, its containing application) is not considered started until all registered runnables have called this method. Likewise, an operator is not considered suspended until all registered runnables are blocked in this method. Finally, an operator is not considered shutdown until all registered runnables have exited their run() method (either on their own or in response to this method returning false).

        Returns:
        true is the operator is running, false otherwise.
        Throws:
        UnsupportedOperationException - If this was not called from an operator thread.
      • shutdown

        public void shutdown()
        shutdown is called by the StreamBase runtime just prior to shutting down this Operator. An implementation of shutdown should include any behavior needed to shut down the Operator - freeing resources, etc. shutdown() is a callback that is called by the StreamBase runtime.
      • postShutdown

        public void postShutdown()
        postShutdown is called by the StreamBase runtime just after shutting down this Operator. postShutdown() is a callback that is called by the StreamBase runtime.
      • suspend

        public void suspend()
                     throws Exception
        suspend() will be called when an operator suspends, before any registered runnables are suspended. suspend() is a callback that is called by the StreamBase runtime. Exceptions thrown during calls to suspend() will be processed as error tuples.
        Throws:
        Exception - Error processing callback
      • setSuspendBehavior

        public void setSuspendBehavior​(int suspendBehavior)
        Set the suspend behavior of this Operator. The Operator can either process or drop tuples when suspended. setSuspendBehavior() might be called in the Operator's constructor or in init().
        Parameters:
        suspendBehavior - The suspend behavior to set, either PROCESSING_TUPLES or DROPPING_TUPLES.
        See Also:
        Operator.SuspendBehaviorStates.PROCESSING_TUPLES, Operator.SuspendBehaviorStates.DROPPING_TUPLES
      • suspended

        public void suspended()
                       throws Exception
        suspended() will be called after all registered runnables of the operator have suspended. That is, once all registered runnables have called and are blocked in shouldRun(). suspended() is a callback that is called by the StreamBase runtime. Exceptions thrown during calls to suspended() will be processed as error tuples.
        Throws:
        Exception - Error processing callback
      • remoteNodeActive

        public void remoteNodeActive​(String remoteNodeName)
        Called when a remote node has become active in the cluster.
        Parameters:
        remoteNodeName - the remote node name
        Since:
        10.0 initial release
      • remoteNodeUnavailable

        public void remoteNodeUnavailable​(String remoteNodeName)
        Called when a remote node has become unavailable.
        Parameters:
        remoteNodeName - the remote node name
        Since:
        10.0 initial release
      • remoteQuorumLost

        public void remoteQuorumLost​(String availabilityZoneName,
                                     String reason)
        Called after deactivating all partitions when loss of quorum is detected.
        Parameters:
        availabilityZoneName - name of the availability zone that owns the associated quorum
        reason - string containing a reason why loss was detected.
        Since:
        10.4
      • partitionActive

        public void partitionActive​(String name)

        Called when a partition transitions into Active state.

        In the Active state, partition is running on the active node for the partition.

        Parameters:
        name - name of the partition that becomes active
        Since:
        10.4
      • partitionNotActive

        public void partitionNotActive​(String name)

        Called when a partition transitions into any other state from Active state.

        Parameters:
        name - name of the partition that becomes not Active
        Since:
        10.4
      • getParameters

        public Parameterizable getParameters()
        Returns the Java Bean that holds the user-configurable parameters visible in the StreamBase Studio Properties View of this operator.
        Returns:
        The Java Bean that holds the user-configurable parameters visible in the StreamBase Studio Properties View of this operator.
      • setParameters

        public void setParameters​(Parameterizable params)
        Sets the Java Bean that holds the user-configurable parameters visible in the StreamBase Studio Properties View of this operator.
        Parameters:
        params - the Java Bean that holds the user-configurable parameters visible in the StreamBase Studio Properties View of this operator.
      • getResourceContents

        public InputStream getResourceContents​(String name)
                                        throws ResourceNotFoundException,
                                               StreamBaseException
        Returns an open input stream on the contents of the named resource file. The client is responsible for closing the stream when finished.

        Resource names must be a valid Java resource name which is a string consisting of a sequence of substrings, delimited by slashes (/) The default project resource directories are src/main/resources and src/test/resources (test phase only).

        For example, a resource file stored in src/main/resources/data.csv is accessed using data.csv or /data.csv, while a resource file stored in src/main/resources/com/examples/datafiles/data.csv is accessed using com/examples/datafiles/data.csv or /com/examples/datafiles/data.csv.

        Call this, or getResourceFile(String), during typecheck (as opposed to waiting for init() or similar runtime methods) in order to be able to surface to the authoring environment any failures locating or reading the resource.

        Parameters:
        name - resource name
        Returns:
        an input stream containing the contents of the resource
        Throws:
        ResourceNotFoundException - resource cannot be found
        StreamBaseException - resource found, but cannot be opened
        Since:
        6.4 initial release, 10.0 added support for fully qualified resource names
        See Also:
        ClassLoader.getResource(String), getResourceFile(String), getDataDirectory()
      • getResourceFile

        public File getResourceFile​(String name)
                             throws ResourceNotFoundException,
                                    StreamBaseException
        Returns a File pointing to the regular file or directory if it exists. If this returns a regular file, getResourceContents(String) is guaranteed not to fail.

        Resource names must be a valid Java resource name which is a string consisting of a sequence of substrings, delimited by slashes (/). The default project resource directories are src/main/resources and src/test/resources (test phase only).

        For example, a resource file stored in src/main/resources/data.csv is accessed using data.csv or /data.csv, while a resource file stored in src/main/resources/com/examples/datafiles/data.csv is accessed using com/examples/datafiles/data.csv or /com/examples/datafiles/data.csv.

        Call this, or getResourceContents(String), during typecheck (as opposed to waiting for init() or similar runtime methods) in order to be able to surface to the authoring environment any failures locating or reading the resource.

        Parameters:
        name - resource name
        Returns:
        a File pointing to the named resource, or null if one cannot be associated with the named resource (for example, if storage is not available on a file system for this resource)
        Throws:
        ResourceNotFoundException - resource cannot be found
        StreamBaseException - resource found, but cannot be opened
        Since:
        6.4 initial release, 10.0 added support for fully qualified resource names
        See Also:
        ClassLoader.getResource(String), getResourceContents(String), getDataDirectory()
      • registerRunnable

        public final void registerRunnable​(String runnableName,
                                           Runnable operatorRunnable,
                                           boolean shouldInterrupt,
                                           boolean synchronizedShutdown)
                                    throws StreamBaseException
        Register a Runnable object to be managed by this Operator. This Runnable will be started in a Thread when the Operator starts, suspended and resumed with the Operator, and stopped with the Operator.

        The body of this Runnable must hook into StreamBase's thread management by repeatedly calling the shouldRun() method during its entire lifetime. This allows managed threads to be started, stopped, suspended, and resumed along with the rest of the application.

        This method should be called from the operator's init() method.

        Parameters:
        runnableName - The name of this runnable (may be null). Names are reported on the statistics stream, and displayed by sbmonitor and sbmanager.
        operatorRunnable - The Runnable to register.
        shouldInterrupt - Whether this thread should be interrupted when the operator undergoes a state change. Typically true if the thread blocks on some event, input for example.
        synchronizedShutdown - Whether to wait for this thread to exit before postShutdown is called.
        Throws:
        StreamBaseException - If it is too late in the Operator's life cycle to register a Runnable.
        Since:
        6.6.14 initial release
      • registerRunnable

        public final void registerRunnable​(String runnableName,
                                           Runnable operatorRunnable,
                                           boolean shouldInterrupt)
                                    throws StreamBaseException
        Register a Runnable object to be managed by this Operator. This Runnable will be started in a Thread when the Operator starts, suspended and resumed with the Operator, and stopped with the Operator.

        The body of this Runnable must hook into StreamBase's thread management by repeatedly calling the shouldRun() method during its entire lifetime. This allows managed threads to be started, stopped, suspended, and resumed along with the rest of the application.

        This method should be called from the operator's init() method.

        Parameters:
        runnableName - The name of this runnable (may be null). Names are reported on the statistics stream, and displayed by sbmonitor and sbmanager.
        operatorRunnable - The Runnable to register.
        shouldInterrupt - Whether this thread should be interrupted when the operator undergoes a state change. Typically true if the thread blocks on some event, input for example.
        Throws:
        StreamBaseException - If it is too late in the Operator's life cycle to register a Runnable.
        Since:
        6.5 initial release
      • registerRunnable

        public final void registerRunnable​(Runnable operatorRunnable,
                                           boolean shouldInterrupt)
                                    throws StreamBaseException
        Register a Runnable object to be managed by this Operator. This Runnable will be started in a Thread when the Operator starts, suspended and resumed with the Operator, and stopped with the Operator.

        The body of this Runnable must hook into StreamBase's thread management by repeatedly calling the shouldRun() method during its entire lifetime. This allows managed threads to be started, stopped, suspended, and resumed along with the rest of the application.

        This method should be called from the operator's init() method.

        Parameters:
        operatorRunnable - The Runnable to register.
        shouldInterrupt - Whether this thread should be interrupted when the operator undergoes a state change. Typically true if the thread blocks on some event, input for example. -
        Throws:
        StreamBaseException - If it is too late in the Operator's life cycle to register a Runnable.
      • registerRunnable

        public final void registerRunnable​(Runnable operatorRunnable)
                                    throws StreamBaseException
        Deprecated.
        As of StreamBase version 3.7, replaced by registerRunnable(Runnable, boolean)
        Register a Runnable object to be managed by this Operator. This Runnable will be started in a Thread when the Operator starts, suspended and resumed with the Operator, and stopped with the Operator.

        The body of this Runnable must hook into StreamBase's thread management by repeatedly calling the shouldRun() method during its entire lifetime. This allows managed threads to be started, shut down, suspended, and resumed along with the rest of the application.

        This method should be called from the adapter's init() method.

        Parameters:
        operatorRunnable - The Runnable to register.
        Throws:
        StreamBaseException - If it is too late in the Operator's life cycle to register a Runnable.
      • scheduleRunnable

        public Cancellable scheduleRunnable​(Date when,
                                            Runnable runner)
        Schedules a Runnable object to be run at some future point as part of the same parallel region as this Operator. Prefer using this to implement scheduling events by the operator rather than allocating a new thread using registerRunnable, if the parallelism is not necessary.
        Parameters:
        when - Time to run the runner. If it is in the past, it is run immediately
        runner - The Runnable to schedule
        Returns:
        Handle to task for cancellation, if the task hasn't already started running. This handle is thread-safe, but attempting to cancel an already started task has no effect.
        Since:
        7.4 initial release
      • requireInputPortCount

        protected void requireInputPortCount​(int numPorts)
                                      throws PortMismatchException
        Throws a PortMismatchException if the number of ports is not numPorts. The StreamBase Studio IDE will recognize this and draw the operator box with the appropriate number of input ports.

        This method should be called by the typecheck method.

        Parameters:
        numPorts - the number of ports
        Throws:
        PortMismatchException - When the number of input ports is incorrect.
      • getPortCounts

        public PortCounts getPortCounts()
                                 throws TypecheckException
        An optional method that subclasses can override to dynamically tell Studio the number of input and output ports. Clients should expect this method to be called after all setters, but prior to typecheck. This method will not be called if Studio encountered any exception while calling each setter with the property values just entered by the user.

        Note: This is for studio purposes only and should not be used to set any fields that you may want to use in other parts of the Operator.

        Returns:
        PortCounts record type
        Throws:
        TypecheckException - Type check exception
      • getAsyncInputPorts

        public int[] getAsyncInputPorts()
        Override to indicate to Studio which ports (0-based) might process data asynchronously, this provides different rendering on the EventFlow canvas to assist users in understanding their data flow
        Returns:
        the zero based indices of the asynchronous ports; null by default
        Since:
        7.6 initial release
      • getInputPortCount

        public final int getInputPortCount()
        Return the number of input ports.
        Returns:
        number of Input ports, if not in an application 0.
      • sendOutput

        public void sendOutput​(int port,
                               Tuple tuple)
                        throws StreamBaseException
        Enqueue a Tuple to be sent synchronously to downstream operators.

        All calls to sendOutput(int, Tuple) are well ordered with respect to other calls to sendOutput(int, Tuple). Meaning that downstream logic will receive the tuples in the order they are sent.

        Calls to sendOutputAsync(int, Tuple) and sendOutput(int, List) are not well ordered with respect to each other. Meaning calls from the two methods can be received downstream in any order.

        A note about reusing/caching the sent tuple.

        If the given tuple is a tuple that this operator received from a processTuple() call, then the tuple can be reused if and only if getReuseTuple() is FALSE.

        If the given tuple is created by this operator then it can be reused when the call to sendOutput() returns. StreamBase is done with the tuple once control returns to this operator.

        sendOutput(int, Tuple) uses this call's transaction, if one is active, otherwise a new transaction is started to send the Tuple to downstream operators.

        Parameters:
        port - The output port the Tuple is enqueued upon (ports are zero based)
        tuple - The Tuple to enqueue
        Throws:
        StreamBaseException - if the port is invalid or the tuple argument doesn't match the schema of the output port.
        Since:
        10.0 transactional behavior
      • sendOutput

        public void sendOutput​(int port,
                               List<Tuple> tuples)
                        throws StreamBaseException
        Enqueue a List of Tuples to be sent synchronously to downstream operators.

        All calls to sendOutput(int, Tuple) are well ordered with respect to other calls to sendOutput(int, Tuple). Meaning that downstream logic will receive the tuples in the order they are sent.

        Calls to sendOutputAsync(int, Tuple) and sendOutput(int, List) are not well ordered with respect to each other. Meaning calls from the two methods can be received downstream in any order.

        A note about reusing/caching the sent tuples.

        If the tuples in the given list are tuples that this operator received from processTuple() calls, then these tuples can be reused if and only if getReuseTuple() is FALSE.

        If the given list of tuples is created by this operator, then the tuples in this list can be reused when the call to sendOutput() returns. StreamBase is done with the tuples once control returns to this operator.

        sendOutput(int, List) uses this call's transaction, if one is active, otherwise a new transaction is started to send the Tuple to downstream operators.

        Parameters:
        port - The output port the Tuple is enqueued upon (ports are zero based)
        tuples - The List of Tuple objects to enqueue
        Throws:
        StreamBaseException - if the port is invalid or any tuple in the tuples argument doesn't match the schema of the output port.
        Since:
        10.0 transactional behavior
      • sendOutputAsync

        public void sendOutputAsync​(int port,
                                    Tuple tuple)
                             throws StreamBaseException
        Enqueue a Tuple to be sent asynchronously to downstream operators.

        All calls to sendOutputAsync(int, Tuple) are well ordered with respect to other calls to sendOutputAsync(int, Tuple). Meaning that downstream logic will receive the tuples in the order they are sent.

        Calls to sendOutputAsync(int, Tuple) and sendOutput(int, List) are not well ordered with respect to each other. Meaning calls from the two methods can be received downstream in any order.

        Note: The Tuple will be queued into the application some time in the future. The asynchronous nature of this call could cause subtle race conditions in applications. The performance differential (if any) between sendOutputAsync and sendOutput is highly dependent upon the application and hardware. Benchmarking is the only way to ensure the fastest possible implementation.

        The tuple can be reused once sendOutputAsync returns.

        A new transaction is always started when the Tuple is read from the asynchronous queue for processing.

        Parameters:
        port - The output port the Tuple is enqueued upon (ports are zero based)
        tuple - A tuple to enqueue asynchronously
        Throws:
        StreamBaseException - if the port is invalid or the tuple argument doesn't match the schema of the output port.
        Since:
        6.4.3 initial release, 10.0 transactional behavior
      • sendOutputAsync

        public void sendOutputAsync​(int port,
                                    List<Tuple> tuples)
                             throws StreamBaseException
        Enqueue a List of Tuples to be sent asynchronously to downstream operators.

        All calls to sendOutputAsync(int, Tuple) are well ordered with respect to other calls to sendOutputAsync(int, Tuple). Meaning that downstream logic will receive the tuples in the order they are sent.

        Calls to sendOutputAsync(int, Tuple) and sendOutput(int, List) are not well ordered with respect to each other. Meaning calls from the two methods can be received downstream in any order.

        Note: The Tuples will be queued into the application some time in the future. The asynchronous nature of this call could cause subtle race conditions in applications. The performance differential (if any) between sendOutputAsync and sendOutput is highly dependent upon the application and hardware. Benchmarking is the only way to ensure the fastest possible implementation.

        The tuples (and the List<Tuple>) can be reused once sendOutputAsync returns.

        A new transaction is always started when the Tuple is read from the asynchronous queue for processing.

        Parameters:
        port - The output port the Tuple is enqueued upon (ports are zero based)
        tuples - A List<Tuple> to enqueue asynchronously
        Throws:
        StreamBaseException - if the port is invalid or any tuple in the tuples argument doesn't match the schema of the output port.
        Since:
        6.4.3 initial release, 10.0 transactional behavior
      • getInputSchema

        public final Schema getInputSchema​(int port)
        Returns the schema of an input port. At typecheck time or before, behaves like getTypecheckInputSchema(int). After typecheck time, behaves like getRuntimeInputSchema(int).
        Parameters:
        port - the port to return the schema for (ports are zero based)
        Returns:
        Schema for the port, null if not running in an appication
        Throws:
        IndexOutOfBoundsException - if port not in range
      • getRuntimeInputSchema

        public final Schema getRuntimeInputSchema​(int port)
        Returns the schema of an input port at runtime. This schema will never contain capture fields, as they will have been transformed according to the CaptureTransformStrategy set by setCaptureStrategy(CaptureTransformStrategy). This will be the actual schema of tuples passed to processTuple(int, Tuple)
        Parameters:
        port - the port to return the schema for (ports are zero based)
        Returns:
        Schema for the port, null if not running in an appication
        Throws:
        IndexOutOfBoundsException - if port not in range
        Since:
        7.2.6 initial release
      • getTypecheckInputSchema

        public final Schema getTypecheckInputSchema​(int port)
        Returns the schema of an input port that was set at application typecheck time. This schema may contain capture fields, and may not represent the actual schema of tuples passed into the processTuple(int, Tuple) method. For the actual schema of those tuples, see getRuntimeInputSchema(int).
        Parameters:
        port - the port to return the schema for (ports are zero based)
        Returns:
        Schema for the port, null if not running in an appication
        Throws:
        IndexOutOfBoundsException - if port not in range
        Since:
        7.2.6 initial release
      • getOutputSchema

        public final Schema getOutputSchema​(int outputPort)
        Return the output schema for the given output port (zero-based). If called before the operator is initialized, this method will act as getTypecheckOutputSchema(int). Otherwise, if called from init() or later, it will act as getRuntimeOutputSchema(int)
        Parameters:
        outputPort - the port to return the schema for (ports are zero based)
        Returns:
        Schema for the port, null if not running in an appication
        Throws:
        IndexOutOfBoundsException - if port not in range
      • evaluate

        public String evaluate​(String st,
                               Tuple input)
                        throws StreamBaseException
        evaluate the given string expression in the context of the running module If an error occurs during the evaluation, the message return value will be a string containing the error message
        Parameters:
        st - the expression to be evaluated
        input - the input tuple
        Returns:
        the evaluated expression as a string or an error message if evaluation failed.
        Throws:
        StreamBaseException - Could not perform evaluation
        Since:
        7.6 initial release
      • setDynamicVariable

        public void setDynamicVariable​(String name,
                                       Object value)
                                throws StreamBaseException
        Sets the value of this module's dynamic variable. Valid only at runtime.
        Parameters:
        name - name of the dynamic variable to be set
        value - the new value (may be null)
        Throws:
        StreamBaseException - if an error occurs
        Since:
        7.6 initial release
      • getNamedSchema

        public Schema getNamedSchema​(String name)
        Parameters:
        name - of the named schema
        Returns:
        a Schema object representing this named schema or null if there is no named schema in this module with name
        Since:
        7.6 initial release
      • getDynamicVariablesSchema

        public Schema getDynamicVariablesSchema()
        Retrieves a Schema containing information about every dynamic variable available to the module this operator is contained in. Never null: an empty schema is returned to indicate that there are no visible dynamic variables.
        Returns:
        the schema of the containing module's dynamic variables. Note that certain internal dynamic variables (such as those used by sequence operators) are not included.
        Since:
        7.6 initial release
      • getDynamicVariables

        public Tuple getDynamicVariables()
                                  throws TupleException
        Retrieves a read-only tuple describing the current value of all dynamic variables in the module this operator is contained in. Never null: an empty tuple is returned to indicate that there are no visible dynamic variables.
        Returns:
        a read-only tuple containing all dynamic variables in this module.
        Throws:
        TupleException - Error creating return tuple
        Since:
        7.6 initial release
      • setManagedState

        public final boolean setManagedState​(String key,
                                             Serializable value)
        Store a value in transactional memory

        This method can only be called when an operator is configured to use transactional memory for its state. There must be an active transaction when this method is called.

        Parameters:
        key - Key value. Must be cluster unique if state is replicated.
        value - Value to store
        Returns:
        true if successful, false otherwise
        Throws:
        UnsupportedOperationException - Operator state not in transactional memory or no active transaction
        Since:
        10.0 initial release
      • getManagedState

        public final Serializable getManagedState​(String key)
        Get a value from transactional memory.

        There must be an active transaction when this method is called

        Parameters:
        key - Key value
        Returns:
        Stored value or null if no value associated with key
        Throws:
        UnsupportedOperationException - Operator state not in transactional memory or no active transaction
        Since:
        10.0 initial release
      • getStorageMethod

        public final StorageMethod getStorageMethod()
        Get operator's state's storage method.

        May return null if information is not available, e.g. during typecheck.

        Returns:
        data storage used for this operator
        Since:
        10.2 initial release
      • getTransactionIsolationLevel

        public final TransactionIsolationLevel getTransactionIsolationLevel()
        Get current transaction isolation level.

        May return null if information is not available, e.g. during typecheck.

        Returns:
        The current transaction isolation level
        Since:
        10.2 initial release
      • isConcurrentByDefault

        public boolean isConcurrentByDefault()
        Indicate whether an operator should be in a concurrent region by default

        The default value is false.

        The value of this method is used at design-time to indicate whether an operator should be put into a concurrent region when added to an EventFlow.

        Returns:
        true if this operator will be in a concurrent region by default, false otherwise.
        Since:
        10.0 initial release
      • supportsTransactionalMemory

        public boolean supportsTransactionalMemory()
        Indicate whether an operator optionally supports transactional memory

        Override this method to return true to indicate that an operator can optionally use transactional memory as storage. The default value is false.

        Using setManagedState(java.lang.String, java.io.Serializable) and getManagedState(java.lang.String) are valid only if this method returns true and getStorageMethod() returns transactional memory

        Returns:
        true if this operator optionally supports transactional memory, false otherwise.
        Since:
        10.2 initial release
      • getDataDirectory

        public final Path getDataDirectory()
                                    throws IOException
        Get data directory.

        The data directory can be used as required to store data files. Files stored in this directory can only be accessed relative to the returned directory path, i.e. this directory is not on the classpath of the fragment.

        The data directory can be configured using the com.tibco.ep.streambase.configuration.sbengine configuration type.

         
        name = "data-area-configuration"
        version = "1.0.0"
        type = "com.tibco.ep.streambase.configuration.sbengine"
        configuration =
        {
                StreamBaseEngine =
                {
                        streamBase =
                        {
                                dataAreaPath = "v1-data-area"
                        }
                }
        }
         
         

        Any files created in this directory are the responsibility of the operator that created them. Any clean up must be explicitly done by the operator.

        The data directory is global to an executing fragment, so all operators running in a fragment can write files into this directory. It is recommended that sub-directories be used to isolate files.

        WARNING: It is illegal to call this method during typecheck

        Returns:
        Path to a writable data directory. The return value can never be null.
        Throws:
        IOException - Could not create the data directory
        Since:
        10.2 initial release
      • getRuntimeOutputSchema

        public final Schema getRuntimeOutputSchema​(int outputPort)
        Return the output schema that should actually use to sendOutput(int, Tuple) or sendOutputAsync(int, Tuple). Note that this schema will sometimes be different than the output schema set at typecheck time by the transformation determined by setCaptureStrategy(CaptureTransformStrategy). It will never contain capture fields. This method may not be called until after typecheck is finished; to do so will cause an IllegalStateException
        Parameters:
        outputPort - the port to return the schema for (ports are zero based)
        Returns:
        Schema for the port, null if not running in an appication
        Throws:
        IndexOutOfBoundsException - if port not in range
        Since:
        7.2.6 initial release
      • getTypecheckOutputSchema

        public final Schema getTypecheckOutputSchema​(int outputPort)
        Return the output schema as set by typecheck. Note that if this schema contains any capture fields, it may not be the same as the correct schema for tuples sent to sendOutput(int, Tuple) at runtime.
        Parameters:
        outputPort - the port to return the schema for (ports are zero based)
        Returns:
        Schema for the port, null if not running in an appication
        Throws:
        IndexOutOfBoundsException - if port not in range
        Since:
        7.2.6 initial release
      • setOutputSchema

        public final Schema setOutputSchema​(int port,
                                            Schema outputSchema)
                                     throws TypecheckException
        Sets the output schema for the given output port (port #'s are zero based). This method should be called from the typecheck method of a Java Operator or Adapter.

        Note that the actual Schema object to create tuples at runtime (for use by sendOutput(int, Tuple) or sendOutputAsync(int, Tuple)) should be retrieved using getRuntimeOutputSchema(int), as that will take into account any capture field transformations determined by getCaptureStrategy().

        Parameters:
        port - the port to set the given schema to (ports are zero based)
        outputSchema - the schema to set the given port to
        Returns:
        returns the typecheck version of the output schema
        Throws:
        TypecheckException - Typecheck error
        Since:
        7.0 returns Schema, 7.2 The returned schema should not be the Schema used to create output tuples. For the Schema to use to create output tuples, call getRuntimeOutputSchema(int) in the init() method or later.
        See Also:
        getOutputPortCount()
      • sendErrorOutput

        public void sendErrorOutput​(String message)
        Send an error tuple from this operator's error port.

        This method will also record the message in the debug log.

        Parameters:
        message - A description of the error, must not be null
        Throws:
        IllegalArgumentException - if message is null
        Since:
        5.1 initial release
      • sendErrorOutput

        public void sendErrorOutput​(Throwable t)
        Send an exception via the error output port

        This method will also record the message in the debug log.

        Parameters:
        t - the exception to report
        Throws:
        IllegalArgumentException - if exception is null
        Since:
        7.0 initial release
      • sendErrorOutput

        public void sendErrorOutput​(Throwable t,
                                    int port,
                                    Tuple errorTuple)
        Send an exception via the error output port

        This method will also record the message in the debug log.

        Parameters:
        t - the exception to report
        port - the input/output port that the error is related to
        errorTuple - a tuple that caused or is related to the error
        Throws:
        IllegalArgumentException - if exception is null
        Since:
        7.0 initial release
      • getTableAccessor

        public TableAccessor getTableAccessor​(String name)
                                       throws StreamBaseException
        Get a TableAccessor for a table in the local module by name of that table.

        Warning: This interface is provisional, and will likely change in upcoming versions of StreamBase.

        Parameters:
        name - the table name
        Returns:
        a table accessor
        Throws:
        StreamBaseException - Error accessing TableAccessor
        Since:
        7.2 initial release
      • getLogger

        protected org.slf4j.Logger getLogger()
        Retrieves a Logger suitable for logging messages and exceptions for this Operator. Equivalent to LoggerFactory.getLogger(Class) with arguments this.getClass().
        Returns:
        a Logger instance for this Operator
        Since:
        6.0 initial release
      • setReuseTuple

        protected void setReuseTuple​(boolean reuse)
        Allow/disallow the runtime to reuse tuples on operator input. The default is to disallow tuple reuse. Reusing operator input tuples will create fewer objects which may increase performance of the operator. To enable call: setReuseTuple(true) from the derived constructor.

        Note: that if you plan to store the input Tuple as state in your operator you must copy the Tuple before you store it, if you have called setReuseTuple(true).

        Parameters:
        reuse - allow/disallow tuple reuse
      • getReuseTuple

        public boolean getReuseTuple()
        Get the state of the input Tuple reuse flag.
        Returns:
        boolean if input Tuple reuse is allowed/disallowed
      • getStateChangeTimeout

        public int getStateChangeTimeout()
        Returns:
        the Operator state change timeout value, in milliseconds
        See Also:
        DEFAULT_STATE_CHANGE_TIMEOUT
      • getLocation

        public Location getLocation()
        Return the Operator-wide location, useful for error reporting not associated to a particular property. If an error is related to a property, use getLocation(String) instead
        Specified by:
        getLocation in interface LocatedItem
        Returns:
        the location for error reporting from this operator.
      • getLocation

        public Location getLocation​(String property)
        Return a new location within this Operator, associated with the given property name. Equivalent to getLocation() for StreamSQL-sourced Operators. Using this as an argument to TypecheckException constructors will cause Studio to display an error indicated alongside the UI widget corresponding to the passed in property.
        Parameters:
        property - expected to be the bean name of an Operator property
        Returns:
        the location for error reporting from this operator
        Since:
        6.5 initial release
      • setLogLevel

        public static void setLogLevel​(org.slf4j.Logger logger,
                                       String level)
                                throws StreamBaseException
        Sets the log level of the given logger. Note that this will only succeed if the default logging implementation (Logback) is used; if another logging back-end is used this call will throw an exception and the log level will remain unchanged.
        Parameters:
        logger - the Logger object on which to set the level.
        level - the level at which to set the Logger instance.
        Throws:
        StreamBaseException - the log level could not be set, most likely because the current logging implementation is not Logback.
        Since:
        6.4.4 initial release
      • setLogLevel

        public static void setLogLevel​(org.slf4j.Logger logger,
                                       Operator.LogLevel level)
                                throws StreamBaseException
        Sets the log level of the given logger. Note that this will only succeed if the default logging implementation (Logback) is used; if another logging back-end is used this call will throw an exception and the log level will remain unchanged.
        Parameters:
        logger - the Logger object on which to set the level.
        level - the Operator.LogLevel at which to set the Logger instance.
        Throws:
        StreamBaseException - the log level could not be set, most likely because the current logging implementation is not Logback.
        Since:
        6.4.6 initial release
      • getRuntimeEnvironment

        public Operator.RuntimeEnvironment getRuntimeEnvironment()
        Return the Operator.RuntimeEnvironment for this StreamBase Server. NOTE: Not valid during typecheck
        Returns:
        current RuntimeEnvironment, or null during typecheck
        Since:
        7.0 initial release
      • isRuntime

        public boolean isRuntime()
        Returns:
        true if called during runtime (after the server has started), false otherwise
        Since:
        7.6 initial release
      • getOperatorConfigurationAccessor

        public Operator.ConfigurationAccessor getOperatorConfigurationAccessor()
        Provides access to operator and adapter configuration information from the server's configuration file. This call is intended to be made during typecheck or later.
        Returns:
        a Operator.ConfigurationAccessor to provide access to the aforementioned
        Since:
        7.5 initial release
      • getDataSourceConnection

        public Connection getDataSourceConnection​(String datasource)
                                           throws StreamBaseException
        Retrieve a connection to a JDBC Data Source configured in HOCON configuration files. The given datasource is the identifier of a jdbcDataSources entry in the configuration's JDBCDataSourceGroup.

        This method may return an existing connection, or create a new connection if necessary.

        This method always returns null during typecheck.

        Returns:
        a JDBC connection, or null on failure or during typecheck
        Throws:
        StreamBaseException - if HOCON config does not contain the specified datasource, or if there's an error trying to create a connection.
        Since:
        10.6.3
      • getSchemaForCapture

        public Schema getSchemaForCapture​(String captureName,
                                          int depth)
        Finds the schema for the given capture name in the context that this operator is running under. For a capture field that was not nested inside any other capture field, use a depth value of 0. To expand the capture fields in a schema returned by this function with a depth of d, call this function with a depth of d+1. For example, if an output stream of this module has schema (f int, c @A), then you might call getSchemaForCapture("A", 0) to get (g int, c @B) as the schema bound to A. Then you might call getSchemaForCapture("B", 1) to get (h int) as the schema bound to B.

        Note that the "nesting depth" here has nothing to do with capture fields found in lists or tuples -- the only containment that affects the nesting depth is a capture within another capture.

        Parameters:
        captureName - the name of the bound capture type
        depth - the depth of nesting at which to look up the schema of this capture binding.
        Returns:
        the schema associated with the bound capture type.
        Since:
        7.2 initial release
      • getTupleCaptureTransformer

        public TupleCaptureTransformer getTupleCaptureTransformer​(Schema s)
                                                           throws StreamBaseException
        Get a TupleCaptureTransformer capable of translating tuples with the given schema to the equivalent schema with all the capture fields expanded out, and translating expanded tuples back into tuples with the given schema This method may only be called at runtime; the exact schemas of any capture fields are not fully determined at typecheck time.
        Parameters:
        s - the schema with capture fields
        Returns:
        a TupleCaptureTransformer able to do the given transformations
        Throws:
        StreamBaseException - if this method is called at the wrong time, or if the transformations requested are not possible. This may occur, for example, if there is a name conflict between a captured field and a field already in the schema.
        Since:
        7.2 initial release
      • isPassthruOperator

        public boolean isPassthruOperator()
        Allows an operator to declare itself to be a "pass-thru" operator -- one that calls its sendOutput method with the unmodified tuples received by its processTuple method. Certain runtime optimizations are possible for pass-thru operators. In addition, the "older API" messages is suppressed for such operators. The StreamBase log adapter is an example of a pass-thru operator, as it logs, but leaves unmodified, the tuples that pass through it.
        Returns:
        true if the operator is pass-thru and false otherwise
        Since:
        7.2.12 initial release
      • getSearchKeywords

        @Deprecated
        public String[] getSearchKeywords()
        Deprecated.
        As of StreamBase version 10.0, keywords are configured through the operator manifest settings.

        Override to declare strings that assist the user in filtering for this operator when in Studio. Without search keywords, operators are found only by their short display name.

        NOTE: Since StreamBase 10.0, this method is not used, and has been replaced by Manifest properties. For search key-words, set the "KeyWords" manifest attribute. Please see the documentation for details.

        Returns:
        the default implementation returns no search keywords
        Since:
        7.3.10 initial release
      • allowsConcurrency

        public boolean allowsConcurrency()
        Override to indicate whether concurrency may be configured in Studio against this operator. Note that this applies to all instances, and is not intended to be interpreted on a per-instance basis. The default value when not overriden is true.

        This has no effect on the actual runtime environment for operator instances.

        Returns:
        whether this operator allows Studio to set any concurrency options
        Since:
        7.4.0 initial release
      • getProposedInputSchemas

        public Schema[] getProposedInputSchemas​(String mainName)
        Override to provide schemas that Studio will offer to users to import into their projects. Neither property setting nor typecheck has occurred when Studio invokes this method.
        Parameters:
        mainName - the name the user has requested be the prefix for the top level schema, and this name should be used throughout any sub-field schemas
        Returns:
        an array of Schemas to offer for input ports
        Since:
        7.4.0 initial release
      • getProposedOutputSchemas

        public Schema[] getProposedOutputSchemas​(String mainName)
        Override to provide schemas that Studio will offer to users to import into their projects. Neither property setting nor typecheck has occurred when Studio invokes this method.
        Parameters:
        mainName - the name the user has requested be the prefix for the top level schema, and this name should be used throughout any sub-field schemas
        Returns:
        an array of Schemas to offer for output ports
        Since:
        7.4.0 initial release
      • supportsArtifacts

        public boolean supportsArtifacts()
        Indicates whether or not the operator supports the use of artifacts The default implementation of this methods returns false. Override this method to return true if the operator uses artifacts.
        Returns:
        true if the operator supports the use of artifacts, false otherwise
      • getArtifactManager

        public OperatorArtifactManager getArtifactManager()
        Returns an ArtifactManager instance for managing the operator's artifacts.
        Returns:
        an ArtifactManager instance
      • getLock

        public Lock getLock()
        Return the guard for module state.
        Returns:
        the Lock instance