IBM WebSphere MQ Input Adapter

Introduction

The TIBCO StreamBase® Input Adapter for IBM MQ Series allows a StreamBase application to retrieve messages from an IBM WebSphere MQ server. Each message read from the MQ server results in the emission of a tuple by the adapter. The adapter is embedded in the StreamBase application.

At startup, the adapter connects to the MQ server, continually reads messages from the configured server queue or subscribed topic(s), converts the messages to tuples, and emits the tuples on its primary output port.

The adapter can be configured to parse incoming messages as XML, populating each tuple field with the value from the corresponding leaf element in the XML message. List and nested tuple fields are used to represent the hierarchy of the XML message. XML elements with no corresponding tuple field are discarded. Tuple fields with no corresponding XML element are set to null.

The adapter provides a status output port to convey status and error information to the StreamBase application. For example, when the state of the connection to the MQ server changes, a connection status tuple is emitted from the status output port.

The adapter is configured through several properties set in the adapter's Properties view in StreamBase Studio.

IBM MQ Middleware Dependencies

You must obtain the required IBM MQ Java libraries directly from IBM. The easiest way to make the IBM MQ API accessible to StreamBase is install them as Maven dependencies into your Maven repository.

The IBM MQ sample comes with Studio launch files which perform a Maven install to the local Maven repository to aid in running the sample; simply copy the JAR files into the root of the sample project and run each launch config to install. It is however recommended you install to your main Maven repository to make sure they are available to all machines you may distribute your code to.

Obtain the following required IBM MQ JAR files from an IBM MQ Series 7.0 installation, or from IBM:

com.ibm.mq.commonservices.jar
com.ibm.mq.headers.jar
com.ibm.mq.jar
com.ibm.mq.jmqi.jar
com.ibm.mq.pcf.jar
connector.jar

Properties

This section describes the properties you can set for this adapter, using the various tabs of the Properties view in StreamBase Studio.

General Tab

Name: Use this required field to specify or change the name of this instance of this component, which must be unique in the current EventFlow module. The name must contain only alphabetic characters, numbers, and underscores, and no hyphens or other special characters. The first character must be alphabetic or an underscore.

Adapter: A read-only field that shows the formal name of the adapter.

Class name: Shows the fully qualified class name that implements the functionality of this adapter. If you need to reference this class name elsewhere in your application, you can right-click this field and select Copy from the context menu to place the full class name in the system clipboard.

Start options: This field provides a link to the Cluster Aware tab, where you configure the conditions under which this adapter starts.

Enable Error Output Port: Select this check box to add an Error Port to this component. In the EventFlow canvas, the Error Port shows as a red output port, always the last port for the component. See Using Error Ports to learn about Error Ports.

Description: Optionally enter text to briefly describe the component's purpose and function. In the EventFlow Editor canvas, you can see the description by pressing Ctrl while the component's tooltip is displayed.

Adapter Properties Tab

Property Description
Enable Control Port If enabled a control port is available to control various features of the adapter at runtime.
Connect At Startup If enabled the adapter will connect to the MQ server on startup. This option is only available when the control port is enabled. If the control port is disabled the adapter will always connect on startup.
Bindings Mode If enabled, the adapter connects directly to a local queue manager rather than communicating through a network. When enabled, the Host Name, Port Number, and Channel Name controls are disabled.
Host Name The hostname or IP address of the IBM WebSphere MQ server. This list may be a comma separated list in which case the adapter will cycle through the list of hosts upon a disconnect.
Port Number The TCP port number of the MQ server.
User ID The IBM WebSphere MQ user name, if your site requires authentication when connecting to the MQ server.
Password Password for User ID.
Queue Manager The MQ queue manager name.
Use Topics If enabled, the adapter reads messages from one or more subscribed topics. Otherwise, that adapter reads messages from the configured MQ queue.
Default Topic If specified, the default topic to read messages from, requires no dynamic subscription.
Allow Dynamic Topics If enabled, an additional adapter input port can be used to subscribe or unsubscribe to "dynamic topics" be enqueuing tuples to the port.
Max Topic Count The maximum allowed number of concurrent dynamic topics. Since each subscribed topic consumes resources, namely an operating system thread, this property cannot be configured above 100.
Queue Name The MQ queue name. Ignored when the adapter is configured to use topics.
Exclusive Queue Access If enabled, the adapter connects to the queue for exclusive access, preventing other simultaneous readers. If another reader, exclusive or shared, has already connected to the queue, the adapter fails to connect, instead showing object-in-use status. When accessing an alias queue exclusively, the __Depth field of the ReceivedMessage stream is populated with null. Ignored when the adapter is configured to use topics.
Enable Read Ahead If enabled the MQ Queue Manager will request the topics and queues enable the read ahead feature. Please see IBM WebSphere MQ documentation for more information.
Channel Name The MQ channel name.
Unicode Encoding Specifies which of three encoding formats are used to decode Unicode payloads: UTF-16, UTF-16 Big Endian, or UTF-16 Little Endian.
Include Metadata Field If enabled, the adapter includes a nested tuple field in its output schema that holds the metadata received with the message.
Include JMS Correlation ID Field If enabled, the adapter includes a JMSCorrelationID subfield in the schema metadata field that holds the JMS correlation ID received with the message. This field is only activated when the Include Metadata Field is selected.
Include Depth Field If enabled, the adapter includes a field containing the current queue depth in its output schema. Note: enabling this property negatively impacts performance.
Acknowledge Messages If enabled, the adapter waits to remove incoming messages from the MQ queue until they are acknowledged by the StreamBase application. Enabling this option adds an input port to the input adapter with AcknowledgeMessage schema. Messages are acknowledged by enqueuing tuples to the adapter's Acknowledge Message input port. The Include Metadata Field property must be enabled to enable Acknowledge Messages.
Syncpoint On Get If enabled, the adapter gets each message with syncpoint control. Note: enabling this property negatively impacts performance.
Include Raw Payload Field If enabled and Parse Payload as XML is enabled, the adapter includes a __Payload string field in its output port to convey the raw XML message read from the MQ server. The __Payload field is always present when not parsing XML.
Date/Time Format Used when parsing XML to convert date and time XML elements to timestamps. The format of the format string is described in the java.text.SimpleDateFormat class described in the Oracle Java Platform SE reference documentation. Typical format string values include yyyyMMdd and yyyyMMdd HH:mm:ss.
Reconnect Interval The time, in seconds, to wait between attempts to connect to the MQ server. The valid range is 1 to 60, inclusive.
SSL Cipher Suite The cipher suite to use when connecting to an MQ sever using SSL, or blank for non-SSL connections. When using SSL connections, a truststore file is specified through JVM parameters in the server configuration file:

<param name="jvm-args" value="-Djavax.net.ssl.trustStore=/etc/truststore.jks -Djavax.net.ssl.trustStorePasswords=truststorepass -Djavax.net.ssl.keyStoreType=JKS"/>

Log Level Controls the level of verbosity the adapter uses to send notifications to the console. This setting can be higher than the containing application's log level. If set lower, the system log level is used. Available values, in increasing order of verbosity, are: OFF, ERROR, WARN, INFO, DEBUG, TRACE.

XML Parsing Tab

Property Description
Parse Payload as XML

If enabled, the adapter attempts to parse incoming messages as XML, populating tuple fields with the corresponding XML leaf element values. The output schema must have a top-level tuple field of type tuple whose name matches the name of the top-level XML element. A single adapter instance can parse multiple XML message types by including multiple top-level tuple fields in the output schema. For example, suppose you want to parse the following two XML messages:

<xml1><s>This is a string</s></xml1>
<xml2><i>123</i></xml2>

You would define two top-level tuple fields in the MQ input adapter's Edit Schema tab: xml1 and xml2. The schema of xml1 would have one subfield named s of type string. The schema of xml2 would have one subfield named i of type int. When the first message is received, field xml1 in the emitted tuple would be non-null, while field2 xml2 would be null, and vice-versa for the second message.

XML Value Field Name The name of the tuple subfield that receives an XML element's value; the default is _VALUE. This field must be used when attributes are being retrieved from an XML element. When attributes are disabled, or no attributes are being retrieved from a particular XML element, a tuple field with a name matching the XML element's tag can be used.
Attribute Values Supported If enabled (the default), attributes can be retrieved from XML elements, either through a field specified by the Attribute Values Field Name property or through a field with the same name as the XML attribute.
Attribute Values Field Name

The name of the tuple subfield that receives an XML element's attributes; the default is _ATTRIBUTES. The schema of attribute value fields must be list<tuple<string Name, string Value>>.

An alternate mechanism is available for retrieving attribute values. Rather than using an _ATTIBUTES subfield, a subfield with a name matching the attribute name and type compatible with the attribute value can be used.

Date/Time Format The format to use in converting StreamBase date-time strings to timestamps in parsing XML messages. The format of the format string is described in the java.text.SimpleDateFormat class described in the Oracle Java Platform SE reference documentation. Typical format string values include yyyyMMdd and yyyyMMdd HH:mm:ss.
Assume Local Time Zone If enabled, date-time strings containing no timezone specifier are assumed to represent local time. If disabled (the default), date-time strings are assumed to represent GMT.
Include Null List Values If enabled (the default), Include list values containing nulls in the generated tuple.
Null List Value Representation Representation of null list values in XML. The default is null.

Get Message Options Tab

Property Description
All Available Messages

If enabled, the adapter includes the All Available Messages option when reading messages from the queue. This can result in errors when working with non-indexed queues.

Complete Message

If enabled, the adapter includes the Complete Message option when reading messages from the queue. This can result in errors when working with non-indexed queues.

Message Read Timeout (MS)

The number of milliseconds to allow the reader to block waiting for a message. Caution: setting this value high may cause shutdown delays.

Edit Schema Tab

Use the Edit Schema tab to specify the schema of the output tuple for this adapter. For general instructions on using the Edit Schema tab, see the Properties: Edit Schema Tab section of the Defining Input Streams page.

Cluster Aware Tab

Use the settings in this tab to allow this operator or adapter to start and stop based on conditions that occur at runtime in a cluster with more than one node. During initial development of the fragment that contains this operator or adapter, and for maximum compatibility with TIBCO Streaming releases before 10.5.0, leave the Cluster start policy control in its default setting, Start with module.

Cluster awareness is an advanced topic that requires an understanding of StreamBase Runtime architecture features, including clusters, quorums, availability zones, and partitions. See Cluster Awareness Tab Settings on the Using Cluster Awareness page for instructions on configuring this tab.

Concurrency Tab

Use the Concurrency tab to specify parallel regions for this instance of this component, or multiplicity options, or both. The Concurrency tab settings are described in Concurrency Options, and dispatch styles are described in Dispatch Styles.

Caution

Concurrency settings are not suitable for every application, and using these settings requires a thorough analysis of your application. For details, see Execution Order and Concurrency, which includes important guidelines for using the concurrency options.

Using the Adapter in a StreamBase Module

This section discusses how to use the IBM WebSphere MQ input adapter in a StreamBase application. As shown in the diagram below depicting the adapter's sample application, the input adapter (MQInput) has no input ports and two output ports to communicate with the surrounding application.

The IBM WebSphere MQ Input Adapter's ports are used as follows:

  • AcknowledgeMessage (input): This input port is present when the Acknowledge Messages property is enabled. When operating in acknowledge-message mode, the adapter waits to remove messages from the MQ queue until they are acknowledged by the StreamBase application by enqueuing tuples to this port. An acknowledge-message tuple must include or more metadata values associated with the message being acknowledged. When more than one field in an acknowledge-message tuple is non-null, the first message on the MQ queue that matches any of the values is removed form the queue. The port has the following schema:

    • CorrelationId, blob: The message's correlation ID.

    • GroupId, blob: The message's group ID.

    • MessageId, blob: The message's message ID.

    • MessageSequenceNumber, int: The message's sequence number.

  • SubscribeToTopic (input): This input port is present when the Use Topics property is enabled. When configured for topics, the adapter request from one or more topics rather than an MQ queue. The port has the following schema:

    • Subscribe, bool: True to subscribe to a topic and False to unsubscribe.

    • Topic, string: The topic to subscribe to or unsubscribe from.

  • InputStatus (output): This output port emits status, information, and error tuples. This port has the following schema:

    • type, string: Contains one of the following values describing the type of event that occurred:

      • Connection - A connection to the MQ server has gone up or down.

      • Message - An incoming MQ message could not be processed.

      • Suspend/Resume - The adapter has been suspended or resumed.

    • object, string: The name of the object related to the event:

      • Connection - The configured IBM WebSphere MQ server, including the host name, port number, the queue manager name, queue name, and channel name.

      • Message - The payload of the incoming message.

      • Suspend/Resume - The adapter's name on the canvas.

    • action, string: The action associated with the event.

      • Connection - Up or Down

      • Message - XML Parse Error

      • Suspend/Resume - Suspending or Resumed

    • message, string: A human-readable description of the event.

  • ReceivedMessage (output): The adapter emits a tuple on this port for each message read from the MQ server. This port derives its schema from the adapter's Edit Schemas tab. Several additional fields appear on this port:

    • __Metadata, tuple: Present when the Include Metadata Field is enabled, to convey the contents of the metadata received with the message. This field has the following sub-fields:

      • AccountingToken, blob: Part of the identity of the message, allowing work done as a result of the message to be appropriately charged.

      • ApplicationOriginData, string: Data about the originating application, which can be used by the application to provide additional information about the origin of the message.

      • ApplicationIdData, string: Part of the identity context of the message - information defined by the application suite; it can be used to provide additional information about the message or its originator.

      • BackoutCount, int: The number of times the message has been backed out.

      • CharacterSet, string: The coded character set identifier of character data in the application message data. Valid values include:

        • ASCII

        • ISO ASCII

        • EBCDIC

        • Unicode

        • UTF-8

      • CorrelationId, blob: The correlation identifier of the message.

      • Encoding, tuple: Specifies the representation used for binary, packed decimal and floating point values in the application message.

        • Value, int: The bitmap specifying the encoding values.

        • Binary, tuple: The binary encoding value.

          • Normal, bool: True if normal (big endian) is used for binary values.

          • Reversed, bool: True if reversed (little endian) is used for binary values.

        • PackedDecimal, tuple: The packed decimal encoding value.

          • Normal, bool: True if normal (big endian) is used for packed decimal values.

          • Reversed, bool: True if reversed (little endian) is used for packed decimal values.

        • FloatingPoint, tuple: The floating point encoding value.

          • Normal, bool: True if normal (big endian) is used for floating point values.

          • Reversed, bool: True if reversed (little endian) is used for floating point values.

          • S390, bool: True if S390 (zSeries) is used for floating point values.

      • Expiry, int: The expiry time (in tenths of a second) of the message.

      • Feedback, tuple: The nature of the feedback report, used with an MQ message of type REPORT to indicate the nature of the report.

        • Value, int: Numeric feedback value. One of the following boolean fields is set for system-defined feedback values.

        • Activity

        • ApplCannotBeStarted

        • ApplTypeError

        • ApplTypeError

        • BufferOverflow

        • ChannelCompleted

        • ChannelFail

        • ChannelFailRetry

        • COA

        • COD

        • DataLengthNegative

        • DataLengthTooBig

        • DataLengthZero

        • Expiration

        • IihError

        • LengthOffByOne

        • MaxActivities

        • Nan

        • None

        • NotDelivered

        • NotForwarded

        • Pan

        • Quit

        • StoppedByMsgExit

        • TmError

        • UnsupportedDelivery

        • UnsupportedForwarding

        • XmitQMsgError

      • Format, string: A name that indicates the nature of the data in the message.

      • GroupId, blob: The identifier of the message group to which the message belongs.

      • JMSCorrelationID, string: The JMS correlation ID from the message. This field is present only if the Include JMS Correlation ID Field property is enabled.

      • MessageFlags, tuple: Flags controlling the segmentation and status of the message.

        • Value, int: The message flags bitmap.

        • SegmentationInhibited

        • SegmentationAllowed

        • Segment

        • LastSegment

        • MsgInGroup

        • LastMsgInGroup

      • MessageId, blob: The message's identifier.

      • MessageSequenceNumber, int: The sequence number of the logical message within the group.

      • MessageType, string: The type of the message:

        • DATAGRAM

        • REQUEST

        • REPLY

        • REPORT

      • Offset, int: The offset of data in the physical message from the start of the logical message.

      • OriginalLength, int: The original length of a segmented message.

      • Persistence, tuple: The message persistence:

        • Value, int: The persistence bitmap.

        • Persistent

        • NotPersistent

        • PersistenceAsQDef

      • Priority, int: The message priority

      • PutApplicationName, string: The name of the application that put the message.

      • PutApplicationType, tuple: The system- or user-defined value of the type of application that put the message.

        • Value, int: The put application type value.

        • Name, string: The corresponding name, if known, or UNKNOWN.

      • PutDateTime, timestamp: The time and date when the message was put.

      • ReplyToQueueManagerName, string: The MQ queue manager name to which responses should be sent. Overrides the corresponding adapter property.

      • ReplyToQueueName, string: The MQ queue name to which responses should be sent. Overrides the corresponding adapter property.

      • Report, tuple: Specifies which report messages are required, whether the application message data is to be included in them, and also how the message and correlation ID in the report or reply are to be set.

        • Value, int: The bitmap used to specify all report values.

        • Exception, tuple: Specifies whether application data is included in exception reports.

          • Basic, bool: True if basic information is included in exception reports.

          • Data, bool: True if partial application data is included in exception reports.

          • FullData, bool: True if partial application data is included in exception reports.

        • Exception, tuple: Specifies whether application data is included in exception reports.

          • Basic, bool: True if basic information is included in exception reports.

          • Data, bool: True if partial application data is included in exception reports.

          • FullData, bool: True if partial application data is included in exception reports.

        • Expiration, tuple: Specifies whether application data is included in expiration reports.

          • Basic, bool: True if basic information is included in expiration reports.

          • Data, bool: True if partial application data is included in expiration reports.

          • FullData, bool: True if partial application data is included in expiration reports.

        • COA, tuple: Specifies whether application data is included in confirmation on arrival reports.

          • Basic, bool: True if basic information is included in COA reports.

          • Data, bool: True if partial application data is included in COA reports.

          • FullData, bool: True if partial application data is included in COA reports.

        • COD, tuple: Specifies whether application data is included in confirmation on delivery reports.

          • Basic, bool: True if basic information is included in COD reports.

          • Data, bool: True if partial application data is included in COD reports.

          • FullData, bool: True if partial application data is included in COD reports.

        • NewMsgId, bool: True to use a new message ID in the report or reply.

        • PassMsgId, bool: True to use the ID from original message in the report or reply.

        • CopyMsgIdToCorrelId, bool: True to use the ID of the original message ID for the correlation ID in the report or reply.

        • PassCorrelId, bool: True to use specified correlation in the report or reply.

        • DealLetterQ, bool: Dispose undelivered messages in the dead letter queue.

        • DiscardMsg, bool: True to discard undelivered messages.

        • PassDiscardAndExpiry, bool: True to pass on discarded and expired messages.

      • UserId, string: The user ID, which is part of the identity of the message and identifies which user originated it.

    • __Payload, string: Present when not parsing XML or when the Include Raw Payload Field is enabled, to convey the contents of the message read from the MQ server.

    • __ReplyToQueueManager, string: Contains the name of the queue manager to which a response to this message should be sent.

    • __ReplyToQueue, string: Contains the name of the queue to which a response to this message should be sent. Present only when the adapter is configured to not use topics.

    • __Depth, int: Contains the number of message in the message queue, not including the message just read. When accessing an alias queue exclusively, this field is populated with null. Present only when the adapter is configured to not use topics.

    • __Topic, string: Contains the name of the topic from which the message was received. Present only when the adapter is configured to use topics.

Add an instance of the adapter to a new StreamBase application as follows:

  1. In StreamBase Studio, create a project an EventFlow diagram to host the adapter.

  2. Drag an instance of the IBM WebSphere MQ input adapter from the Operators and Adapter drawer in the Palette view to the canvas.

  3. Select or double-click the adapter icon. In the Properties view, select the Adapter Properties tab and fill in values for the IBM WebSphere MQ server host, port, queue manager name, queue name, and channel name.

  4. If parsing XML, click the Edit Schemas tab and add one or more tuple fields whose names match the top-level XML elements of the expected XML messages. For each tuple field, add sub-fields whose names a types match those in the corresponding XML message. Use list and additional nested tuple fields to represent the hierarchy in the XML message. Note that list of lists are not supported. Instead, use a list of nested tuples, in which the nested tuple schema contains a single list field.

  5. Connect Output streams to the adapter output ports.

Control Port

The IBM WebSphere MQ Input adapter can enable a control port to send commands to the adapter during runtime to control various operations. The currently supported operations are:

  • Connect - Tells the adapter to connect to the IBM MQ server.

  • Disconnect - Tells the adapter to disconnect from the IBM MQ server.

  • NextHost - Tells the adapter to disconnect from the IBM MQ server and switch to the next host in the list and then connect.

Typechecking and Error Handling

The IBM WebSphere MQ Input adapter generates the following typecheck messages to help you configure the adapter in your StreamBase application:

  • An invalid port number is provided.

  • The queue manager name is missing.

  • The queue name is missing.

  • The channel name is missing.

  • The adapter is configured to parse XML, one or more timestamp fields is present in its ReceivedMessage output schema, and the date and time format string is missing or invalid.

  • The reconnection interval is invalid (not between 1 and 60 seconds, inclusive).

  • One or more top-level, non-tuple fields are present in the adapter's ReceivedMessage output schema.

  • One or more list-of-list fields are present in the adapter's ReceivedMessage output schema.

The adapter emits tuples on the status port during runtime under various conditions:

  • A connection to the IBM WebSphere MQ server goes up or down.

  • The adapter is suspended or resumed.

Suspend and Resume Behavior

When suspended, the IBM WebSphere input adapter stops emitting tuples. The message thread continues to run and receive messages but those messages will be discarded.

When resumed, the adapter begins emitting tuples again.

Related Topics