StreamBase C++ API  7.6.2dev
StreamBaseClient.hpp
1 // Copyright (c) 2004-2016 Cloud Software Group, Inc. All rights reserved.
2 
3 #ifndef STREAMBASE_CLIENT_H
4 #define STREAMBASE_CLIENT_H
5 
6 #include "StreamBase.hpp"
7 #include "StreamBaseVersion.hpp"
8 #include "Exceptions.hpp"
9 #include "Schema.hpp"
10 #include "Field.hpp"
11 #include "StreamProperties.hpp"
12 #include "StreamBaseEntityType.hpp"
13 #include "StreamBaseURI.hpp"
14 #include "TupleList.hpp"
15 #include "DequeueResult.hpp"
16 #include "ClientSettings.hpp"
17 
18 SB_INTERNAL_FWD(StreamBaseClientImpl)
19 
20 SB_NAMESPACE_BEGIN;
21 
22 /// The StreamBase client API. Connects to a StreamBase node over the network,
23 /// via XML/RPC and the tuple wire protocol. With the exception of the close() method,
24 /// the StreamBaseClient object is not thread safe, so access to a single object
25 /// needs to be synchronized between threads. If multiple threads subscribe to streams,
26 /// it is recommended that they use separate instances of this class.
27 ///
29  public:
30  static const unsigned int DEFAULT_FLUSH_INTERVAL = 250;
31 
32 #ifndef DOXYGEN_SKIP
33  /// This tells the rest of the API that tuples want to remain in serialized form,
34  /// ostensibly for consumption by the .NET API.
35  static bool useRawTuples;
36 #endif
37 
38  /// Creates a StreamBase client and establishes a connection to a remote
39  /// server. Optional parameter buffer_size specifies the number of tuples
40  /// to buffer before enqueueing. Optional parameter flush_interval specifies
41  /// the interval in milliseconds between wakeups of the wakeAndFlushBuffer thread.
42  /// The wakeAndFlushBuffer thread is only started if buffer_size > 0.
43  StreamBaseClient(const StreamBaseURI& uri = StreamBaseURI::fromEnvironment(),
44  int buffer_size = 0,
45  int flush_interval=DEFAULT_FLUSH_INTERVAL);
46 
47  StreamBaseClient(const std::string& uris,
48  int buffer_size = 0,
49  int flush_interval=DEFAULT_FLUSH_INTERVAL);
50 
51  StreamBaseClient(const std::vector<StreamBaseURI>& uris,
52  int buffer_size = 0,
53  int flush_interval=DEFAULT_FLUSH_INTERVAL);
54 
55  StreamBaseClient(const std::vector<StreamBaseURI>& uris,
56  sb::ClientSettings &settings,
57  int buffer_size = 0,
58  int flush_interval=DEFAULT_FLUSH_INTERVAL);
59 
60  /// Destroys a session.
61  ~StreamBaseClient();
62 
63  /// Flags for the listEntities call
64  /// This must be consistent with what is found on the Java side of the
65  /// house in StreamBaseClient.java
67  /// set FULLY_QUALIFIED_NAMES if you want to
68  /// include container names for all entities. If not set then the returned
69  /// names will contain the module name (if any) and the name of the entity.
70  FULLY_QUALIFIED_NAMES = 1,
71  /// set INCLUDE_MODULES if you want to include all modules in the output
72  INCLUDE_MODULES = 2,
73  /// set ALL_CONTAINERS if you want to include all
74  /// user containers in the output. This does not include the "system"
75  /// container.
76  ALL_CONTAINERS = 4 | FULLY_QUALIFIED_NAMES
77  };
78 
79  /// Lists all entities of a particular type. (One can then use
80  /// "describe" to obtain a description of an entity.) names is
81  /// cleared first. Use container.entity-type to resolve across containers.
82  /// Uses the given flags to list the entities.
83  void listEntities(const std::string &type, int flags, std::vector<std::string>& names);
84 
85  /// Lists all entities of a particular type. (One can then use
86  /// "describe" to obtain a description of an entity.) names is
87  /// cleared first.
88  /// Uses the given flags to list the entities.
89  void listEntities(StreamBaseEntityType::Type type, int flags, std::vector<std::string>& names);
90 
91  /// Lists all entities of a particular type. (One can then use
92  /// "describe" to obtain a description of an entity.) names is
93  /// cleared first.
94  void listEntities(StreamBaseEntityType::Type type, std::vector<std::string>& names);
95 
96  /// Returns an XML description of an entity, or an empty string if
97  /// the method does not exist.
98  std::string describe(const std::string &entity);
99 
100 #ifndef DOXYGEN_SKIP
101  /// Returns true if the license for the specified feature is available and false otherwise.
102  bool checkLicense(const std::string &featureName);
103 #endif
104 
105  /// Returns true if this stream has been subscribed to.
106  /// @param entity the path of the stream
107  bool isStreamSubscribed(const std::string &entity);
108 
109  /// Returns a description of a stream, throwing an exception if the
110  /// stream does not exist.
111  /// @param entity the path of the stream
112  /// @param strategy how to handle capture fields on the stream
113  StreamProperties getStreamProperties(const std::string &entity,
114  StreamProperties::CaptureTransformStrategy strategy=StreamProperties::FLATTEN);
115 
116  /// Returns a description of a stream, throwing an exception if the
117  /// stream does not exist.
118  StreamProperties getStreamPropertiesByHash(const std::string &hex);
119 
120  /// Returns the schema of a stream, throwing an exception if the
121  /// stream does not exist.
122  Schema getSchemaForStream(const std::string &entity,
123  StreamProperties::CaptureTransformStrategy strategy=StreamProperties::FLATTEN);
124 
125  /// Typechecks a potential modification to the application, outputting
126  /// properties of all streams in the application. streams is cleared
127  /// first.
128  /// @param sbapp the application
129  /// @param streams the schema defs of the streams
130  /// @param full do a full typecheck
131  void typecheck(const std::string &sbapp,
132  std::map<std::string, Schema>& streams,
133  bool full = false);
134 
135  /// Subscribes to a stream.
136  ///
137  /// Uses the default capture transform strategy of flatten. To specify
138  /// capture strategy, use getStreamProperties(stream_name,
139  /// StreamProperties::NEST) and subscribe using the resultant
140  /// StreamProperties.
141  ///
142  /// @return the StreamProperties for the stream
143  StreamProperties subscribe(const std::string &stream_name);
144  /// Subscribes to a stream.
145  /// @return the StreamProperties for the stream
146  StreamProperties subscribe(const StreamProperties &props);
147 
148  /// Subscribes to a stream with a predicate to apply to output tuples.
149  /// The stream name of dequeued tuples is logical_stream.
150  /// When unsubscribing, use logical_stream.
151  ///
152  /// Uses the default capture transform strategy of flatten. To specify
153  /// capture strategy, use getStreamProperties(stream_name,
154  /// StreamProperties::NEST) and subscribe using the resultant
155  /// StreamProperties.
156  ///
157  /// @param stream_name the stream to subscribe to, error if empty
158  /// @param logical_stream the name of the logical stream to associate with this predicate (if empty, defaults to stream_name)
159  /// @param predicate a predicate to apply to subset the stream, error if empty
160  /// @return the StreamProperties for the stream
161  StreamProperties subscribe(const std::string &stream_name, const std::string &logical_stream, const std::string &predicate);
162 
163  /// Subscribes to a stream with a predicate to apply to output tuples.
164  /// The stream name of dequeued tuples is logical_stream.
165  /// When unsubscribing, use logical_stream.
166  /// @param props the stream to subscribe to, error if empty
167  /// @param logical_stream the name of the logical stream to associate with this predicate (if empty, defaults to the stream name)
168  /// @param predicate a predicate to apply to subset the stream, error if empty
169  /// @return the StreamProperties for the stream
170  StreamProperties subscribe(const StreamProperties &props, const std::string &logical_stream, const std::string &predicate);
171 
172  /// Resubscribes to an already subscribed stream
173  ///
174  /// Uses the default capture transform strategy of flatten. To specify
175  /// capture strategy, use getStreamProperties(stream_name,
176  /// StreamProperties::NEST) and subscribe using the resultant
177  /// StreamProperties.
178  ///
179  /// @return the StreamProperties for the stream
180  StreamProperties resubscribe(const std::string &stream_name, const std::string &logical_stream, const std::string &predicate);
181 
182  /// Resubscribes to an already subscribed stream
183  /// @return the StreamProperties for the stream
184  StreamProperties resubscribe(const StreamProperties &props, const std::string &logical_stream, const std::string &predicate);
185 
186  /// Unsubscribes from a stream. Note: Any tuples that are in-flight during
187  /// an unsubscribe request will be dequeued until the stream is fully drained.
188  void unsubscribe(const std::string &stream_name);
189 
190  /// Unsubscribes from a stream. Note: Any tuples that are in-flight during
191  /// an unsubscribe request will be dequeued until the stream is fully drained.
192  void unsubscribe(const StreamProperties &props);
193 
194 #ifndef SWIG
195  ///
196  /// Set the dequeue results interceptor for this client connection.
197  ///
198  /// This results interceptor replaces any existing results interceptor. To
199  /// disable pre-processing of results, set the interceptor to null.
200  ///
201  /// This method cannot be safely called while another thread is calling
202  /// dequeue().
203  ///
204  /// NOTE: Once the DequeueResult::Interceptor is passed to StreamBaseClient,
205  /// StreamBaseClient will manage the lifecycle of the object. StreamBaseClient
206  /// will delete the object when the interceptor is reset or when the StreamBaseClient
207  /// is deleted.
208  /// Do not delete this passed object!!
209  void setDequeueResultInterceptor(DequeueResult::Interceptor *dri);
210 
211  ///
212  /// Get the current dequeue results interceptor, or null if there is no
213  /// current interceptor.
214  ///
215  DequeueResult::Interceptor *getDequeueResultsInterceptor();
216 
217  /// Dequeue tuples from any subscribed stream. This method
218  /// blocks until
219  ///
220  /// - at least one tuple is available on any subscribed stream, or
221  /// - the node is shut down.
222  ///
223  /// @deprecated Use DequeueResult based dequeue
224  /// @param streamProperties (output) set to the StreamProperties of the stream from
225  /// which tuples have been dequeued
226  /// @return a list of the tuples dequeued; empty if the node has been
227  /// shut down
228  TupleList dequeue(StreamProperties &streamProperties);
229 
230  /// Dequeue tuples from any subscribed stream. This method
231  /// blocks until
232  ///
233  /// - at least one tuple is available on any subscribed stream, or
234  /// - the node is shut down.
235  ///
236  /// @deprecated Use DequeueResult based dequeue
237  /// @param stream_name (output) set to the name of the stream from
238  /// which tuples have been dequeued. Note this does not contain the container name
239  /// @return a list of the tuples dequeued; empty if the node has been
240  /// shut down
241  TupleList dequeue(std::string &stream_name);
242 
243  /// Dequeue tuples from any subscribed stream. This method
244  /// blocks until
245  ///
246  /// - At least one tuple is available on any subscribed stream (DequeueResult will have a status of GOOD)
247  /// - The StreamBase server the client is connected to is shut down (DequeueResult will have a status of CLOSED)
248  /// - A network error occurs (StreamBaseException will be thrown)
249  ///
250  /// @return a DequeueResult containing the results of the operation
251  DequeueResult dequeue();
252 
253  /// Dequeue tuples from any subscribed stream. This method
254  /// blocks until
255  ///
256  /// - At least one tuple is available on any subscribed stream (DequeueResult will have a status of GOOD)
257  /// - The StreamBase server the client is connected to is shut down (DequeueResult will have a status of CLOSED)
258  /// - The number of milliseconds specified has elapsed (DequeueResult will have a status of TIMEOUT)
259  /// - A network error occurs (StreamBaseException will be thrown)
260  ///
261  /// A timeout_ms of zero will block indefinitely, or until a tuple arrives.
262  /// Note that the actual dequeue timeout may vary based on normal thread scheduling,
263  /// plus network interactions with the server.
264  ///
265  /// @return a DequeueResult containing the results of the operation
266  DequeueResult dequeue(int timeout_ms);
267 
268  ///
269  /// Enqueue tuples to a stream. The stream must not be tied to
270  /// the output of any operator in the application. This method can block
271  /// depending on network, or StreamBase server, congestion.
272  ///
273  /// Note: enqueue() will modify the tuple as it is enqueued. If you
274  /// do not want the tuple modified you must make a copy of it before
275  /// calling enqueue.
276  ///
277  /// @deprecated Use StreamProperties based enqueue
278  /// @param stream_name the name of the stream to which to enqueue
279  /// @param tuple a tuple to enqueue
280  void enqueue(const std::string &stream_name, Tuple& tuple) ;
281 
282  ///
283  /// Enqueue tuples to a stream. The stream must not be tied to
284  /// the output of any operator in the application. This method can block
285  /// depending on network, or StreamBase server, congestion.
286  ///
287  /// Note: enqueue() will modify the tuple as it is enqueued. If you
288  /// do not want the tuple modified you must make a copy of it before
289  /// calling enqueue.
290  ///
291  /// @param props the StreamProperties to enqueue the tuple to
292  /// @param tuple a tuple to enqueue
293  void enqueue(const StreamProperties &props, Tuple& tuple) ;
294 
295  /// Enqueue tuples to a stream. The stream must not be tied to
296  /// the output of any operator in the application. This method can block
297  /// depending on network, or StreamBase server, congestion.
298  ///
299  /// Note: enqueue() will modify the tuples as they are enqueued. If you
300  /// do not want the tuples modified you must make copies before
301  /// calling enqueue.
302  ///
303  /// @deprecated Use StreamProperties based enqueue
304  /// @param stream_name the name of the stream to which to enqueue
305  /// @param tuples a list of tuples to enqueue
306  void enqueue(const std::string &stream_name, TupleList& tuples) ;
307 
308  /// Enqueue tuples to a stream. The stream must not be tied to
309  /// the output of any operator in the application. This method can block
310  /// depending on network, or StreamBase server, congestion.
311  ///
312  /// Note: enqueue() will modify the tuples as they are enqueued. If you
313  /// do not want the tuples modified you must make copies before
314  /// calling enqueue.
315  ///
316  /// @param props the StreamProperties to enqueue the tuples to
317  /// @param tuples a list of tuples to enqueue
318  void enqueue(const StreamProperties &props, TupleList& tuples) ;
319 #endif
320 
321  /// Returns a schema with a particular hash.
322  Schema getSchemaByHash(const std::string &hash);
323 
324  /// Returns a schema by name. This will only succeed for named schemas;
325  /// anonymous schemas assigned to a port should instead be looked up
326  /// via getStreamProperties().getSchema().
327  Schema getSchemaByName(const std::string &name);
328 
329  /// Status of Java operators and adapters
330  void operatorStatus(const std::string &containerName, std::vector<std::string> &aStatus);
331 
332  /// Status of the StreamBase daemons
333  void status(std::vector<std::string> &aStatus, bool verbose = false);
334 
335  /// Return the URI we're talking to.
336  const StreamBaseURI& getURI() const;
337 
338  const std::vector<StreamBaseURI>& getURIs() const;
339 
340  /// Terminate the client. May be invoked from a different thread.
341  /// StreamBaseClient memory, network, and thread resources are not released
342  /// until close() is called.
343  ///
344  /// Returns immediately.
345  void close();
346 
347  /// Return true if the client connection is closed.
348  /// The client connection can be closed by calling the close() method,
349  /// or by a server shutdown, or a network error.
350  bool isClosed();
351 
352 
353  /// Return true if we can call dequeue without blocking. This means
354  /// that there is something to dequeue from the server. This dequeued item
355  /// could be Tuples, a null (server shutdown) or an exception.
356  /// @return true if we can dequeue without blocking
357  bool canDequeue();
358 
359  /// Enable heartbeating for this client. Generally this is only for enqueue-only clients.
360  void enableHeartbeating();
361 
362 
363  /// Returns true if this TupleConnections has any live connections to a
364  /// server. Note that a return of "false" doesn't necessarily indicate an
365  /// error since the TupleConnection's state (e.g., lack of subscriptions)
366  /// might mean no connections are needed.
367  /// @return true if we are connected
368  bool isConnected();
369 
370  /// Turn on buffering. The WakeAndFlushBuffer thread is only started if flush_interval > 0.
371  void enableBuffering(int buffer_size, int flush_interval = DEFAULT_FLUSH_INTERVAL);
372 
373 #ifndef DOXYGEN_SKIP
374  /// Enable connectionless enqueue. This enables a mode where tuples are enqueued
375  /// over an XMLRPC request rather than a binary connection. It is useful for low
376  /// volume enqueue of tuples. It may reduce the load on the server as the connection
377  /// is transient. Setting has no effect upon dequeue.
378  /// @param enable enable connectionless enqueue of tuples. Default is off.
379  /// excluded until server performance improves
380  /// <em>Note: this method is not public API, and is for internal StreamBase use only</em>
381  void enableConnectionlessEnqueue(bool enable);
382 #endif
383 
384 #ifndef DOXYGEN_SKIP
385  /// Return state of connectionless enqueue setting
386  /// excluded until server performance improves
387  /// <em>Note: this method is not public API, and is for internal StreamBase use only</em>
388  bool getIsConnectionlessEnqueue();
389 #endif
390 
391  /// Flush any pending enqueue buffers.
392  /// This operation has no effect if buffering is not enabled.
393  ///
394  /// @throws StreamBaseException if there is an IO error while flushing the buffer
395  void flushAllBuffers();
396 
397  /// Flush any pending enqueue buffer for the StreamProperties provided.
398  /// This operation has no effect if buffering is not enabled or
399  /// there is no buffer to flush for the given stream.
400  /// <br/>
401  /// <b>Note:</b> This will cause inter-stream ordering to be interrupted.
402  /// <br/>
403  /// @param props the stream whose enqueue buffers to flush, if not empty
404  /// @throws StreamBaseException if there is an IO error while flushing the buffer
405  /// @deprecated use flushAllBuffers() to preserve inter-stream ordering
406  void flushBuffer(const StreamProperties &props);
407 
408  /// Flush any pending enqueue buffer for the stream name provided.
409  /// This operation has no effect if buffering is not enabled or
410  /// there is no buffer to flush for the given stream.
411  /// <br/>
412  /// <b>Note:</b> This will cause inter-stream ordering to be interrupted.
413  /// <br/>
414  /// @param stream_name the stream whose enqueue buffers to flush, if not empty
415  /// @throws StreamBaseException if there is an IO error while flushing the buffer
416  /// @deprecated use the StreamProperties based flushBuffer
417  /// @deprecated use flushAllBuffers() to preserve inter-stream ordering
418  void flushBuffer(const std::string &stream_name);
419 
420  /// Return whether buffering is turned on or off
421  bool getIsBuffering() const;
422 
423  /// Return buffer size (in tuples)
424  int getBufferSize() const;
425 
426  /// Return the Connection ID for this Client Connection. Only valid once
427  /// an enqueue/dequeue has been attempted.
428  /// @returns connection ID in binary format
429  std::string getConnectionID() const;
430 
431  /// Get all the dynamic variables in the given module, and return them
432  /// as a Tuple where the field names are the names of the dynamic variables,
433  /// and the field values are the current values of the dynamic variables.
434  ///
435  /// @throws StreamBaseException on network or other errors, or if there are
436  /// no dynamic variables in the named module
437  Tuple getDynamicVariables(const std::string& modulePath);
438 
439  /// Set the dynamic variable at the given path to the given value, expressed
440  /// as a string in CSV style.
441  /// @throws StreamBaseException if the dynamic variable does not exist, or the
442  /// value is not appropriate for setting the dynamic variable
443  void setDynamicVariable(const std::string& dynamicVariablePath, const std::string& value);
444 
445  /// Return the Connection ID for this Client Connection. Only valid once
446  /// an enqueue/dequeue has been attempted.
447  /// @returns connection ID in Hexadecimal format
448  std::string getConnectionIdAsHexString() const;
449 
450  /// Read rows out of the table at this path. Limit to the
451  /// specified number of rows returned, or -1 for no limit. Filter
452  /// rows according to predicate, or return all rows if predicate
453  /// is empty.
454  /// @returns a list of matching rows.
455  TupleList readTable(const std::string &tablePath, int limit, const std::string& predicate="");
456 
457 #ifndef DOXYGEN_SKIP
458  /// Execute an internal command. Internal commands are subject to change and
459  /// for StreamBase internal use only.
460  void internalCommand(const std::vector<std::string>& arguments, std::vector<std::string> &result);
461 
462  bool doByteSwap();
463 #endif
464 
465  /// Sets how many milliseconds a dequeueing client will tolerate not receiving a client heart
466  /// beat from the StreamBase server that it is connected to. The default value is
467  /// 120 seconds (120000). By default, StreamBase servers emit "client heart beats"
468  /// every 10 seconds so StreamBase applications have no requirement to send data regularly.
469  ///
470  /// @param timeoutMS The number of milliseconds that is allowed to pass without
471  /// receiving a message from the StreamBase server client heart beat stream.
472  int setQuiescentLimit(int timeoutMS);
473 
474  /// Returns the client version as a string, built from StreamBaseVersion::INFO_LINE.
475  std::string getVersion() {
476  return std::string(StreamBaseVersion::INFO_LINE);
477  }
478 
479  ///
480  /// Get the settings for the client
481  ///
483  return _clientSettings;
484  }
485 
486  /// Returns the number of tuples this client has actually enqueued to the server.
487  /// Note that this number will not include any tuples that have been buffered but
488  /// have not actually been enqueued to the server.
489  long long getTupleEnqueueCount() const;
490 
491  /// Returns the number of tuples this client has dequeued from the server. This number
492  /// does not include tuples that may be dequeued as part of system services such
493  /// as heartbeating. This number does however include tuples that have been dequeued
494  /// but have been intercepted by the Interceptor.
495  long long getTupleDequeueCount() const;
496 
497 private:
498  typedef std::map<std::string, StreamProperties> StreamMap;
499 
500  TIBCO_CEP_memory::shared_ptr<sb_internal::StreamBaseClientImpl> _impl;
501 
502  // No copying.
503  StreamBaseClient(const StreamBaseClient&);
504 
505  // No copying.
506  StreamBaseClient& operator = (const StreamBaseClient&);
507 
508  void init();
509 
510  // keep track of logical and associated real streams
511  StreamMap _streamMap;
512 
513  sb::ClientSettings _clientSettings;
514 };
515 
516 
517 SB_NAMESPACE_END;
518 #endif