Message consumers are clients that receive messages published to a topic or sent to a queue. When working with topics, a Message Consumer is commonly referred to as a
Subscriber.
A Message Consumer can be created with a "message selector" that restricts the consumption of message to those with specific properties. When creating a Message Consumer for topics, you can set a
noLocal attribute that prohibits the consumption of messages that are published over the same connection from which they are consumed.
Carefully consider the message selectors that are used with queue consumers. Because messages that do not match a queue consumer’s message selectors remains in the queue until it is retrieved by another consumer, a non-matching message can experience many failed selectors. This is especially so when queue consumers connect, consume a message, and immediately disconnect.
As described in Durable Subscribers for Topics, messages published to topics are only consumed by active subscribers to the topic; otherwise the messages are not consumed and cannot be retrieved later. You can create a durable subscriber that ensures messages published to a topic are received by the subscriber, even if it is not currently running. For queues, messages remain on the queue until they are either consumed by a Message Consumer, the message expiration time has been reached, or the maximum size of the queue is reached.
The following examples create a Message Consumer that consumes messages from the queue and a durable subscriber that consumes messages from a topic. The queue and topic are those that were dynamically created in
Dynamically Creating Topics and Queues.
|
The createDurableSubscriber method either creates a new durable subscriber for a topic or attaches the client to a previously created durable subscriber. A user must have durable permission on the topic to create a new durable subscriber for that topic. A user must have at least use_durable permission on the topic to attach to an existing durable subscriber for the topic. See User Permissions for details.
|
MessageConsumer QueueReceiver = session.createConsumer(queue);
See the tibjmsMsgConsumer.java sample client for a working example.
The following Session.createDurableSubscriber() method creates a durable subscriber, named
"MyDurable":
TopicSubscriber subscriber = session.createDurableSubscriber(topic,"myDurable");
See the tibjmsDurable.java sample client for a working example.
tibemsMsgConsumer QueueReceiver = NULL;
status = tibemsSession_CreateConsumer(session,
&QueueReceiver, queue, NULL, TIBEMS_FALSE);
See the tibemsMsgConsumer.c sample client for a working example.
The following tibemsSession_CreateDurableSubscriber function creates a durable subscriber, named
"myDurable," of type
tibemsMsgConsumer:
tibemsMsgConsumer msgConsumer = NULL;
status = tibemsSession_CreateDurableSubscriber(session, &msgConsumer, topic, "myDurable",
See the tibemsDurable.c sample client for a working example.
MessageConsumer QueueReceiver = session.createConsumer(queue);
See the csMsgConsumer.cs sample client for a working example.
The following Session.CreateDurableSubscriber method creates a durable subscriber, named
"MyDurable":
TopicSubscriber subscriber =
session.CreateDurableSubscriber(topic, "myDurable");
See the csDurable.cs sample client for a working example.
EMS allows a Message Consumer to consume messages either synchronously or asynchronously. For synchronous consumption, the Message Consumer explicitly calls a receive method on the topic or queue. For asynchronous consumption, you can implement a
Message Listener that serves as an asynchronous event handler for messages.
A Message Listener implementation has one method, onMessage, that is called by the EMS server when a message arrives on a destination. You implement the
onMessage method to perform the desired actions when a message arrives. Your implementation should handle all exceptions, and it should not throw any exceptions.
Once you create a Message Listener, you must register it with a specific Message Consumer before calling the connection’s
start method to begin receiving messages.
A Message Listener is not specific to the type of the destination. The same listener can obtain messages from a queue or a topic, depending upon the destination set for the Message Consumer with which the listener is registered.
Create an implementation of the MessageListener interface, create a
MessageConsumer, and use the
MessageConsumer object’s
setMessageListener() method to register the Message Listener with the Message Consumer:
public class tibjmsAsyncMsgConsumer implements MessageListener
/* Create a connection, session and consumer */
MessageConsumer QueueReceiver = session.createConsumer(queue);
QueueReceiver.setMessageListener(this);
Implement the onMessage() method to perform the desired actions when a message arrives:
public void onMessage(Message message)
/* Process message and handle exceptions */
See the tibjmsAsyncMsgConsumer.java sample client for a working example.
Implement an onMessage() function to perform the desired actions when a message arrives:
void onMessage(tibemsMsgConsumer QueueReceiver,
tibemsMsg message, void* closure)
/* Process message and handle exceptions */
In another function, that creates a tibemsMsgConsumer and uses the
tibemsMsgConsumer_SetMsgListener function to create a message listener for the Message Consumer, specifying onMessage() as the callback function:
tibemsMsgConsumer QueueReceiver = NULL;
/* Create a connection, session and consumer */
status = tibemsSession_CreateConsumer(session,
&QueueReceiver, queue, NULL, TIBEMS_FALSE);
status = tibemsMsgConsumer_SetMsgListener(QueueReceiver, onMessage, NULL);
status = tibemsConnection_Start(connection);
See the tibemsAsyncMsgConsumer.c sample client for a working example.
Create an implementation of the IMessageListener interface, use
Session.CreateConsumer to create a
MessageConsumer, and set the
MessageListener property on the
MessageConsumer object to register the Message Listener with the Message Consumer:
public class csAsyncMsgConsumer : IMessageListener
/* Create a connection, session and consumer */
MessageConsumer QueueReceiver = session.CreateConsumer(queue);
QueueReceiver.MessageListener = this;
Implement the IMessageListener.OnMessage method to perform the desired actions when a message arrives:
public void OnMessage(Message message) {
/* Process message and handle exceptions */
See the csAsyncMsgConsumer.cs and
csAsyncMsgConsumerUsingDelegate.cs sample clients for working examples.