Contents
This topic describes how to use the TIBCO StreamBase® for Apache Cassandra operators to interact with a Apache Cassandra database cluster, and explains how to configure the operators' Properties views.
The Apache Cassandra connectivity solution is implemented as a suite of six global Java operators that allows a StreamBase application to connect to a Cassandra database cluster and access its data.
Apache Cassandra is a distributed database management system designed to handle large amounts of data across many commodity servers in a high availability, clustered environment.
In addition to the four operators dedicated to insert, delete, update and select operations, a more generic Query operator allows any Cassandra Query Language (CQL) query to be executed and a Control operator directly controls connection and disconnection operations on the cluster. The operation of all six operators is described in this document.
The Cassandra operators are members of the Java Operator group in the Palette view in StreamBase Studio. Select the operators from the Insert an Operator or Adapter dialog. Invoke the dialog with one of the following methods:
-
Drag the Adapters, Java Operators token from the Operators and Adapters drawer of the Palette view to the canvas.
-
Click in the canvas where you want to place the operator, and invoke the keyboard shortcut
O V
-
From the top-level menu, invoke
> > .
From the Insert an Operator or Adapter dialog, select one of the following Cassandra-related operators and double-click or press :
-
Apache Cassandra Insert, which adds a row to the cluster.
-
Apache Cassandra Delete, which removes a row from the cluster.
-
Apache Cassandra Update, which modifies an existing row in the cluster.
-
Apache Cassandra Select, which retrieves rows from the cluster.
-
Apache Cassandra Query, which can be used to send arbitrary queries to the cluster.
-
Apache Cassandra Control, which can be used to connect to the cluster, disconnect from it or obtain the current connection state.
In order to run, the operators assume the following to be properly set up:
-
The machine running your StreamBase application must have a copy of Apache Cassandra v3.1 or later installed.
-
At runtime, the operators expect the configured Cassandra Cluster to be running and ready to accept connections at the configured URLs. Configuration of your Cassandra operators is discussed in the next section.
The different Cassandra operators share a connection to the same cluster, provided they are configured to do so. Each operator
lists the clusters available in the a combo box (see Properties: Operator Properties Tab). The list's values are specified in a dedicated section of the application's
file. Here is an example of such a section, containing all supported settings (long lines wrap to the next, for clarity):
Cassandra.conf
name = "Cassandra.conf" type = "com.tibco.ep.streambase.configuration.adapter" version = "1.0.0" configuration = { // An adapter group type defines a collection of EventFlow adapter configurations, indexed // by adapter type. AdapterGroup = { // A collection of EventFlow adapter configurations, indexed by adapter type. // This object is required and must contain at least one configuration. adapters = { // The root section for an EventFlow adapter configuration. cassandra = { // Section list. This array is optional and has no default value. sections = [ // A configuration for an EventFlow adapter named section. { // Section name. The value does not have to be unique; that is, you can have multiple // sections with the same name in the same array of sections. This object is required. name = "cluster-definition" // Section for setting adapter properties. All values must be strings. This object is // optional and has no default value. settings = { contact-points = "localhost" id = "Test Cluster" port = "9042" } } // One of "RoundRobinPolicy", "DCAwareRoundRobinPolicy", "TokenAwarePolicy", // "LatencyAwarePolicy", "WhitelistPolicy" // // DCAwarePolicy parameters: // local-dc (string) // used-hosts-per-remote-dc (string) // allow-remote-dc-for-local-consistency-level (boolean) // // TokenAwarePolicy parameters: // shuffle-replicas (boolean) // // WhitelistPolicy parameters: // whitelist-hosts (string of the form "host:port{;host:port...}") // // LatencyAwarePolicy parameters: // with-exclusion-threshold (double) // with-minimum-measurements (int) // with-scale (long) // with-scale-units (one of "milliseconds", "seconds", "minutes", // "hours", "days") // with-retry-period (long) // with-retry-period-units (one of "milliseconds", "seconds", // "minutes", "hours", "days") // with-update-rate (long) // with-update-rate-units (one of "milliseconds", "seconds", // "minutes", "hours", "days") // "load-balancing-policy" = "TokenAwarePolicy" // "shuffle-replicas" = "true" // One of "RoundRobinPolicy", "DCAwareRoundRobinPolicy", parameters as above // "load-balancing-policy" = "DCAwareRoundRobinPolicy" // "local-dc" = "MyLocalDC" // // QueryOptions settings // // One of ALL, ANY, EACH_QUORUM, LOCAL_ONE, LOCAL_QUORUM, LOCAL_SERIAL, ONE, QUORUM, // SERIAL, THREE, TWO // "consistency-level" = "ONE" // "serial-consistency-level" = "ONE" // "default-idempotence" = "false" // "fetch-size" = "5000" // "max-pending-refresh-node-list-requests" val="20" // "max-pending-refresh-node-requests" = "20" // "max-pending-refresh-schema-requests" = "20" // "prepare-on-all-hosts" = "true" // "refresh-node-interval-millis" = "1000" // "refresh-node-list-interval-millis" = "1000" // "refresh-schema-interval-millis" = "1000" // "reprepare-on-up" = "true" // // SSL. Passwords may be enciphered using the epadmin encrypt secret command // "use-ssl" = "false" // "ssl-key-store" = "" // "ssl-key-store-pwd" = "" // "ssl-trust-store" = "" // "ssl-trust-store-pwd" = "" ] } } } }
A best practice is to define your clusters before placing operator instances on the canvas, so that the lists are already available in the Properties view and the operators can be configured right away.
This section describes the properties you can set for the each of the six Cassandra operators, using the various tabs of the Properties view in StreamBase Studio.
Name: Use this required field to specify or change the name of this instance of this component, which must be unique in the current EventFlow module. The name must contain only alphabetic characters, numbers, and underscores, and no hyphens or other special characters. The first character must be alphabetic or an underscore.
Operator: A read-only field that shows the formal name of the operator.
Class name: Shows the fully qualified class name that implements the functionality of this operator. If you need to reference this class name elsewhere in your application, you can right-click this field and select Copy from the context menu to place the full class name in the system clipboard.
Start options: This field provides a link to the Cluster Aware tab, where you configure the conditions under which this operator starts.
Enable Error Output Port: Select this check box to add an Error Port to this component. In the EventFlow canvas, the Error Port shows as a red output port, always the last port for the component. See Using Error Ports to learn about Error Ports.
Description: Optionally enter text to briefly describe the component's purpose and function. In the EventFlow Editor canvas, you can see the description by pressing Ctrl while the component's tooltip is displayed.
This section describes the properties on the Operator Properties tab in the Properties view for the Cassandra operators. Enter all text fields as string literals, not as expressions.
All Cassandra operators have a common set of properties:
- Cassandra Configuration
-
The Editor, used for adapter configuration or converting an existing application's
button is a shortcut to the StreamBase Configuration Fileadapter-configurations.xml
file to HOCON format. - Cluster Definition
-
Specifies the name of the Cassandra Cluster to which to connect. The drop-down list contains a list of available clusters from which to choose, as defined in the HOCON configuration file (see Configuration). This setting is required.
- Connect On Startup
-
Specifies whether to automatically attempt a connection to the cluster when the application start.
- Reconnect Interval (in ms)
-
Specifies the number of milliseconds to wait between reconnection attempts.
- Log Level
-
Use this to set the operator to produce more or less verbose console output, independent of the
STREAMBASE_LOG_LEVEL
global setting. Available values, in increasing order of verbosity, are: OFF, ERROR, WARN, INFO, DEBUG, TRACE.
In addition to the common properties listed above, some operators also have some properties of their own:
- Keyspace
-
Specifies the keyspace on which to perform this query.
- Table Name
-
Specifies the table on which to perform this query.
- Order By
-
If set, this adds an
ORDER BY
clause to the query with the given value. - Order
-
Either
ASC
orDESC
, which refers to the ordering direction of theORDER BY
clause. - Limit
-
If non-zero, this adds a
LIMIT
clause to the query. - Allow Filtering
-
If set, this adds
ALLOW FILTERING
to the query. - Enable Status Port
-
Specifies whether to add a second output port to the operator to convey status tuples (connection notices, errors, and so on). This setting is optional.
- Keyspace
-
Specifies the keyspace on which to perform this query.
- Table Name
-
Specifies the table on which to perform this query.
- Enable Status Port
-
Specifies whether to add a second output port to the operator to convey status tuples (connection notices, errors, and so on). This setting is optional.
- Keyspace
-
Specifies the keyspace on which to perform this query.
- Table Name
-
Specifies the table on which to perform this query.
- If Not Exists
-
If set, adds an
IF NOT EXISTS
clause to the query. - Enable Status Port
-
Specifies whether to add a second output port to the operator to convey status tuples (connection notices, errors, and so on). This setting is optional.
- Keyspace
-
Specifies the keyspace on which to perform this query.
- Table Name
-
Specifies the table on which to perform this query.
- If Exists
-
If set, adds an
IF EXISTS
clause to the query. - Enable Status Port
-
Specifies whether to add a second output port to the operator to convey status tuples (connection notices, errors, and so on). This setting is optional.
- Enable Status Port
-
Specifies whether to add a second output port to the operator to convey status tuples (connection notices, errors, and so on). This setting is optional.
For general instructions on using the Edit Schema tab, see the Properties: Edit Schema Tab section of the Defining Input Streams page.
Each operator has exactly one input and one output port for emitting results, plus an optional Status port.
Note
The only exception to this is the Control operator, which has no results output port and hence no Edit Schema property tab.
Each operator's results output port has two fields: one named query
containing the original query that triggered the current output, and one tuple field named row
which contains the one row in the result set obtained when executing the query. The schema for this row field is determined
by the schema specified on the Edit Schema tab. The fields in this schema are matched with column names in the rows from the result set and the values will be mapped
accordingly. Any StreamBase field in the schema not also present in the row is ignored; similarly any column in the row that
has no corresponding StreamBase field is ignored. Each StreamBase field that does have a corresponding Cassandra column must
be of a data type that is compatible with the Cassandra column's data type (for example, Cassandra columns of type text
should have a corresponding StreamBase field of type string
). The list of compatible types between StreamBase and Cassandra is specified in Type Mappings.
Use the settings in this tab to allow this operator or adapter to start and stop based on conditions that occur at runtime in a cluster with more than one node. During initial development of the fragment that contains this operator or adapter, and for maximum compatibility with TIBCO Streaming releases before 10.5.0, leave the Cluster start policy control in its default setting, Start with module.
Cluster awareness is an advanced topic that requires an understanding of StreamBase Runtime architecture features, including clusters, quorums, availability zones, and partitions. See Cluster Awareness Tab Settings on the Using Cluster Awareness page for instructions on configuring this tab.
Use the Concurrency tab to specify parallel regions for this instance of this component, or multiplicity options, or both. The Concurrency tab settings are described in Concurrency Options, and dispatch styles are described in Dispatch Styles.
Caution
Concurrency settings are not suitable for every application, and using these settings requires a thorough analysis of your application. For details, see Execution Order and Concurrency, which includes important guidelines for using the concurrency options.
Each operator has only one input port, used to receive commands describing an operation to be performed on the Cassandra cluster (such as SELECT, INSERT, UPDATE, and so on). The schema for each operator is different because different operations require different parameters.
In all cases however, any fields found in the input schema that do not match those described below are ignored and passed
through directly in the input
field of result output tuples.
Field Name | Field Type | Description |
---|---|---|
query | string | REQUIRED. Describes the full Cassandra query to execute. For example, CREATE KEYSPACE myks WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1} |
Each field in the input schema for the Select operator is used to fill out a query of the form:
SELECT <field-list-1> FROM <keyspace>.<table> WHERE <field-list-2> [ORDER BY <orderby-value> ASC|DESC] [LIMIT <limitval-value>]
[ALLOW FILTERING]
In the example above, field-list-1
is a comma-separated list of the fields specified in the Edit Schema tab of the operator, and orderby
, ASC
|DESC
, limitval
and ALLOW FILTERING
all specified according to their values in the operator's Properties tab. Finally, field-list-2
is filled using the contents of a field named WHERE
in the input tuple.
If a field of the WHERE tuple is of StreamBase type LIST
instead of a simple type, its translation in the WHERE clause will be "column IN (value[,value...])" instead of "column =
value".
The fields of the input tuple will be used to fill out an INSERT
query of the form:
INSERT INTO <keyspace>.<table> (<field-list>) VALUE (<values-list>) [USING [TTL ttl-value] [AND TIMESTAMP timestamp-value]]
[IF NOT EXISTS]
Field Name | Field Type | Description |
---|---|---|
values | tuple | REQUIRED. Contains the fields and values to insert into the INSERT query. Each field's name is used to fill out the field-list in the query shown above, and each value is used to fill out values-list .
|
ttl | int | If set, a USING TTL ttl-value clause is added to the query.
|
values | timestamp | If set, a USING TIMESTAMP timestamp-value clause is added to the query.
|
Finally, if the operator's If Not Exists
property is set, an IF NOT EXISTS
clause is added to the query.
The fields of the input tuple are used to fill out a DELETE
query of the form:
DELETE FROM <keyspace>.<table> WHERE column=value [, column=value] [IF [condition-list | EXISTS]]
Field Name | Field Type | Description |
---|---|---|
where | tuple | REQUIRED. Contains the fields and values to insert into the DELETE query. Each field's name is used to fill out the column names in the query shown above, and each value is used to fill out the values. |
if | tuple | If non-null, an IF query is added to the query and this tuple's fields is used to fill out the condition-list as shown above.
|
Finally, if the operator's If Exists
property is set, an IF EXISTS
clause is added to the query. This setting is ignored if the if
field is non-null in the input tuple.
The fields of the input tuple are used to fill out an UPDATE
query of the form:
UPDATE <keyspace>.<table> SET column=value [, column=value] WHERE column=value [, column=value] [IF condition-list]
Field Name | Field Type | Description |
---|---|---|
set | tuple | REQUIRED. This tuple field contains fields that will be used in the SET clause for this UPDATE statement.
|
where | tuple | REQUIRED. This tuple field contains fields that will be used in the WHERE clause for this UPDATE statement.
|
if | tuple | If non-null, an IF query is added to the query and this tuple's fields will be used to fill out the condition-list as shown above.
|
Every operator has only one fixed output port, used to deliver the results of executing operator commands, plus one optional Status output port if the operator's Enable Status Port option is checked.
The Results Port consists of one field named input
that mirrors the operator's input schema, plus one tuple field named row
whose schema matches that specified in the Edit Schema tab of the adapter.
If the ResultSet returned by executing a query contains one or more rows, those are emitted separately in successive tuples
on the results output port. The query
field contains the initial input that triggered this query, and the row
field contains one row in the result set. In any case, one additional tuple are emitted with its row
field set to null to indicate the results have been fully processed.
The Status Port, when enabled, is the same for each operator.
Field Name | Field Type | Description |
---|---|---|
status | string | Describes this event. Possible values include: Connected , Disconnected , Error .
|
info | list<string> | This is an all-purpose field used to convey additional information describing this event. For example, in the case of an Error event this will contain the text describing the error.
|
context | tuple | If this event was generated as a result of a command being sent to the operator, this field will contain the original command tuple. |
When exchanging tuples between StreamBase and Cassandra, the fields and field types of these tuples are expected to match in the following ways:
-
The fields have the same names, and
-
Data types match or can be readily converted.
The table below lists all the type translations supported by the operator.
StreamBase Type | Cassandra Type(s) |
---|---|
boolean | boolean |
string | text, ascii, inet, uuid, varchar |
int | cint, smallint, tinyint
When mapping a StreamBase int to a Cassandra smallint or tinyint, some precision may be lost. |
long | bigint, cint, smallint, tinyint, time, counter
When mapping a StreamBase long to a Cassandra cint, smallint or tinyint, some precision may be lost. |
double | double, float
When mapping a StreamBase double to a Cassandra float, some precision may be lost. |
timestamp | date, timestamp |
tuple | tuple |
blob | blob |
list | list, set, map
When mapping a StreamBase list to a Cassandra set, any duplicates in the list will be removed (and the last value of duplicate entries will be used). When mapping a StreamBase list to a Cassandra map, the list elements are expected to be tuples with the first field representing an entry's key the second field representing its value. |
function | text, blob
The field will be mapped to a JSON string representing its value if the Cassandra field is of type text, or serialized into a byte array if the field is of type blob. |
capture | Not supported. |
The StreamBase installation comes with a sample demonstrating the use of this operator. To load the sample in StreamBase Studio, select Extending StreamBase section for an entry called Cassandra Operator.
> and look under the