MQTT Client Publish Adapter

Introduction

The TIBCO StreamBase® Output Adapter for MQTT Client Publish allows a StreamBase application to publish data to an MQTT broker. Each incoming tuple must contain the topic to publish to and the payload, in string or blob form. The connection settings for this adapter are stored in the project's sbconf file.

Adapter Properties

This section describes the properties you can set for this adapter, using the various tabs of the Properties view in StreamBase Studio.

General Tab

Name: Use this field to specify or change the component's name, which must be unique in the application. The name must contain only alphabetic characters, numbers, and underscores, and no hyphens or other special characters. The first character must be alphabetic or an underscore.

Adapter: A read-only field that shows the formal name of the adapter.

Class: A field that shows the fully qualified class name that implements the functionality of this adapter. Use this class name when loading the adapter in StreamSQL programs with the APPLY JAVA statement. You can right-click this field and select Copy from the context menu to place the full class name in the system clipboard.

Start with application: If this field is set to Yes or to a module parameter that evaluates to true, an instance of this adapter starts as part of the containing StreamBase Server. If this field is set to No or to a module parameter that evaluates to false, the adapter is loaded with the server, but does not start until you send an sbadmin resume command, or until you start the component with StreamBase Manager. With this option set to No or false, the adapter does not start even if the application as a whole is suspended and later resumed. The recommended setting is selected by default.

Enable Error Output Port: Select this check box to add an Error Port to this component. In the EventFlow canvas, the Error Port shows as a red output port, always the last port for the component. See Using Error Ports and Error Streams to learn about Error Ports.

Description: Optionally enter text to briefly describe the component's purpose and function. In the EventFlow canvas, you can see the description by pressing Ctrl while the component's tooltip is displayed.

Adapter Properties Tab

Property Type Description
MQTT Connection drop-down list Required. Selects the connection configuration for the MQTT client from the sbconf file. The list shows the id setting of each section named mqttclientconnection within the adapter-configuration named mqttclientconnections.
Enable Control Port check box When enabled, the adapter accepts incoming control tuples that direct its actions.
Command Field Name string The name of the field in the incoming control tuple that contains the command.
Enable Status Port check box When enabled, the adapter sends out informational data on the status port about various states of the adapter.
Enable Published Status Messages check box When enabled, the adapter sends out a status message when the message is published to the message queue.
Enable Delivery Complete Status Messages check box When enabled, the adapter sends out a status message when the message delivery is complete.
Topic Field Name string The name of the field in the incoming tuple that contains the topic.
Payload Field Name string or blob The name of the field in the incoming tuple that contains the payload.
Qos Field Name string Optional: The name of the field in the incoming tuple that contains the QoS (quality of service) setting.
Retained Field Name string Optional: The name of the field in the incoming tuple that contains the retained flag.
Enable Pending Delivery Tokens Port check box When enabled, the adapter sends out pending delivery token informational data on the pending delivery port when requested.
Log Level Drop-down list Controls the level of verbosity the adapter uses to issue informational traces to the console. This setting is independent of the containing application's overall log level. Available values, in increasing order of verbosity, are: OFF, ERROR, WARN, INFO, DEBUG, TRACE, and ALL.
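
As an illustration of how the MQTT Connection drop-down list is populated, the following sketch declares two connection sections; each id value would appear as a choice in the list. It assumes that several mqttclientconnection sections can coexist in one adapter-configuration (the plural name mqttclientconnections suggests this, but it is not stated outright here). The ids and broker hosts are hypothetical placeholders.

```xml
<adapter-configurations>
  <adapter-configuration name="mqttclientconnections">
    <!-- Hypothetical first connection. The drop-down list would show
         "local-broker". -->
    <section name="mqttclientconnection">
      <setting name="id" val="local-broker" />
      <setting name="URIs" val="tcp://localhost:1883" />
    </section>
    <!-- Hypothetical second connection (assumes multiple sections
         are allowed). -->
    <section name="mqttclientconnection">
      <setting name="id" val="plant-broker" />
      <setting name="URIs" val="tcp://broker.example.com:1883" />
    </section>
  </adapter-configuration>
</adapter-configurations>
```

Only id and URIs are shown. Whether the remaining settings can be omitted is not stated here, so a real configuration may need the full set listed in the Connection Configuration section.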

Data Port

Description

Use the data port to send publish data to the adapter.

Data Port Schema

  • Topic, string, The topic to publish to.

  • Payload, string/blob, The data to publish.

  • Qos, int, Optional: The QoS level (0, 1, or 2) for this message.

  • Retained, boolean, Optional: The retained flag when publishing this message.

Control Port

Description

Use the control port to send action commands to the adapter.

Control Port Schema

  • Command, string, The command to send to the adapter.

    The values are:

    • Connect — Connect to the MQTT broker.

    • Disconnect — Disconnect from the MQTT broker.

    • PendingDeliveryTokens — Fetch all pending delivery tokens and output them on the pending delivery port (port must be enabled).

Connection Configuration

The connection configuration is stored in the project's sbconf file. Each setting has the form <setting name="" val=""/>. The following is an example of a complete configuration.

<?xml version="1.0" encoding="UTF-8"?>
<streambase-configuration xmlns:xi="http://www.w3.org/2001/XInclude"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:noNamespaceSchemaLocation="http://www.streambase.com/schemas/sbconf/">

  <adapter-configurations>
    <adapter-configuration name="mqttclientconnections">
      <section name="mqttclientconnection">
        <setting name="id" val="iot.eclipse.org"/>
                
        <!-- Determines if this connection is created on startup or during the 
          first usage. -->
        <setting name="connectOnStartup" val="true" />

        <!-- If true, the user must explicitly ACK each message using the
          MQTTClientAck adapter. If false, an ACK is sent automatically
          when the message is received off the wire. -->
        <setting name="manualAcks" val="false" />
        
        <!-- Comma-separated list of URIs. If the port is not specified, it
          defaults to 1883 for tcp:// URIs and 8883 for ssl:// URIs. -->
        <setting name="URIs" val="tcp://iot.eclipse.org:1883" />
                                
        <!-- isCleanSession:
         • If set to false both the client and server will maintain 
           state across restarts of the client, the server and the 
           connection. As state is maintained: 
           ◦ Message delivery will be reliable meeting the specified QOS even
             if the client, server or connection are restarted. 
           ◦ The server will treat a subscription as durable. 
         • If set to true the client and server will not maintain state 
           across restarts of the client, the server or the connection. 
           This means: 
           ◦ Message delivery to the specified QOS cannot be maintained if 
             the client, server or connection are restarted 
           ◦ The server will treat a subscription as non-durable 
        -->
        <setting name="isCleanSession" val="true" />
                
        <!-- The client identifier to use when creating a connection to the 
          broker.  If this value is blank an identifier will be generated. -->
        <setting name="ClientId" val="" />
                
        <!-- The persistence class to use to store in-flight messages. The
          values are 'memory' or 'file'. -->
        <setting name="PersistenceType" val="memory" />
                
        <!-- The full path to use for file persistence. If this option is
          blank or missing, the system uses System.getProperty("user.dir")
          as the location to store a temporary file. -->
        <setting name="PersistencePath" val="/" />
                
        <!-- Sets the user name to use for the connection. -->
        <setting name="Username" val="" />
                
        <!-- Sets the password to use for the connection. -->
        <setting name="Password" val="" />
                
        <!-- Sets the connection timeout value. This value, measured in 
          seconds, defines the maximum time interval the client will 
          wait for the network connection to the MQTT server to be 
          established. The default timeout is 30 seconds. A value of 0 
          disables timeout processing meaning the client will wait until
          the network connection is made successfully or fails. -->
        <setting name="connectionTimeout" val="30" />
                
        <!-- Sets the "keep alive" interval. This value, measured in 
          seconds, defines the maximum time interval between messages
          sent or received. 
          It enables the client to detect whether the server is no 
          longer available, without having to wait for the TCP/IP 
          timeout. 
          The client will ensure that at least one message travels 
          across the network within each keep alive period. 
             In the absence of a data-related message during the time 
               period, the client sends a very small ping message, 
               which the server will acknowledge. 
             A value of 0 disables keepalive processing in the client. 
               The default value is 60 seconds. -->
        <setting name="keepAliveInterval" val="60" />
                
        <!-- Sets the "Last Will and Testament" (LWT) for the 
          connection. In the event that this client unexpectedly 
          loses its connection to the server, the server will publish 
          a message to itself using the supplied details. -->
                
        <!--  The topic to publish the will to -->
        <setting name="willTopic" val="" />
                
        <!-- The payload of the message to publish to the will topic. -->
        <setting name="willPayload" val="" />
                
        <!-- The quality of service at which the will message is
          published (0, 1, or 2). -->
        <setting name="willQos" val="" />
                
        <!--  Whether or not the will message should be retained. -->
        <setting name="willRetained" val="" />        
                
        <!-- Sets the SSL properties for the connection. Note that 
          these properties are only valid if an implementation of the 
          Java Secure Socket Extensions (JSSE) is available. -->                       
        <!-- One of: SSL, SSLv3, TLS, TLSv1, SSL_TLS -->
        <setting name="sslProtocol" val="" />
                
        <!-- Underlying JSSE provider. For example "IBMJSSE2" or 
          "SunJSSE" -->
        <setting name="sslContextProvider" val="" />
                
        <!-- The name of the file that contains the KeyStore object 
          that you want the KeyManager to use. --> 
        <setting name="sslKeyStore" val="" /> 
                
        <!-- The password for the KeyStore object that you want 
          the KeyManager to use -->
        <setting name="sslKeyStorePassword" val="" />
                
        <!-- for example "PKCS12", "JKS", or "JCEKS" --> 
        <setting name="sslKeyStoreType" val="" /> 
                
        <!-- for example "IBMJCE" or "IBMJCEFIPS" -->
        <setting name="sslKeyStoreProvider" val="" />
                
        <!-- The name of the file that contains the KeyStore object 
          that you want the TrustManager to use. --> 
        <setting name="sslTrustStore" val="" /> 
                
        <!-- The password for the TrustStore object that you want 
          the TrustManager to use. -->
        <setting name="sslTrustStorePassword" val="" />
                
        <!-- for example "PKCS12", "JKS", or "JCEKS" -->
        <setting name="sslTrustStoreType" val="" />
                
        <!-- for example "IBMJCE" or "IBMJCEFIPS" -->
        <setting name="sslTrustStoreProvider" val="" />
                
        <!-- A list of the cipher suites to enable. Values depend on the
          provider. Examples:
          SSL_RSA_WITH_AES_128_CBC_SHA
          SSL_RSA_WITH_3DES_EDE_CBC_SHA -->
        <setting name="sslEnabledCipherSuites" val="" />
                
        <!-- Sets the algorithm that will be used to instantiate a 
          KeyManagerFactory object instead of using the default 
          algorithm available in the platform. 
          Example values: "IbmX509" or "IBMJ9X509" -->
        <setting name="sslKeyManager" val="" /> 
                
        <!-- Sets the algorithm that will be used to instantiate 
          a TrustManagerFactory object instead of using the default 
          algorithm available in the platform. 
          Example values: "PKIX" or "IBMJ9X509". -->
        <setting name="sslTrustManager" val="" />
      </section>            
    </adapter-configuration>
  </adapter-configurations>
</streambase-configuration>    
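
The complete example above leaves the SSL, persistence, and Last Will settings blank. As a rough sketch of how those settings might be combined, the fragment below describes one encrypted connection with a durable session and a will message. Every setting name comes from the example above; the id, host, client identifier, file paths, password, and topic are hypothetical placeholders, and the fragment is assumed to sit inside the same streambase-configuration root element.

```xml
<adapter-configurations>
  <adapter-configuration name="mqttclientconnections">
    <section name="mqttclientconnection">
      <!-- Hypothetical id and broker host. -->
      <setting name="id" val="secure-broker" />
      <setting name="connectOnStartup" val="true" />
      <!-- ssl:// URIs default to port 8883 when no port is given. -->
      <setting name="URIs" val="ssl://broker.example.com:8883" />

      <!-- Durable session: client and server keep state across restarts,
           so in-flight QoS 1 and 2 messages can be redelivered. -->
      <setting name="isCleanSession" val="false" />
      <!-- A fixed client identifier (hypothetical) so the broker can match
           the stored session after a reconnect. -->
      <setting name="ClientId" val="sb-publisher-01" />
      <!-- Store in-flight messages on disk (hypothetical path). -->
      <setting name="PersistenceType" val="file" />
      <setting name="PersistencePath" val="/var/lib/streambase/mqtt" />

      <!-- Last Will: published by the server if this client drops
           unexpectedly. Topic and payload are hypothetical. -->
      <setting name="willTopic" val="status/sb-publisher-01" />
      <setting name="willPayload" val="offline" />
      <setting name="willQos" val="1" />
      <setting name="willRetained" val="true" />

      <!-- Trust store used to verify the broker certificate
           (hypothetical file and password). -->
      <setting name="sslProtocol" val="TLS" />
      <setting name="sslTrustStore" val="/etc/streambase/truststore.jks" />
      <setting name="sslTrustStorePassword" val="changeit" />
      <setting name="sslTrustStoreType" val="JKS" />
    </section>
  </adapter-configuration>
</adapter-configurations>
```

Settings not shown here, such as connectionTimeout and keepAliveInterval, are presumed to fall back to the defaults described in the comments of the complete example; that fallback behavior is an assumption rather than something this page confirms.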

Concurrency Tab

Use the Concurrency tab to specify parallel regions for this instance of this component, or multiplicity options, or both. The Concurrency tab settings are described in Concurrency Options, and dispatch styles are described in Dispatch Styles.

Caution

Concurrency settings are not suitable for every application, and using these settings requires a thorough analysis of your application. For details, see Execution Order and Concurrency, which includes important guidelines for using the concurrency options.