MQTT Client Publish Adapter

Introduction

The TIBCO StreamBase® Output Adapter for MQTT Client Publish allows the system to publish data to an MQTT broker. Each message must contain the topic that the message will be sent to and the payload in binary or string form. The connection settings for this adapter are stored in the HOCON file.

Adapter Properties

This section describes the properties you can set for this adapter, using the various tabs of the Properties view in StreamBase Studio.

General Tab

Name: Use this required field to specify or change the name of this instance of this component, which must be unique in the current EventFlow module. The name must contain only alphabetic characters, numbers, and underscores, and no hyphens or other special characters. The first character must be alphabetic or an underscore.

Adapter: A read-only field that shows the formal name of the adapter.

Class name: Shows the fully qualified class name that implements the functionality of this adapter. If you need to reference this class name elsewhere in your application, you can right-click this field and select Copy from the context menu to place the full class name in the system clipboard.

Start options: This field provides a link to the Cluster Aware tab, where you configure the conditions under which this adapter starts.

Enable Error Output Port: Select this check box to add an Error Port to this component. In the EventFlow canvas, the Error Port shows as a red output port, always the last port for the component. See Using Error Ports to learn about Error Ports.

Description: Optionally enter text to briefly describe the component's purpose and function. In the EventFlow Editor canvas, you can see the description by pressing Ctrl while the component's tooltip is displayed.

Adapter Properties Tab

Property | Type | Description
MQTT Configuration | Edit Button | Shortcut to the StreamBase Configuration File Editor, used for adapter configuration or for converting an existing application's adapter-configurations.xml file to HOCON format.
MQTT Connection | drop-down list | Required. Selects the connection configuration for the MQTT client from the adapter configuration file. The listed values are taken from the id parameter of each section named mqttclientconnection in the adapter configuration named mqttclientconnections.
Enable Control Port | check box | When enabled, the adapter allows incoming tuples to control the actions of this adapter.
Command Field Name | string | The name of the field in the incoming control tuple that contains the command.
Enable Status Port | check box | When enabled, the adapter sends out informational data on the status port about various states of the adapter.
Enable Published Status Messages | check box | When enabled, the adapter sends out a status message when the message is published to the message queue.
Enable Delivery Complete Status Messages | check box | When enabled, the adapter sends out a status message when the message delivery is complete.
Topic Field Name | string | The name of the field in the incoming tuple that contains the topic.
Payload Field Name | string or blob | The name of the field in the incoming tuple that contains the payload.
Qos Field Name | string | Optional: The name of the field in the incoming tuple that contains the QoS (quality of service) setting.
Retained Field Name | string | Optional: The name of the field in the incoming tuple that contains the retained flag.
Enable Pending Delivery Tokens Port | check box | When enabled, the adapter sends out pending delivery token informational data on the pending delivery port when requested.
Log Level | INFO | Controls the level of verbosity the adapter uses to issue informational traces to the console. This setting is independent of the containing application's overall log level. Available values, in increasing order of verbosity, are: OFF, ERROR, WARN, INFO, DEBUG, TRACE.

Data Port

Description

Use the data port to send the adapter the data to publish.

Data Port Schema

  • Topic, string. The topic to publish to.

  • Payload, string/blob. The data to publish.

  • Qos, int. Optional: The QoS level (0, 1, or 2) for this message.

  • Retained, boolean. Optional: The retained flag when publishing this message.

Control Port

Description

Use the control port to send action commands to the adapter.

Control Port Schema

  • Command, string. The command to send to the adapter.

    The values are:

    • Connect — Connect to the MQTT broker.

    • Reconnect — Reconnected to the MQTT broker. This occurs when the automaticReconnect flag is set to true and an existing connection gets disconnected and reconnects.

    • Disconnect — Disconnect from the MQTT broker.

    • PendingDeliveryTokens — Fetch all pending delivery tokens and output them on the pending delivery port (port must be enabled).

  • Advance (optional), tuple. The settings to overwrite the default configurations.

Connection Configuration

All settings are specified as name-value pairs.

The connection configuration is stored in the project's HOCON file. The following is an example of a complete configuration. Long lines wrap to the next line for clarity.

    name = "MQTT.conf"
    type = "com.tibco.ep.streambase.configuration.adapter"
    version = "1.0.0"
    configuration = {

        // An adapter group type defines a collection of EventFlow adapter configurations,
        // indexed by adapter type.
        AdapterGroup = {

            // A collection of EventFlow adapter configurations, indexed by adapter type.
            // This key is required and must contain at least one configuration.
            adapters = {

                // The root section for an EventFlow adapter configuration.
                mqttclientconnections = {

                    // Section list. This key is optional and has no default value.
                    sections = [

                        // A configuration for an EventFlow adapter named section.
                        {

                            // Section name. The value does not have to be unique; that is, you can have multiple
                            // sections with the same name in the same array of sections. This key is required.
                            name = "mqttclientconnection"

                            // Section property bag. All values must be strings.
                            // This key is optional and has no default value.
                            settings = {
                                ClientId = ""
                                Password = ""
                                PersistencePath = "/"
                                PersistenceType = "memory"
                                URIs = "tcp://iot.eclipse.org:1883"
                                Username = ""
                                automaticReconnect = "false"
                                connectOnStartup = "true"
                                connectionTimeout = "30"
                                id = "iot.eclipse.org"
                                isCleanSession = "true"
                                keepAliveInterval = "60"
                                manualAcks = "false"
                                maxInFlight = "10"
                                enableBuffer = "true"
                                bufferSize = "5000"
                                persistBuffer = "false"
                                deleteOldestMessages = "true"
                                sslContextProvider = ""
                                sslEnabledCipherSuites = ""
                                sslKeyManager = ""
                                sslKeyStore = ""
                                sslKeyStorePassword = ""
                                sslKeyStoreProvider = ""
                                sslKeyStoreType = ""
                                sslProtocol = ""
                                sslTrustManager = ""
                                sslTrustStore = ""
                                sslTrustStorePassword = ""
                                sslTrustStoreProvider = ""
                                sslTrustStoreType = ""
                                willPayload = ""
                                willQos = ""
                                willRetained = ""
                                willTopic = ""
                            }
                        }
                    ]
                }
            }
        }
    }
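
The section comments in the example note that several sections may share the name mqttclientconnection, and the MQTT Connection drop-down list takes its values from each section's id setting. As a minimal sketch of that arrangement, the fragment below declares two connections in one sections array. The ids (broker-a, broker-b) and host names are hypothetical placeholders, and the fragment assumes that any setting not listed falls back to its documented default.

```hocon
// Sketch only: ids and host names are hypothetical placeholders.
// This fragment is meant to sit inside the adapters = { ... } block.
mqttclientconnections = {
    sections = [
        {
            name = "mqttclientconnection"
            settings = {
                id = "broker-a"
                URIs = "tcp://broker-a.example.com:1883"
                connectOnStartup = "true"
            }
        },
        {
            name = "mqttclientconnection"
            settings = {
                id = "broker-b"
                URIs = "tcp://broker-b.example.com:1883"
                automaticReconnect = "true"
            }
        }
    ]
}
```

Under these assumptions, the MQTT Connection list on each adapter would be expected to offer both broker-a and broker-b.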
      

Shared Adapter Configuration Options

This section describes the shared adapter configuration block found in the HOCON file.

If a value is not present, the default is used. Those values listed without a default are required.

Property | Type | Default | Description
id | string | | The name that links the adapters together. It is displayed in the drop-down list on each adapter's property configuration.
ClientId | string | | The client identifier to use when creating a connection to the broker. If this value is blank, an identifier is generated.
connectOnStartup | boolean | true | Determines whether this connection is created on startup or on first use.
manualAcks | boolean | false | If true, the user must explicitly ACK each message using the MQTTClientAck adapter. If false, an ACK is sent automatically when the message is received off the wire.
URIs | string | | Comma-separated list of URIs. If the port is not specified, it defaults to 1883 for tcp:// URIs and 8883 for ssl:// URIs.
isCleanSession | boolean | true | If false, both the client and server maintain state across restarts of the client, the server, and the connection. Message delivery then reliably meets the specified QoS even if the client, server, or connection is restarted, and the server treats a subscription as durable. If true, the client and server do not maintain state across such restarts: delivery at the specified QoS cannot be maintained if the client, server, or connection is restarted, and the server treats a subscription as non-durable.
maxInFlight | int | 10 | The maximum number of messages that can be sent without receiving acknowledgments. If this value is missing, the default is used.
automaticReconnect | boolean | false | Determines whether the client automatically attempts to reconnect to the server if the connection is lost. On reconnect, all subscriptions are restored.
PersistenceType | string | memory | The persistence class to use to store in-flight messages. The values are 'memory' or 'file'.
PersistencePath | string | / | The full path to use for file persistence. If this option is blank or missing, the system uses System.getProperty("user.dir") as the location to store a temporary file.
Username | string | | Sets the user name to use for the connection.
Password | string | | Sets the password to use for the connection.
connectionTimeout | int | 30 | Sets the connection timeout value. This value, measured in seconds, defines the maximum time interval the client will wait for the network connection to the MQTT server to be established. The default timeout is 30 seconds. A value of 0 disables timeout processing, meaning the client will wait until the network connection is made successfully or fails.
keepAliveInterval | int | 60 | Sets the "keep alive" interval. This value, measured in seconds, defines the maximum time interval between messages sent or received. It enables the client to detect whether the server is no longer available, without having to wait for the TCP/IP timeout. The client will ensure that at least one message travels across the network within each keep alive period. In the absence of a data-related message during the time period, the client sends a very small ping message, which the server will acknowledge. A value of 0 disables keepalive processing in the client. The default value is 60 seconds.
willTopic | string | | The topic to publish the will to.
willPayload | string | | The payload of the message to send to the will topic.
willQos | int | | The quality of service at which the will message is published (0, 1, or 2).
willRetained | boolean | | Whether or not the will message should be retained.
enableBuffer | boolean | false | If true, the client stores messages while disconnected.
bufferSize | int | 5000 | The maximum number of messages that will be stored in memory while the client is disconnected.
persistBuffer | boolean | false | If true, the client persists the messages to disk. If false or not present, the messages are saved only in memory.
deleteOldestMessages | boolean | false | If true, the client deletes the oldest message in the buffer (the message at index 0) once the buffer is full and a new message is published.
sslProtocol | string | | Sets the SSL protocol for the connection. One of: SSL, SSLv3, TLS, TLSv1, SSL_TLS. Note that the SSL properties are only valid if an implementation of the Java Secure Socket Extensions (JSSE) is available.
sslContextProvider | string | | Underlying JSSE provider. For example, "IBMJSSE2" or "SunJSSE".
sslKeyStore | string | | The name of the file that contains the KeyStore object that you want the KeyManager to use.
sslKeyStorePassword | string | | The password for the KeyStore object that you want the KeyManager to use.
sslKeyStoreType | string | | The type of key store, for example "PKCS12", "JKS", or "JCEKS".
sslKeyStoreProvider | string | | The key store provider, for example "IBMJCE" or "IBMJCEFIPS".
sslTrustStore | string | | The name of the file that contains the KeyStore object that you want the TrustManager to use.
sslTrustStorePassword | string | | The password for the TrustStore object that you want the TrustManager to use.
sslTrustStoreType | string | | The type of trust store, for example "PKCS12", "JKS", or "JCEKS".
sslTrustStoreProvider | string | | The trust store provider, for example "IBMJCE" or "IBMJCEFIPS".
sslEnabledCipherSuites | list(string) | | A list of the enabled ciphers. Values depend on the provider. Examples: SSL_RSA_WITH_AES_128_CBC_SHA, SSL_RSA_WITH_3DES_EDE_CBC_SHA.
sslKeyManager | string | | Sets the algorithm that will be used to instantiate a KeyManagerFactory object instead of using the default algorithm available in the platform. Example values: "IbmX509" or "IBMJ9X509".
sslTrustManager | string | | Sets the algorithm that will be used to instantiate a TrustManagerFactory object instead of using the default algorithm available in the platform. Example values: "PKIX" or "IBMJ9X509".
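
The TLS, last-will, and buffering options in the table above may be easier to read together in one settings block. The following is a minimal sketch, not a tested configuration: the id, host name, trust store path, password, will topic, and will payload are hypothetical placeholders. All values are written as strings because the section property bag requires string values.

```hocon
// Sketch only: every value below is an illustrative assumption.
settings = {
    id = "secure-broker"                      // hypothetical connection id
    URIs = "ssl://broker.example.com:8883"    // hypothetical host; 8883 is the documented ssl:// default port
    automaticReconnect = "true"

    // TLS settings (path and password are placeholders)
    sslProtocol = "TLS"
    sslTrustStore = "/path/to/truststore.jks"
    sslTrustStorePassword = "changeit"
    sslTrustStoreType = "JKS"

    // Last-will message, configured through the will* properties
    willTopic = "status/publisher"
    willPayload = "offline"
    willQos = "1"
    willRetained = "true"

    // Keep up to 1000 messages while disconnected, dropping the oldest when full
    enableBuffer = "true"
    bufferSize = "1000"
    deleteOldestMessages = "true"
}
```

A block like this would replace the settings of one mqttclientconnection section. Settings left out here are assumed to take the defaults listed in the table.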

Cluster Aware Tab

Use the settings in this tab to allow this operator or adapter to start and stop based on conditions that occur at runtime in a cluster with more than one node. During initial development of the fragment that contains this operator or adapter, and for maximum compatibility with TIBCO Streaming releases before 10.5.0, leave the Cluster start policy control in its default setting, Start with module.

Cluster awareness is an advanced topic that requires an understanding of StreamBase Runtime architecture features, including clusters, quorums, availability zones, and partitions. See Cluster Awareness Tab Settings on the Using Cluster Awareness page for instructions on configuring this tab.

Concurrency Tab

Use the Concurrency tab to specify parallel regions for this instance of this component, or multiplicity options, or both. The Concurrency tab settings are described in Concurrency Options, and dispatch styles are described in Dispatch Styles.

Caution

Concurrency settings are not suitable for every application, and using these settings requires a thorough analysis of your application. For details, see Execution Order and Concurrency, which includes important guidelines for using the concurrency options.