The Adapter for RabbitMQ Consumer allows the system to consume data from a RabbitMQ broker. The connection to the broker can be shared across multiple RabbitMQ adapters and each adapter creates its own channel on the connection. A consumer can create an exchange and queue or use an existing exchange or queue. Each message received by the adapter from the broker will produce a tuple with the message as well as the envelope and the properties of that message.
This section describes the properties you can set for this adapter, using the various tabs of the Properties view in StreamBase Studio.
Name: Use this required field to specify or change the name of this instance of this component. The name must be unique within the current EventFlow module. The name can contain alphanumeric characters, underscores, and escaped special characters. Special characters can be escaped as described in Identifier Naming Rules. The first character must be alphabetic or an underscore.
Adapter: A read-only field that shows the formal name of the adapter.
Class name: Shows the fully qualified class name that implements the functionality of this adapter. If you need to reference this class name elsewhere in your application, you can right-click this field and select Copy from the context menu to place the full class name in the system clipboard.
Start options: This field provides a link to the Cluster Aware tab, where you configure the conditions under which this adapter starts.
Enable Error Output Port: Select this checkbox to add an Error Port to this component. In the EventFlow canvas, the Error Port shows as a red output port, always the last port for the component. See Using Error Ports to learn about Error Ports.
Description: Optionally, enter text to briefly describe the purpose and function of the component. In the EventFlow Editor canvas, you can see the description by pressing Ctrl while the component's tooltip is displayed.
| Property | Type | Description | 
|---|---|---|
| RabbitMQ Configuration | Edit Button | Shortcut to the StreamBase Configuration File Editor, used for adapter configuration or converting an existing application's adapter-configurations.xml file to HOCON format. | 
| RabbitMQ Server | drop-down list | A list of RabbitMQ server connections that were specified in the adapter-configurations.xml file. See the Adapter File Configuration section of this document for RabbitMQ connection options. | 
| Create Channel On Connect | check box | When enabled, a channel with the properties given at design time is created when the adapter connects. | 
| Qos | integer | When a positive integer value is supplied, it is used as the channel's quality of service (prefetch count) value. A value of 0 means unlimited. | 
| Qos Global | check box | When enabled the Qos setting is applied to the entire channel rather than each consumer. | 
| Enable Auto Acknowledge | check box | When enabled each message is considered acknowledged once delivered. When not enabled each message must be specifically acknowledged using the RabbitMQ Ack adapter. | 
| Message Data Type | drop-down list | The type of data that is stored in the message. This data type affects the output schema. If custom or tuple is selected, the edit schema section must contain a valid schema. If custom is selected a valid class name must be provided that implements IRabbitMQDeserializer. Available values are: String, Tuple, Blob, Custom. | 
| Message Deserializer Class | string | The class that implements IRabbitMQDeserializer and is instantiated to convert message bytes to the resulting message schema. Note: See the section on custom serialization in this document. | 
| Enable Control Port | check box | When enabled a control port allows commands to be sent to the operator to perform actions during runtime. | 
| Enable Status Port | check box | When enabled a status port is available that emits status tuples for various events of this operator. | 
| Log Level | INFO | Controls the level of verbosity the adapter uses to issue informational traces to the console. This setting is independent of the containing application's overall log level. Available values, in increasing order of verbosity, are: OFF, ERROR, WARN, INFO, DEBUG, TRACE. | 
| Property | Type | Description | 
|---|---|---|
| Name | string | The name of the exchange to connect to, or to create when Create Channel On Connect is enabled. If this value is blank, the default exchange is used. This value cannot be blank if Declare Exchange is enabled. | 
| Declare Exchange | check box | When enabled, the system tries to declare this exchange on the server. This option is only valid when Create Channel On Connect is enabled. If enabled, the Name property must be specified. If any other adapter is also set to declare an exchange with the same name, the declare options must be identical or a runtime error occurs on startup. It is suggested that only one adapter set the declare flag and that other adapters simply use the exchange name. | 
| Type | drop-down list | The type of exchange to create. This option is only valid when Create Channel On Connect is enabled. See the RabbitMQ documentation for descriptions of the exchange types. | 
| Use Defaults | check box | When checked the exchange is created as a non-autodelete, non-durable exchange with no extra arguments. | 
| Is Durable | check box | When enabled the exchange is set to durable when created. | 
| Is Auto Delete | check box | When enabled, the exchange is set to be deleted automatically when it is no longer in use. | 
| Exchange Arguments | key value pair | The arguments to use when creating an exchange. | 
| Property | Type | Description | 
|---|---|---|
| Name | string | The name of the queue to connect to, or to create if Create Channel On Connect is enabled. If this value is blank and Declare Queue is enabled, a temporary queue is created. The name of the queue can be received from the status port when action is queue. | 
| Declare Queue | check box | When enabled, the system tries to declare this queue on the server. This option is only valid when Create Channel On Connect is enabled. If any other adapter is also set to declare a queue with the same name, the declare options must be identical or a runtime error occurs on startup. It is suggested that only one adapter set the declare flag and that other adapters simply use the queue name. | 
| Is Durable | check box | When enabled, the queue is set to durable when created. | 
| Is Auto Delete | check box | When enabled, the queue is set to be deleted automatically when it is no longer in use. | 
| Is Exclusive | check box | When enabled, the queue is set to exclusive. | 
| Queue Arguments | key value pair | The arguments to use when creating a queue. | 
| Property | Type | Description | 
|---|---|---|
| Routing Keys | key value | The routing keys to use. How they are interpreted depends on the type of the exchange the adapter is connected to. See the RabbitMQ documentation for information about routing keys. | 
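As an illustration of how routing keys behave with one exchange type, the sketch below mimics the matching rule RabbitMQ applies on a topic exchange: keys are dot-separated words, `*` in a binding pattern stands for exactly one word, and `#` stands for zero or more words. The class `TopicKeyMatcher` is a hypothetical helper written for this page; it is not part of the adapter or of the RabbitMQ client, and the broker performs the real matching.

```java
// Hypothetical helper for illustration only; the broker does the real matching.
final class TopicKeyMatcher {

    /** Returns true if routingKey matches the topic binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.", -1);
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.", -1);
        return matches(p, 0, k, 0);
    }

    private static boolean matches(String[] p, int pi, String[] k, int ki) {
        if (pi == p.length) {
            return ki == k.length; // pattern exhausted: the key must be exhausted too
        }
        if (p[pi].equals("#")) {
            // '#' may absorb zero or more words, so try every possible split point.
            for (int next = ki; next <= k.length; next++) {
                if (matches(p, pi + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (ki == k.length) {
            return false; // words remain in the pattern but not in the key
        }
        if (p[pi].equals("*") || p[pi].equals(k[ki])) {
            return matches(p, pi + 1, k, ki + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("orders.*.created", "orders.eu.created")); // true
        System.out.println(matches("orders.#", "orders"));                    // true
        System.out.println(matches("orders.*", "orders.eu.created"));         // false
    }
}
```

With a direct exchange the routing key must match the binding key exactly, and a fanout exchange ignores routing keys altogether, so a helper like this is only relevant when the Type property is topic.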
Use the Edit Schema tab to specify the schema of the output tuple for this adapter. For general instructions on using the Edit Schema tab, see the Properties: Edit Schema Tab section of the Defining Input Streams page.
Use the settings in this tab to enable this operator or adapter for runtime start and stop conditions in a multi-node cluster. During initial development of the fragment that contains this operator or adapter, and for maximum compatibility with releases before 10.5.0, leave the Cluster start policy control in its default setting, Start with module.
Cluster awareness is an advanced topic that requires an understanding of StreamBase Runtime architecture features, including clusters, quorums, availability zones, and partitions. See Cluster Awareness Tab Settings on the Using Cluster Awareness page for instructions on configuring this tab.
Use the Concurrency tab to specify parallel regions for this instance of this component, or multiplicity options, or both. The Concurrency tab settings are described in Concurrency Options, and dispatch styles are described in Dispatch Styles.
Caution
Concurrency settings are not suitable for every application, and using these settings requires a thorough analysis of your application. For details, see Execution Order and Concurrency, which includes important guidelines for using the concurrency options.
The following example shows a basic configuration. You can define as many server configurations as your application requires, but each must have a unique id value.
name = "RabbitMQ.conf"
type = "com.tibco.ep.streambase.configuration.adapter"
version = "1.0.0"
configuration = {

    // An adapter group type defines a collection of EventFlow adapter configurations, indexed by adapter type.
    AdapterGroup = {

        // A collection of EventFlow adapter configurations, indexed by adapter type. This key is required and
        // must contain at least one configuration.
        adapters = {

            // The root section for an EventFlow adapter configuration.
            RabbitMQServers = {

                // Section list. This key is optional and has no default value.
                sections = [

                    // A configuration for an EventFlow adapter named section.
                    {

                        // Section name. The value does not have to be unique; that is, you can have multiple
                        // sections with the same name in the same array of sections. This key is required.
                        name = "RabbitMQServer"

                        // Section property bag. All values must be strings. This key is optional and has no
                        // default value.
                        settings = {
                            ConnectOnStartup = "true"
                            Host = "127.0.0.1"
                            Password = ""
                            Username = ""
                            id = "RabbitMQ"
                            VirtualHost = ""
                            Port = ""
                            AutomaticRecoveryEnabled = ""
                            NetworkRecoveryInterval = ""
                            ConnectionTimeout = ""
                            HandshakeTimeout = ""
                            ShutdownTimeout = ""
                            RequestedHeartbeat = ""
                            TopologyRecoveryEnabled = ""

                            // SSL settings (quoted, because all values in the property bag must be strings)
                            sslEnabled = "false"
                            sslHostnameVerification = "false"
                            sslKeyStorePassword = ""
                            sslKeyStoreType = "PKCS12"
                            sslKeyStoreFile = ""
                            sslKeyStoreManagerType = "SunX509"
                            sslTrustStorePassword = ""
                            sslTrustStoreType = "JKS"
                            sslTrustStoreFile = ""
                            sslTrustStoreManagerType = "SunX509"
                            sslContextType = "TLSv1.2"
                        }
                    }
                ]
            }
        }
    }
}
| Setting | Type | Required | Description | 
|---|---|---|---|
| id | string | true | The value displayed in the RabbitMQ Server drop-down list. It is also the key that identifies this section of the configuration file. | 
| ConnectOnStartup | true/false | true | Connect to the RabbitMQ broker on startup. | 
| Host | string | true | The host address of the RabbitMQ Server. | 
| Username | string | false | The user name to use when connecting to the RabbitMQ Server. | 
| Password | string | false | The password to use when connecting to the RabbitMQ Server. | 
| VirtualHost | string | false | The virtual host to use when connecting to the RabbitMQ Server. | 
| Port | integer | false | The target port to use when connecting to the RabbitMQ Server. | 
| AutomaticRecoveryEnabled | true/false | false | Enables or disables automatic connection recovery. | 
| NetworkRecoveryInterval | long | false | Sets connection recovery interval. Default is 5000 milliseconds. | 
| ConnectionTimeout | integer | false | Set the TCP connection timeout in milliseconds; zero for infinite. | 
| HandshakeTimeout | integer | false | Set the AMQP 0-9-1 protocol handshake timeout, in milliseconds. | 
| ShutdownTimeout | integer | false | Set the shutdown timeout in milliseconds; zero for infinite; default 10000. This is the amount of time that Consumer implementations have to continue working through deliveries (and other Consumer callbacks) after the connection has closed but before the ConsumerWorkService is torn down. If consumers exceed this timeout then any remaining queued deliveries (and other Consumer callbacks, including the Consumer's handleShutdownSignal() invocation) will be lost. | 
| RequestedHeartbeat | integer | false | Set the requested heartbeat timeout, in seconds; zero for none. Heartbeat frames will be sent at about 1/2 the timeout interval. | 
| TopologyRecoveryEnabled | true/false | false | Enables or disables topology recovery. | 
| sslEnabled | true/false | false | Enables SSL for the connection. | 
| sslHostnameVerification | true/false | false | Enable server hostname verification for TLS connections. | 
| sslKeyStorePassword | string | | The keystore password. This value can be plain text or a value encoded using sbcipher. | 
| sslKeyStoreType | string | | The type of keystore. Defaults to PKCS12. See the KeyStore section in the Java Cryptography Architecture Standard Algorithm Name Documentation for information about standard keystore types. | 
| sslKeyStoreFile | string | | The full path to the keystore file location. | 
| sslKeyStoreManagerType | string | | The standard name of the requested key management algorithm. Defaults to SunX509. See the Java Secure Socket Extension Reference Guide for information about standard algorithm names. | 
| sslTrustStorePassword | string | | The trust store password. This value can be plain text or a value encoded using sbcipher. | 
| sslTrustStoreType | string | | The type of trust store. Defaults to JKS. See the KeyStore section in the Java Cryptography Architecture Standard Algorithm Name Documentation for information about standard keystore types. | 
| sslTrustStoreFile | string | | The full path to the trust store file location. | 
| sslTrustStoreManagerType | string | | The standard name of the requested trust management algorithm. Defaults to SunX509. See the Java Secure Socket Extension Reference Guide for information about standard algorithm names. | 
| sslContextType | string | | The standard name of the requested protocol. Defaults to TLSv1.2. See the SSLContext section in the Java Cryptography Architecture Standard Algorithm Name Documentation for information about standard protocol names. | 
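The ssl* settings take standard Java algorithm and type names, so it can help to see where each name would typically be used in plain JSSE code. The sketch below is an assumption about how such settings are commonly consumed, not a description of the adapter's implementation. The class `SslSettingsSketch` and its methods are hypothetical, passwords are treated as plain text (sbcipher decoding is out of scope), and an empty file path loads an empty in-memory store so the example runs without any files.

```java
import java.io.FileInputStream;
import java.io.InputStream;
import java.security.KeyStore;
import java.util.Map;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

// Hypothetical helper: maps the ssl* setting names onto standard JSSE calls.
final class SslSettingsSketch {

    /** Loads a store of the given type; an empty path yields an empty in-memory store. */
    static KeyStore loadStore(String type, String file, char[] password) throws Exception {
        KeyStore store = KeyStore.getInstance(type);            // sslKeyStoreType / sslTrustStoreType
        if (file.isEmpty()) {
            store.load(null, password);
        } else {
            try (InputStream in = new FileInputStream(file)) {  // sslKeyStoreFile / sslTrustStoreFile
                store.load(in, password);
            }
        }
        return store;
    }

    /** Builds an SSLContext from a settings map keyed by the setting names in the table above. */
    static SSLContext buildContext(Map<String, String> s) throws Exception {
        char[] keyPw = s.getOrDefault("sslKeyStorePassword", "").toCharArray();
        char[] trustPw = s.getOrDefault("sslTrustStorePassword", "").toCharArray();

        KeyManagerFactory kmf =
            KeyManagerFactory.getInstance(s.getOrDefault("sslKeyStoreManagerType", "SunX509"));
        kmf.init(loadStore(s.getOrDefault("sslKeyStoreType", "PKCS12"),
                           s.getOrDefault("sslKeyStoreFile", ""), keyPw), keyPw);

        TrustManagerFactory tmf =
            TrustManagerFactory.getInstance(s.getOrDefault("sslTrustStoreManagerType", "SunX509"));
        tmf.init(loadStore(s.getOrDefault("sslTrustStoreType", "JKS"),
                           s.getOrDefault("sslTrustStoreFile", ""), trustPw));

        SSLContext context = SSLContext.getInstance(s.getOrDefault("sslContextType", "TLSv1.2"));
        context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
        return context;
    }
}
```

A name that the running JVM does not recognize (for example, a misspelled keystore type) fails at the corresponding `getInstance` call, which is a quick way to check a value before putting it in the configuration file.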
The control port accepts command tuples with the following fields:
- command, string. The command to send to the adapter. The values are:
  - connect: Connect to the RabbitMQ server.
  - disconnect: Disconnect from the RabbitMQ server.
  - createchannel: Create a channel with the given exchange and queue information.
  - removechannel: Remove the channel with the supplied channel number.
- exchangeName, string. Used with the createchannel command. Determines the name of the exchange to create or use.
- exchangeDeclare, boolean. Used with the createchannel command to determine whether an exchange should be declared.
- exchangeDefaults, boolean. Used with the createchannel command when exchangeDeclare is true. If true, declares the exchange with RabbitMQ defaults.
- exchangeType, string. Used with the createchannel command when exchangeDeclare is true. Determines the type of exchange to create. Valid values are:
  - direct
  - topic
  - fanout
  - headers
- exchangeDurable, boolean. Used with the createchannel command when exchangeDeclare is true. If true, declares the exchange as durable.
- exchangeAutoDelete, boolean. Used with the createchannel command when exchangeDeclare is true. If true, declares the exchange as auto-delete.
- exchangeArgs, list<tuple<string key, string value>>. Used with the createchannel command when exchangeDeclare is true. A list of key value pairs to be used as the exchange arguments.
- queueName, string. Used with the createchannel command. Determines the name of the queue to create or use.
- queueDeclare, boolean. Used with the createchannel command to determine whether a queue should be declared.
- queueDurable, boolean. Used with the createchannel command when queueDeclare is true. If true, declares the queue as durable.
- queueExclusive, boolean. Used with the createchannel command when queueDeclare is true. If true, declares the queue as exclusive.
- queueAutoDelete, boolean. Used with the createchannel command when queueDeclare is true. If true, declares the queue as auto-delete.
- queueArgs, list<tuple<string key, string value>>. Used with the createchannel command when queueDeclare is true. A list of key value pairs to be used as the queue arguments.
- routingKeys, list<string>. Used with the createchannel command. A list of routing keys.
- channel, integer. Used with the removechannel command. The channel ID to remove. The channel ID is output as the object field of the status port when action equals channel.
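Because several control fields only matter in combination with others, it can be useful to check a command before sending it. The sketch below models a control tuple as a plain `Map` and reports obvious inconsistencies. `ControlCommandCheck` is a hypothetical helper, not part of the adapter; the rule that exchangeName must be set when exchangeDeclare is true is carried over from the Declare Exchange property and is assumed, not confirmed, to apply to the control port as well.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical pre-flight check for control port commands, modeled as a field map.
final class ControlCommandCheck {

    private static final List<String> EXCHANGE_TYPES =
        Arrays.asList("direct", "topic", "fanout", "headers");

    /** Returns a list of problems found in the command; an empty list means none were found. */
    static List<String> problems(Map<String, Object> fields) {
        List<String> out = new ArrayList<>();
        Object command = fields.get("command");
        if (!(command instanceof String)) {
            out.add("command is required");
            return out;
        }
        switch ((String) command) {
            case "connect":
            case "disconnect":
                break; // no further fields are used by these commands
            case "createchannel": {
                if (Boolean.TRUE.equals(fields.get("exchangeDeclare"))) {
                    Object name = fields.get("exchangeName");
                    if (!(name instanceof String) || ((String) name).trim().isEmpty()) {
                        // Assumption: same rule as the Declare Exchange property.
                        out.add("exchangeName must be set when exchangeDeclare is true");
                    }
                    Object type = fields.get("exchangeType");
                    if (type != null && !EXCHANGE_TYPES.contains(type)) {
                        out.add("exchangeType must be one of " + EXCHANGE_TYPES);
                    }
                }
                break;
            }
            case "removechannel":
                if (!(fields.get("channel") instanceof Integer)) {
                    out.add("channel is required for removechannel");
                }
                break;
            default:
                out.add("unknown command: " + command);
        }
        return out;
    }
}
```

In an EventFlow application the same checks could be expressed in a Filter or Map operator upstream of the control port; the Java form is used here only to keep the logic in one place.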
The status port is used to send status information tuples downstream to inform the user of changes.
- type, string. The type of status information emitted on this port. Status types are:
  - Error: Indicates this message is related to an error that occurred.
  - Warn: Indicates this message is related to a warning that the user should be aware of.
  - Info: Indicates this message is related to extra status information.
- action, string. The action that the status message relates to. Values are:
  - Connected: Connected to requested broker list.
  - Connecting: Starting to connect.
  - Disconnecting: Starting to disconnect.
  - Connection: Information about the current connection.
  - Update Brokers: Information about updating the brokers.
  - Command: A user command.
  - Send: Trying to send a message.
- object, string. This value may be null. If it is not null, it contains a value relevant to the status message.
- message, string. A formatted, human-readable message that explains the status message.
- time, timestamp. The time the status message occurred.
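The control port description notes that the channel ID needed by removechannel arrives in the object field of a status tuple whose action equals channel. A minimal sketch of extracting it is shown below. `StatusChannelId` is a hypothetical helper, and it assumes that object carries the channel number as a decimal string and that the action comparison can ignore case; neither detail is confirmed by this document.

```java
import java.util.OptionalInt;

// Hypothetical helper: pulls a channel ID out of a status tuple's action and object fields.
final class StatusChannelId {

    /** Returns the channel ID if this status reports a channel, otherwise an empty result. */
    static OptionalInt channelIdFrom(String action, String object) {
        if (action == null || object == null || !action.equalsIgnoreCase("channel")) {
            return OptionalInt.empty();
        }
        try {
            // Assumption: the object field holds the channel number as decimal text.
            return OptionalInt.of(Integer.parseInt(object.trim()));
        } catch (NumberFormatException e) {
            return OptionalInt.empty(); // object was present but not a number
        }
    }
}
```

The returned value could then be placed in the channel field of a later removechannel command.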
When Custom is selected for Message Data Type, you must provide a class that implements the com.streambase.sb.adapter.rabbitmq.IRabbitMQDeserializer interface. Here is an example:
package com.streambase.rabbitmq.sample;

import java.nio.charset.StandardCharsets;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Envelope;
import com.streambase.sb.Schema;
import com.streambase.sb.Tuple;
import com.streambase.sb.TupleJSONUtil;
import com.streambase.sb.adapter.rabbitmq.IRabbitMQDeserializer;

// Converts a JSON message body into a tuple that conforms to the message schema.
public class CustomDeserializer implements IRabbitMQDeserializer {
  @Override
  public Tuple deseralize(Schema schema, String consumerTag, Envelope envelope,
    BasicProperties properties, byte[] body) throws Exception {
      Tuple tuple = schema.createTuple();
      // Decode the body as UTF-8 (an assumed encoding) instead of relying on the platform default charset.
      TupleJSONUtil.setTupleFromJSONLoose(tuple, new String(body, StandardCharsets.UTF_8), "");
      return tuple;
  }
}