Package com.streambase.sb.client
Class DeMUXStreamBaseClient
java.lang.Object
    com.streambase.sb.client.StreamBaseClient
        com.streambase.sb.client.DeMUXStreamBaseClient

All Implemented Interfaces:
AutoCloseable
public class DeMUXStreamBaseClient extends StreamBaseClient
A StreamBaseClient intended for use by several independent queries. Rather than polling for data, you register a DequeueListener when you subscribe, and that listener is handed only the results for your own subscribe call.
-
-
Nested Class Summary
Nested Classes (Modifier and Type, Class, Description)
static interface DeMUXStreamBaseClient.DequeueListener
    Implement this interface to get calls back with data when using a DeMUXStreamBaseClient
Nested classes/interfaces inherited from class com.streambase.sb.client.StreamBaseClient
StreamBaseClient.ListEntityFlags, StreamBaseClient.SerializedTupleBuffer, StreamBaseClient.TupleBuffer
-
-
Field Summary
Fields (Modifier and Type, Field, Description)
protected com.streambase.sb.client.Admin _admin
protected static long DEQUEUE_WAIT
static int VARIABLE_RESPONSE
    Used to signal that a command will return a variable number of responses
-
Constructor Summary
Constructors (Constructor, Description)
DeMUXStreamBaseClient(StreamBaseURI streamBaseURI)
DeMUXStreamBaseClient(String uri)
DeMUXStreamBaseClient(List<StreamBaseURI> uris, ClientSettings settings)
-
Method Summary
Methods (Modifier and Type, Method, Description)
boolean canDequeue()
    Return true if we can call dequeue without blocking.
DequeueResult dequeue()
    Dequeue a batch of tuples from a subscribed stream.
DequeueResult dequeue(long timeoutMS)
    Dequeue a batch of tuples from a subscribed stream.
int getDequeueBufferSize()
protected static String getFullLogicalName(StreamProperties streamProperties)
ClientSettings getSettings()
    Return the settings for this client
String getUniqueSuffix()
    This manages a threadsafe counter which will give a different value each time it is called.
StreamBaseURI getURI()
    Return the URI used by this Client.
List<StreamBaseURI> getURIs()
    get all of the URI's for this client
protected boolean haModeOn()
    is HA mode on
boolean isSubscribed(StreamProperties stream)
    Return status if we are subscribed to the given stream
String[] operatorStatus(String containerName)
    Return the status of all the operators in the specified container
StreamProperties resubscribe(StreamProperties props, String logicalstream, String predicate)
    Resubscribe to a stream with a predicate.
StreamProperties resubscribe(String streamname, String logicalstream, String predicate)
    Resubscribe to a stream with a predicate.
DequeueResultsQueue setDequeueQueue(DequeueResultsQueue dequeuer)
DequeueResult.Interceptor setDequeueResultInterceptor(DequeueResult.Interceptor dri)
    Set the dequeue results interceptor for this client connection.
String[] status()
    Return the status of the StreamBase Server
String[] status(boolean verbose)
    Return the status of the StreamBase Server
StreamProperties subscribe(StreamProperties props)
    Subscribe to a stream
StreamProperties subscribe(StreamProperties props, String logicalstream, String predicate)
    Subscribe to a stream with a predicate.
StreamProperties subscribe(String streamname)
    Subscribe to a stream
StreamProperties subscribe(String streamName, CaptureTransformStrategy strategy)
    Subscribe to a stream
StreamProperties subscribe(String streamName, CaptureTransformStrategy strategy, String logicalstream, String predicate)
    Subscribe to a stream with a predicate.
StreamProperties subscribe(String streamname, String logicalstream, String predicate)
    Subscribe to a stream with a predicate.
StreamProperties subscribe(String streamname, String logicalstream, String predicate, DeMUXStreamBaseClient.DequeueListener listener)
    This is the only subscribe operation that is supported by the DeMUXStreamBaseClient
void unsubscribe(StreamProperties logicalProps)
    Unsubscribe from the given stream name.
void unsubscribe(String logicalStreamName)
    Unsubscribe from the given stream name.
Methods inherited from class com.streambase.sb.client.StreamBaseClient
addConnectionStatusCallback, assureNameStartsWithContainer, checkValidStreamname, close, close, describe, enableBuffering, enableBuffering, enqueue, enqueue, enqueue, enqueue, flushAllBuffers, flushBuffer, flushBuffer, getAllStreamProperties, getAllStreamProperties, getConnectionError, getConnectionID, getConnectionStatus, getDequeueResultsInterceptor, getDynamicVariable, getDynamicVariables, getEnqueueBufferSize, getSchemaByHash, getSchemaByName, getSchemaForStream, getStreamProperties, getStreamProperties, getStreamPropertiesByHash, getStreamPropertiesByHash, getSubscribedStreamNames, getTupleDequeueCount, getTupleEnqueueCount, getVersion, hasSchema, hasStream, hasStreamProperties, isClosed, listEntities, listEntities, listEntities, listEntities, listEntities, listEntities, readTable, readTable, removeConnectionStatusCallback, setConnectionStatus, setDynamicVariable, setQuiescentLimit, typecheck, typecheck, unsubscribeInternal
-
-
-
-
Field Detail
-
DEQUEUE_WAIT
protected static final long DEQUEUE_WAIT
- See Also:
- Constant Field Values
-
VARIABLE_RESPONSE
public static final int VARIABLE_RESPONSE
Used to signal that a command will return a variable number of responses
See Also:
Constant Field Values
-
_admin
protected final com.streambase.sb.client.Admin _admin
-
-
Constructor Detail
-
DeMUXStreamBaseClient
public DeMUXStreamBaseClient(List<StreamBaseURI> uris, ClientSettings settings) throws URIException, StreamBaseException
Throws:
StreamBaseException
URIException
-
DeMUXStreamBaseClient
public DeMUXStreamBaseClient(String uri) throws URIException, StreamBaseException
Throws:
URIException
StreamBaseException
-
DeMUXStreamBaseClient
public DeMUXStreamBaseClient(StreamBaseURI streamBaseURI) throws URIException, StreamBaseException
Throws:
URIException
StreamBaseException
-
-
Method Detail
-
subscribe
public StreamProperties subscribe(String streamname, String logicalstream, String predicate, DeMUXStreamBaseClient.DequeueListener listener) throws StreamBaseException
This is the only subscribe operation that is supported by the DeMUXStreamBaseClient.
Parameters:
streamname - Qualified or partial stream name. If partial, it is relative to the connection base (usually default).
logicalstream - Logical name for this stream. If you specify a predicate, this is required and must be different from streamname. Also, only one listener per logical stream name is allowed for a given DeMUXStreamBaseClient.
predicate - Your query
listener - Implement this interface to be called when your data arrives (and other events)
Returns:
StreamProperties for the stream being listened to.
Throws:
StreamBaseException
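The naming rules in the parameter notes above (a predicate requires a distinct logical name, and only one listener may exist per logical stream name) can be illustrated with a small stand-alone model. This is a minimal sketch, not the StreamBase implementation: ListenerRegistrySketch, register and dispatch are hypothetical names, results are modelled as plain strings, java.util.function.Consumer stands in for DequeueListener, and falling back to the stream name when no logical name is given is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical model of a demultiplexing client; it does not use the StreamBase API.
class ListenerRegistrySketch {
    // At most one listener per logical stream name.
    private final Map<String, Consumer<String>> listeners = new ConcurrentHashMap<>();

    /** Registers a listener after checking the documented naming rules. */
    void register(String streamName, String logicalStream, String predicate,
                  Consumer<String> listener) {
        boolean hasPredicate = predicate != null && !predicate.isEmpty();
        boolean hasLogical = logicalStream != null && !logicalStream.isEmpty();
        if (hasPredicate && (!hasLogical || logicalStream.equals(streamName))) {
            throw new IllegalArgumentException(
                "a predicate requires a logical name different from the stream name");
        }
        // Assumption: without a logical name, the stream name is used as the key.
        String key = hasLogical ? logicalStream : streamName;
        // putIfAbsent returns the existing listener when the key is already taken.
        if (listeners.putIfAbsent(key, listener) != null) {
            throw new IllegalStateException("listener already registered for " + key);
        }
    }

    /** Hands a result only to the listener registered under that logical name. */
    boolean dispatch(String logicalStream, String result) {
        Consumer<String> listener = listeners.get(logicalStream);
        if (listener == null) {
            return false; // nobody subscribed under this name
        }
        listener.accept(result);
        return true;
    }

    public static void main(String[] args) {
        ListenerRegistrySketch registry = new ListenerRegistrySketch();
        List<String> bigTrades = new ArrayList<>();
        // Stream name, logical name and predicate are made-up sample values.
        registry.register("default.Trades", "BigTrades", "price > 100", bigTrades::add);
        registry.dispatch("BigTrades", "tuple-1");
        System.out.println(bigTrades);
    }
}
```

Keying the map by logical name is what lets several independent queries share one connection while each listener sees only its own results.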
-
getFullLogicalName
protected static String getFullLogicalName(StreamProperties streamProperties)
-
getUniqueSuffix
public String getUniqueSuffix()
This manages a thread-safe counter that gives a different value each time it is called.
Returns:
a string consisting of an underscore and a number. Each call returns a different string.
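A thread-safe counter of the kind described above can be sketched with java.util.concurrent.atomic.AtomicLong. This is only an illustration of the documented contract (underscore plus a number, different on every call). The class name UniqueSuffixSketch, the method name nextSuffix, the starting value, and the per-instance scope of the counter are all assumptions.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of a unique-suffix generator; not the StreamBase implementation.
class UniqueSuffixSketch {
    // AtomicLong makes the increment safe when several threads call nextSuffix().
    private final AtomicLong counter = new AtomicLong();

    /** Returns an underscore followed by a number that is different on every call. */
    String nextSuffix() {
        return "_" + counter.incrementAndGet();
    }

    public static void main(String[] args) {
        UniqueSuffixSketch suffixes = new UniqueSuffixSketch();
        // Appending the suffix gives each subscription its own logical stream name.
        System.out.println("BigTrades" + suffixes.nextSuffix());
        System.out.println("BigTrades" + suffixes.nextSuffix());
    }
}
```

A suffix like this is one way to satisfy the rule that each listener needs its own logical stream name when several queries target the same stream.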
-
canDequeue
public boolean canDequeue()
Description copied from class: StreamBaseClient
Return true if we can call dequeue without blocking. This means that there is something to dequeue from the server. The dequeued item could be tuples, a null (server shutdown), or an exception.
Overrides:
canDequeue in class StreamBaseClient
Returns:
true if we can dequeue without blocking
-
dequeue
public DequeueResult dequeue() throws StreamBaseException
Description copied from class: StreamBaseClient
Dequeue a batch of tuples from a subscribed stream. This method will block. Will return null if the connection is closed.
Overrides:
dequeue in class StreamBaseClient
Returns:
a DequeueResult (or null) from any subscribed stream (only one stream per DequeueResult)
Throws:
StreamBaseException - thrown on network or other errors
-
dequeue
public DequeueResult dequeue(long timeoutMS) throws StreamBaseException
Description copied from class: StreamBaseClient
Dequeue a batch of tuples from a subscribed stream. This method will block until there are tuples available or timeoutMS milliseconds have passed. It will return null if the connection is closed. It will return an empty DequeueResult if timeoutMS milliseconds have passed and no tuples have been returned. A timeoutMS of zero will block indefinitely, or until a tuple arrives.
Overrides:
dequeue in class StreamBaseClient
Parameters:
timeoutMS - timeout in milliseconds
Returns:
a DequeueResult from any subscribed stream (only one stream per DequeueResult)
Throws:
StreamBaseException - thrown on network or other errors
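The timeout contract described for dequeue(long timeoutMS) can be modelled with a java.util.concurrent.BlockingQueue: zero means wait indefinitely, and an expired timeout yields an empty result rather than null. This is a rough sketch under stated assumptions, not the library's code. TimedDequeueSketch and offerBatch are hypothetical names, a List<String> stands in for DequeueResult, and the "null when the connection is closed" case is not modelled.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the documented timeout behaviour; it does not use the StreamBase API.
class TimedDequeueSketch {
    private final BlockingQueue<List<String>> batches = new LinkedBlockingQueue<>();

    /** Simulates a batch of tuples arriving from the server. */
    void offerBatch(List<String> batch) {
        batches.add(batch);
    }

    /** Returns the next batch, or an empty list if the timeout expires first. */
    List<String> dequeue(long timeoutMs) {
        try {
            if (timeoutMs == 0) {
                return batches.take(); // zero: block until a batch arrives
            }
            List<String> batch = batches.poll(timeoutMs, TimeUnit.MILLISECONDS);
            return batch != null ? batch : Collections.emptyList();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return Collections.emptyList();
        }
    }

    public static void main(String[] args) {
        TimedDequeueSketch client = new TimedDequeueSketch();
        System.out.println(client.dequeue(50).isEmpty()); // nothing queued yet
        client.offerBatch(List.of("tuple-1", "tuple-2"));
        System.out.println(client.dequeue(0));
    }
}
```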
-
setDequeueQueue
public DequeueResultsQueue setDequeueQueue(DequeueResultsQueue dequeuer)
-
setDequeueResultInterceptor
public DequeueResult.Interceptor setDequeueResultInterceptor(DequeueResult.Interceptor dri)
Description copied from class: StreamBaseClient
Set the dequeue results interceptor for this client connection. This results interceptor replaces any existing results processor. To disable pre-processing of results, set the processor to null. This method cannot be safely called while another thread is calling dequeue().
Overrides:
setDequeueResultInterceptor in class StreamBaseClient
Parameters:
dri - dequeue results interceptor for this client connection
Returns:
the old dequeue results interceptor
-
subscribe
public StreamProperties subscribe(String streamname) throws StreamBaseException
Description copied from class: StreamBaseClient
Subscribe to a stream
Overrides:
subscribe in class StreamBaseClient
Parameters:
streamname - the stream to subscribe to
Returns:
the stream properties
Throws:
StreamBaseException - thrown on error
-
subscribe
public StreamProperties subscribe(String streamName, CaptureTransformStrategy strategy) throws StreamBaseException
Description copied from class: StreamBaseClient
Subscribe to a stream
Overrides:
subscribe in class StreamBaseClient
Parameters:
streamName - the name of a stream, that may be fully qualified (for example, mycontainer.MyInputSchema). When unqualified, the container from the URI the connection was established with is used, otherwise StreamBaseURI.DEFAULT_CONTAINER is used.
strategy - the capture transform strategy
Returns:
the stream properties
Throws:
StreamBaseException - thrown on error
-
subscribe
public StreamProperties subscribe(StreamProperties props) throws StreamBaseException
Description copied from class: StreamBaseClient
Subscribe to a stream
Overrides:
subscribe in class StreamBaseClient
Parameters:
props - the stream to subscribe to
Returns:
the stream properties
Throws:
StreamBaseException - thrown on error
-
subscribe
public StreamProperties subscribe(String streamname, String logicalstream, String predicate) throws StreamBaseException
Description copied from class: StreamBaseClient
Subscribe to a stream with a predicate. The stream name of dequeued tuples is logicalstream. When unsubscribing, use logicalstream.
Overrides:
subscribe in class StreamBaseClient
Parameters:
streamname - the stream to subscribe to
logicalstream - the name of the logical stream to associate with this predicate (if empty or null, defaults to streamname)
predicate - a predicate to apply to subset the stream
Returns:
the stream properties
Throws:
StreamBaseException - thrown on error, including empty or null streamname or predicate
-
subscribe
public StreamProperties subscribe(String streamName, CaptureTransformStrategy strategy, String logicalstream, String predicate) throws StreamBaseException
Description copied from class: StreamBaseClient
Subscribe to a stream with a predicate. The stream name of dequeued tuples is logicalstream. When unsubscribing, use logicalstream.
Overrides:
subscribe in class StreamBaseClient
Parameters:
streamName - the name of a stream, that may be fully qualified (for example, mycontainer.MyInputSchema). When unqualified, the container from the URI the connection was established with is used, otherwise StreamBaseURI.DEFAULT_CONTAINER is used.
strategy - the CaptureTransformStrategy to use if there are capture fields in the stream
logicalstream - the name of the logical stream to associate with this predicate (if empty or null, defaults to streamname)
predicate - a predicate to apply to subset the stream
Returns:
Stream properties
Throws:
StreamBaseException - thrown on error, including empty or null streamname or predicate
-
subscribe
public StreamProperties subscribe(StreamProperties props, String logicalstream, String predicate) throws StreamBaseException
Description copied from class: StreamBaseClient
Subscribe to a stream with a predicate. The stream name of dequeued tuples is logicalstream. When unsubscribing, use logicalstream.
Overrides:
subscribe in class StreamBaseClient
Parameters:
props - the stream to subscribe to
logicalstream - the name of the logical stream to associate with this predicate (if empty or null, defaults to streamname)
predicate - a predicate to apply to subset the stream
Returns:
stream properties
Throws:
StreamBaseException - thrown on error, including empty or null streamname or predicate
-
resubscribe
public StreamProperties resubscribe(String streamname, String logicalstream, String predicate) throws StreamBaseException
Description copied from class: StreamBaseClient
Resubscribe to a stream with a predicate. Allows the client to change the filtered subscribe predicate atomically so that currently buffered data is not lost (as would happen if you unsubscribed, then subscribed again with a different predicate).
Overrides:
resubscribe in class StreamBaseClient
Parameters:
streamname - the stream to subscribe to
logicalstream - the name of the logical stream to associate with this predicate
predicate - a predicate to apply to subset the stream
Returns:
stream properties
Throws:
StreamBaseException - thrown on error
-
resubscribe
public StreamProperties resubscribe(StreamProperties props, String logicalstream, String predicate) throws StreamBaseException
Description copied from class: StreamBaseClient
Resubscribe to a stream with a predicate. Allows the client to change the filtered subscribe predicate atomically so that currently buffered data is not lost (as would happen if you unsubscribed, then subscribed again with a different predicate).
Overrides:
resubscribe in class StreamBaseClient
Parameters:
props - the stream to subscribe to
logicalstream - the name of the logical stream to associate with this predicate
predicate - a predicate to apply to subset the stream
Returns:
stream properties
Throws:
StreamBaseException - thrown on error
-
unsubscribe
public void unsubscribe(String logicalStreamName) throws StreamBaseException
Description copied from class: StreamBaseClient
Unsubscribe from the given stream name. Note: Any tuples that are in-flight during an unsubscribe request will be dequeued until the stream is fully drained.
Overrides:
unsubscribe in class StreamBaseClient
Parameters:
logicalStreamName - the name of the stream to unsubscribe from, which is the logical stream name for a filtered subscribe
Throws:
StreamBaseException - thrown on network or other errors
-
unsubscribe
public void unsubscribe(StreamProperties logicalProps) throws StreamBaseException
Description copied from class: StreamBaseClient
Unsubscribe from the given stream name. Note: Any tuples that are in-flight during an unsubscribe request will be dequeued until the stream is fully drained.
Overrides:
unsubscribe in class StreamBaseClient
Parameters:
logicalProps - the StreamProperties for this logical subscription
Throws:
StreamBaseException - thrown on network or other errors
-
isSubscribed
public boolean isSubscribed(StreamProperties stream)
Description copied from class: StreamBaseClient
Return whether we are subscribed to the given stream
Overrides:
isSubscribed in class StreamBaseClient
Parameters:
stream - the stream we want to check
Returns:
true if we are subscribed to the given stream
-
getDequeueBufferSize
public int getDequeueBufferSize()
-
haModeOn
protected boolean haModeOn()
Reports whether HA mode is on.
Returns:
true if HA mode is on
-
getSettings
public ClientSettings getSettings()
Return the settings for this client
Returns:
settings for this client
-
status
public String[] status() throws StreamBaseException
Return the status of the StreamBase Server
Returns:
the status
Throws:
StreamBaseException - thrown on network and other errors
-
status
public String[] status(boolean verbose) throws StreamBaseException
Return the status of the StreamBase Server
Parameters:
verbose - return a verbose status
Returns:
the status
Throws:
StreamBaseException - thrown on network and other errors
-
operatorStatus
public String[] operatorStatus(String containerName) throws StreamBaseException
Return the status of all the operators in the specified container
Parameters:
containerName - the name of the container
Returns:
an array of strings of the form "operatorname=status"
Throws:
StreamBaseException - error getting status
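Because the returned strings have the form "operatorname=status", callers will often want them as a map. The helper below is a minimal sketch of that conversion and is not part of the StreamBase API. OperatorStatusParser and parse are hypothetical names, and the sample operator names and status values in main are invented for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for strings of the form "operatorname=status".
class OperatorStatusParser {
    /** Splits each entry at its first '=' and keeps the entries in their original order. */
    static Map<String, String> parse(String[] entries) {
        Map<String, String> statusByOperator = new LinkedHashMap<>();
        for (String entry : entries) {
            int eq = entry.indexOf('=');
            if (eq < 0) {
                continue; // skip anything that does not match the documented form
            }
            statusByOperator.put(entry.substring(0, eq), entry.substring(eq + 1));
        }
        return statusByOperator;
    }

    public static void main(String[] args) {
        // Made-up sample data; real values would come from operatorStatus(containerName).
        String[] sample = {"Filter1=STARTED", "Map1=STOPPED"};
        System.out.println(parse(sample));
    }
}
```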
-
getURI
public StreamBaseURI getURI()
Return the URI used by this client. In HA mode this returns only the first URI for the client.
Returns:
the URI for this client
-
getURIs
public List<StreamBaseURI> getURIs()
Get all of the URIs for this client
Returns:
all of the URIs for this client
-
-