Package com.streambase.sb.client
Class DeMUXStreamBaseClient
java.lang.Object
com.streambase.sb.client.BaseClient
com.streambase.sb.client.StreamBaseClient
com.streambase.sb.client.DeMUXStreamBaseClient
- All Implemented Interfaces:
AutoCloseable
This is a StreamBaseClient intended for use by several independent queries.
Rather than polling for data, you register a DequeueListener when you subscribe; the listener
is handed only the results for your own subscribe call.
-
Nested Class Summary
Modifier and Type / Class / Description
static interface DeMUXStreamBaseClient.DequeueListener - Implement this interface to get calls back with data when using a DeMUXStreamBaseClient
Nested classes/interfaces inherited from class com.streambase.sb.client.StreamBaseClient
StreamBaseClient.ListEntityFlags, StreamBaseClient.SerializedTupleBuffer, StreamBaseClient.TupleBuffer
-
Field Summary
Modifier and Type / Field / Description
protected static final long DEQUEUE_WAIT - Dequeue wait time in milliseconds
(package private) Map<String,DeMUXStreamBaseClient.DequeueListener> listenerMap - Listener map
-
Constructor Summary
Constructor / Description
DeMUXStreamBaseClient(StreamBaseURI streamBaseURI) - Constructor
DeMUXStreamBaseClient(uri) - Constructor
DeMUXStreamBaseClient(List<StreamBaseURI> uris, ClientSettings settings) - Constructor
-
Method Summary
Modifier and Type / Method / Description
boolean canDequeue() - Return true if we can call dequeue without blocking.
dequeue() - Dequeue a batch of tuples from a subscribed stream.
dequeue(long timeoutMS) - Dequeue a batch of tuples from a subscribed stream.
int getDequeueBufferSize()
protected static String getFullLogicalName(StreamProperties streamProperties) - Get full logical name
getUniqueSuffix() - This manages a threadsafe counter which will give a different value each time it is called.
boolean isSubscribed(StreamProperties stream) - Return whether we are subscribed to the given stream
StreamProperties resubscribe(StreamProperties props, String logicalstream, String predicate) - Resubscribe to a stream with a predicate.
StreamProperties resubscribe(String streamname, String logicalstream, String predicate) - Resubscribe to a stream with a predicate.
com.streambase.sb.client.DequeueResultsQueue setDequeueQueue(com.streambase.sb.client.DequeueResultsQueue dequeuer)
setDequeueResultInterceptor(dri) - Set the dequeue results interceptor for this client connection.
subscribe(StreamProperties props) - Subscribe to a stream
StreamProperties subscribe(StreamProperties props, String logicalstream, String predicate) - Subscribe to a stream with a predicate.
subscribe(streamname) - Subscribe to a stream
StreamProperties subscribe(String streamName, CaptureTransformStrategy strategy) - Subscribe to a stream
StreamProperties subscribe(String streamName, CaptureTransformStrategy strategy, String logicalstream, String predicate) - Subscribe to a stream with a predicate.
StreamProperties subscribe(String streamname, String logicalstream, String predicate) - Subscribe to a stream with a predicate.
StreamProperties subscribe(String streamName, String logicalStream, String predicate, DeMUXStreamBaseClient.DequeueListener listener) - This is the only subscribe operation that is supported by the DeMUXStreamBaseClient
void unsubscribe(StreamProperties logicalProps) - Unsubscribe from the given stream name.
void unsubscribe(String logicalStreamName) - Unsubscribe from the given stream name.
Methods inherited from class com.streambase.sb.client.StreamBaseClient
addConnectionStatusCallback, assureNameStartsWithContainer, checkValidStreamname, close, close, connectionIDObject, describe, enableBuffering, enableBuffering, enqueue, enqueue, enqueue, enqueue, flushAllBuffers, flushBuffer, flushBuffer, getAllStreamProperties, getAllStreamProperties, getConnectionError, getConnectionID, getConnectionStatus, getDequeuer, getDequeueResultsInterceptor, getDynamicVariable, getDynamicVariables, getEnqueueBufferSize, getSchemaByHash, getSchemaByName, getSchemaForStream, getStreamProperties, getStreamProperties, getStreamPropertiesByHash, getStreamPropertiesByHash, getSubscribedStreamNames, getSubscribedStreams, getTupleDequeueCount, getTupleEnqueueCount, getVersion, hasSchema, hasStream, hasStreamProperties, incrementTuplesDequeued, incrementTuplesEnqueued, isClosed, listEntities, listEntities, listEntities, listEntities, listEntities, listEntities, readTable, readTable, removeConnectionStatusCallback, setConnectionID, setConnectionStatus, setDynamicVariable, setQuiescentLimit, typecheck, typecheck, unsubscribeInternal
Methods inherited from class com.streambase.sb.client.BaseClient
getSettings, getURI, getURIs, haModeOn, operatorStatus, status, status
-
Field Details
-
listenerMap
Map<String,DeMUXStreamBaseClient.DequeueListener> listenerMap
Listener map
-
DEQUEUE_WAIT
protected static final long DEQUEUE_WAIT
Dequeue wait time in milliseconds
-
-
Constructor Details
-
DeMUXStreamBaseClient
public DeMUXStreamBaseClient(List<StreamBaseURI> uris, ClientSettings settings) throws URIException, StreamBaseException
Constructor
- Parameters:
uris - URIs
settings - client settings
- Throws:
URIException - Invalid URI
StreamBaseException - listen error
-
DeMUXStreamBaseClient
Constructor
- Parameters:
uri - URI
- Throws:
URIException - Invalid URI
StreamBaseException - listen error
-
DeMUXStreamBaseClient
Constructor
- Parameters:
streamBaseURI - URI
- Throws:
URIException - Invalid URI
StreamBaseException - listen error
-
-
Method Details
-
subscribe
public StreamProperties subscribe(String streamName, String logicalStream, String predicate, DeMUXStreamBaseClient.DequeueListener listener) throws StreamBaseException
This is the only subscribe operation that is supported by the DeMUXStreamBaseClient.
- Parameters:
streamName - Qualified or partial stream name. If partial, it is resolved relative to the connection base (usually default).
logicalStream - Logical name for this stream. If you specify a predicate, this is required and must be different from streamName. Also, only one listener per logicalStream name is allowed for a given DeMUXStreamBaseClient.
predicate - Your query
listener - Implement this interface to be called when your data arrives (and other events)
- Returns:
- StreamProperties for the stream being listened to.
- Throws:
StreamBaseException - subscription error
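The two rules in the logicalStream note (a predicate needs its own logical name, and each logical name takes a single listener) can be sketched with a small stand-in model. This is not the StreamBase implementation: the class DemuxSketch, its Listener interface, and the use of strings as tuples are all hypothetical, chosen only so the example runs on a plain JDK.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Stand-in model of per-logical-stream listener routing (hypothetical, JDK only). */
class DemuxSketch {
    /** Hypothetical callback; the real DequeueListener interface is not reproduced here. */
    interface Listener {
        void onBatch(String logicalStream, List<String> tuples);
    }

    private final Map<String, Listener> listeners = new ConcurrentHashMap<>();

    /** Registers a listener while enforcing the two rules from the parameter notes. */
    void subscribe(String streamName, String logicalStream, String predicate, Listener listener) {
        boolean hasLogical = logicalStream != null && !logicalStream.isEmpty();
        boolean hasPredicate = predicate != null && !predicate.isEmpty();
        if (hasPredicate && (!hasLogical || logicalStream.equals(streamName))) {
            throw new IllegalArgumentException(
                "a predicate requires a logical name different from the stream name");
        }
        // Assumption: without a logical name, the stream name is used as the key.
        String key = hasLogical ? logicalStream : streamName;
        if (listeners.putIfAbsent(key, listener) != null) {
            throw new IllegalStateException("only one listener per logical stream: " + key);
        }
    }

    /** Hands a batch to the listener registered for its logical stream; false if none. */
    boolean dispatch(String logicalStream, List<String> tuples) {
        Listener listener = listeners.get(logicalStream);
        if (listener == null) {
            return false;
        }
        listener.onBatch(logicalStream, tuples);
        return true;
    }

    public static void main(String[] args) {
        DemuxSketch demux = new DemuxSketch();
        List<String> big = new ArrayList<>();
        List<String> small = new ArrayList<>();
        // Two independent queries over the same stream, each with its own listener.
        demux.subscribe("Trades", "bigTrades", "qty > 1000", (s, t) -> big.addAll(t));
        demux.subscribe("Trades", "smallTrades", "qty <= 1000", (s, t) -> small.addAll(t));
        demux.dispatch("bigTrades", Arrays.asList("IBM,5000"));
        demux.dispatch("smallTrades", Arrays.asList("IBM,10"));
        System.out.println("big=" + big + " small=" + small);
    }
}
```

Each listener sees only the batches dispatched under its own logical name, which is the behavior the class description promises for independent queries.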
-
getFullLogicalName
protected static String getFullLogicalName(StreamProperties streamProperties)
Get full logical name
- Parameters:
streamProperties - stream properties
- Returns:
- logical name
-
getUniqueSuffix
This manages a threadsafe counter which will give a different value each time it is called.
- Returns:
- a string consisting of an underscore and a number. Each call returns a different value.
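A threadsafe counter that yields an underscore followed by a number can be sketched as below. This is one common way to get that behavior with an AtomicLong, not necessarily how getUniqueSuffix is implemented; the class and method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of a thread-safe unique-suffix generator. */
class UniqueSuffixSketch {
    // A single shared counter; incrementAndGet is atomic across threads.
    private static final AtomicLong COUNTER = new AtomicLong();

    /** Returns an underscore followed by a number that differs on every call. */
    static String nextSuffix() {
        return "_" + COUNTER.incrementAndGet();
    }

    public static void main(String[] args) {
        // Appending the suffix to a base name gives distinct logical stream names.
        System.out.println("Trades" + nextSuffix());
        System.out.println("Trades" + nextSuffix());
    }
}
```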
-
canDequeue
public boolean canDequeue()
Description copied from class: StreamBaseClient
Return true if we can call dequeue without blocking. This means that there is something to dequeue from the server. The dequeued item could be Tuples, a null (server shutdown), or an exception.
- Overrides:
canDequeue in class StreamBaseClient
- Returns:
- true if we can dequeue without blocking
-
dequeue
Description copied from class: StreamBaseClient
Dequeue a batch of tuples from a subscribed stream. This method will block. Will return null if the connection is closed.
- Overrides:
dequeue in class StreamBaseClient
- Returns:
- a DequeueResult (or null) from any subscribed stream (only one stream per DequeueResult)
- Throws:
StreamBaseException - thrown on network or other errors
-
dequeue
Description copied from class: StreamBaseClient
Dequeue a batch of tuples from a subscribed stream. This method will block until there are tuples available or timeoutMS milliseconds have passed. It will return null if the connection is closed. It will return an empty DequeueResult if timeoutMS milliseconds have passed and no tuples have arrived. A timeoutMS of zero will block indefinitely, or until a tuple arrives.
- Overrides:
dequeue in class StreamBaseClient
- Parameters:
timeoutMS - timeout in milliseconds
- Returns:
- a DequeueResult from any subscribed stream (only one stream per DequeueResult)
- Throws:
StreamBaseException - thrown on network or other errors
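The timeout contract described above (zero blocks until data arrives, a positive timeout that expires yields an empty result) can be modeled on a plain BlockingQueue. This is a sketch of the semantics only: TimedDequeueSketch and its use of List<String> as a batch are hypothetical, and the null-on-closed-connection case is not modeled.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of the documented dequeue timeout semantics. */
class TimedDequeueSketch {
    private final BlockingQueue<List<String>> batches = new LinkedBlockingQueue<>();

    /** Simulates a batch arriving from the server. */
    void offer(List<String> batch) {
        batches.add(batch);
    }

    /** True when a dequeue call would return without blocking. */
    boolean canDequeue() {
        return !batches.isEmpty();
    }

    /** Zero waits indefinitely; otherwise an expired timeout gives an empty batch. */
    List<String> dequeue(long timeoutMS) throws InterruptedException {
        if (timeoutMS == 0) {
            return batches.take();
        }
        List<String> batch = batches.poll(timeoutMS, TimeUnit.MILLISECONDS);
        return batch != null ? batch : Collections.<String>emptyList();
    }

    public static void main(String[] args) throws InterruptedException {
        TimedDequeueSketch client = new TimedDequeueSketch();
        System.out.println("timed out, size=" + client.dequeue(50).size());
        client.offer(Arrays.asList("IBM,100", "MSFT,200"));
        System.out.println("ready=" + client.canDequeue() + " batch=" + client.dequeue(0));
    }
}
```

Callers that poll with a timeout therefore need to distinguish an empty result (nothing arrived yet) from null (connection closed), since the two mean different things.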
-
setDequeueQueue
public com.streambase.sb.client.DequeueResultsQueue setDequeueQueue(com.streambase.sb.client.DequeueResultsQueue dequeuer)
-
setDequeueResultInterceptor
Description copied from class: StreamBaseClient
Set the dequeue results interceptor for this client connection. This results interceptor replaces any existing results processor. To disable pre-processing of results, set the processor to null. This method cannot be safely called while another thread is calling dequeue().
- Overrides:
setDequeueResultInterceptor in class StreamBaseClient
- Parameters:
dri - dequeue results interceptor for this client connection
- Returns:
- the old dequeue results interceptor
-
subscribe
Description copied from class: StreamBaseClient
Subscribe to a stream
- Overrides:
subscribe in class StreamBaseClient
- Parameters:
streamname - the stream to subscribe to
- Returns:
- the stream properties
- Throws:
StreamBaseException - thrown on error
-
subscribe
public StreamProperties subscribe(String streamName, CaptureTransformStrategy strategy) throws StreamBaseException
Description copied from class: StreamBaseClient
Subscribe to a stream
- Overrides:
subscribe in class StreamBaseClient
- Parameters:
streamName - the name of a stream, which may be fully qualified (for example, mycontainer.MyInputSchema). When unqualified, the container from the URI the connection was established with is used; otherwise StreamBaseURI.DEFAULT_CONTAINER is used.
strategy - the capture transform strategy
- Returns:
- the stream properties
- Throws:
StreamBaseException - thrown on error
-
subscribe
Description copied from class: StreamBaseClient
Subscribe to a stream
- Overrides:
subscribe in class StreamBaseClient
- Parameters:
props - the stream to subscribe to
- Returns:
- the stream properties
- Throws:
StreamBaseException - thrown on error
-
subscribe
public StreamProperties subscribe(String streamname, String logicalstream, String predicate) throws StreamBaseException
Description copied from class: StreamBaseClient
Subscribe to a stream with a predicate. The stream name of dequeued tuples is logicalstream. When unsubscribing, use logicalstream.
- Overrides:
subscribe in class StreamBaseClient
- Parameters:
streamname - the stream to subscribe to
logicalstream - the name of the logical stream to associate with this predicate (if empty or null, defaults to streamname)
predicate - a predicate to apply to subset the stream
- Returns:
- the stream properties
- Throws:
StreamBaseException - thrown on error, including empty or null streamname or predicate
-
subscribe
public StreamProperties subscribe(String streamName, CaptureTransformStrategy strategy, String logicalstream, String predicate) throws StreamBaseException
Description copied from class: StreamBaseClient
Subscribe to a stream with a predicate. The stream name of dequeued tuples is logicalstream. When unsubscribing, use logicalstream.
- Overrides:
subscribe in class StreamBaseClient
- Parameters:
streamName - the name of a stream, which may be fully qualified (for example, mycontainer.MyInputSchema). When unqualified, the container from the URI the connection was established with is used; otherwise StreamBaseURI.DEFAULT_CONTAINER is used.
strategy - the CaptureTransformStrategy to use if there are capture fields in the stream
logicalstream - the name of the logical stream to associate with this predicate (if empty or null, defaults to streamName)
predicate - a predicate to apply to subset the stream
- Returns:
- the stream properties
- Throws:
StreamBaseException - thrown on error, including empty or null streamName or predicate
-
subscribe
public StreamProperties subscribe(StreamProperties props, String logicalstream, String predicate) throws StreamBaseException
Description copied from class: StreamBaseClient
Subscribe to a stream with a predicate. The stream name of dequeued tuples is logicalstream. When unsubscribing, use logicalstream.
- Overrides:
subscribe in class StreamBaseClient
- Parameters:
props - the stream to subscribe to
logicalstream - the name of the logical stream to associate with this predicate (if empty or null, defaults to streamname)
predicate - a predicate to apply to subset the stream
- Returns:
- the stream properties
- Throws:
StreamBaseException - thrown on error, including empty or null streamname or predicate
-
resubscribe
public StreamProperties resubscribe(String streamname, String logicalstream, String predicate) throws StreamBaseException
Description copied from class: StreamBaseClient
Resubscribe to a stream with a predicate. Allows the client to change the filtered subscribe predicate atomically so that currently buffered data is not lost (as would happen if you unsubscribed, then subscribed again with a different predicate).
- Overrides:
resubscribe in class StreamBaseClient
- Parameters:
streamname - the stream to subscribe to
logicalstream - the name of the logical stream to associate with this predicate
predicate - a predicate to apply to subset the stream
- Returns:
- the stream properties
- Throws:
StreamBaseException - thrown on error
-
resubscribe
public StreamProperties resubscribe(StreamProperties props, String logicalstream, String predicate) throws StreamBaseException
Description copied from class: StreamBaseClient
Resubscribe to a stream with a predicate. Allows the client to change the filtered subscribe predicate atomically so that currently buffered data is not lost (as would happen if you unsubscribed, then subscribed again with a different predicate).
- Overrides:
resubscribe in class StreamBaseClient
- Parameters:
props - the stream to subscribe to
logicalstream - the name of the logical stream to associate with this predicate
predicate - a predicate to apply to subset the stream
- Returns:
- the stream properties
- Throws:
StreamBaseException - thrown on error
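The difference between resubscribing and unsubscribing then subscribing again can be illustrated with a toy model. Everything here is an assumption made for illustration: ResubscribeSketch is hypothetical, tuples are plain integers, predicates are Java lambdas rather than StreamBase predicate strings, and the model simply discards its buffer on unsubscribe to mirror the data loss the description warns about.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Predicate;

/** Toy model contrasting an in-place predicate swap with unsubscribe plus subscribe. */
class ResubscribeSketch {
    private final Queue<Integer> buffered = new ArrayDeque<>();
    private Predicate<Integer> predicate;
    private boolean subscribed;

    synchronized void subscribe(Predicate<Integer> p) {
        predicate = p;
        subscribed = true;
    }

    /** Simplification: dropping the subscription also drops what was buffered for it. */
    synchronized void unsubscribe() {
        subscribed = false;
        predicate = null;
        buffered.clear();
    }

    /** Swaps the filter under the same lock, leaving already buffered tuples in place. */
    synchronized void resubscribe(Predicate<Integer> p) {
        predicate = p;
    }

    /** Simulates a tuple arriving; it is buffered only if the current predicate accepts it. */
    synchronized void onTuple(int value) {
        if (subscribed && predicate.test(value)) {
            buffered.add(value);
        }
    }

    synchronized int bufferedCount() {
        return buffered.size();
    }

    public static void main(String[] args) {
        ResubscribeSketch sub = new ResubscribeSketch();
        sub.subscribe(v -> v > 10);
        sub.onTuple(20);
        sub.resubscribe(v -> v < 10);
        sub.onTuple(5);
        System.out.println("buffered after resubscribe: " + sub.bufferedCount());
    }
}
```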
-
unsubscribe
Description copied from class: StreamBaseClient
Unsubscribe from the given stream name. Note: Any tuples that are in-flight during an unsubscribe request will be dequeued until the stream is fully drained.
- Overrides:
unsubscribe in class StreamBaseClient
- Parameters:
logicalStreamName - the name of the stream to unsubscribe from, which is the logical stream name for a filtered subscribe
- Throws:
StreamBaseException - thrown on network or other errors
-
unsubscribe
Description copied from class: StreamBaseClient
Unsubscribe from the given stream name. Note: Any tuples that are in-flight during an unsubscribe request will be dequeued until the stream is fully drained.
- Overrides:
unsubscribe in class StreamBaseClient
- Parameters:
logicalProps - the StreamProperties for this logical subscription
- Throws:
StreamBaseException - thrown on network or other errors
-
isSubscribed
Description copied from class: StreamBaseClient
Return whether we are subscribed to the given stream
- Overrides:
isSubscribed in class StreamBaseClient
- Parameters:
stream - the stream we want to check
- Returns:
- true if we are subscribed to the given stream
-
getDequeueBufferSize
public int getDequeueBufferSize()
-