Kafka ReceiveMessage Activity

The Kafka ReceiveMessage activity is an event source activity that can be configured as a process starter in any TIBCO BusinessWorks process. It starts process execution when it receives a Kafka message event.

General

On the General tab, specify the required parameters before using this activity. The General tab has the following fields:

Field Literal Value/ Process Property/ Module Property? Description
Name None The name to be displayed as the label for the activity in the process.
Kafka Connection Yes The Kafka connection resource for communicating with a Kafka server instance.
Group Id Yes The group ID for the consumer group.
Topic Names Yes The name of the topic where the Kafka cluster stores streams of records.
Assign Custom Partition None Select this check box to enter a Partition ID and override Kafka's default partition assignment behavior.
Partition ID Yes Sequence ID or range of the partitions from which messages are received. The default is 0.
Note:
  • This field is enabled only if the Assign Custom Partition check box is selected.
  • For projects created on Kafka Plug-in version 6.1.0 or earlier, if the Partition ID field in the Kafka ReceiveMessage activity is configured by using a module or process property, the data type of the property must be changed from Integer to String.
Consumer Count Yes The number of KafkaConsumer instances started by the activity. The default value is 0, which creates and starts as many KafkaConsumer instances as there are partitions in the topic. The maximum allowed value is the number of partitions in the topic.
Key Deserializer Yes Class for the key that implements the deserializer interface.
Value Deserializer Yes Class for the value that implements the deserializer interface.
Fetch Timeout Yes Specifies the maximum time in milliseconds to get the metadata about the topic before a timeout occurs. The default value is 1000.
Fetch Min Bytes Yes The minimum amount of data that the server returns for a fetch request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive.
Fetch Max Wait Yes The maximum amount of time, in milliseconds, that the server blocks before answering a fetch request if there is not enough data to immediately satisfy the requirement given by fetch.min.bytes. The default value is 500.
Heartbeat Interval Yes Time in milliseconds between heartbeats sent by the consumer to the consumer coordinator. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when consumers join or leave a group. The default value is 3000.
Session Timeout Yes The consumer sends periodic heartbeats to the broker to indicate its liveness. If the broker receives no heartbeats before this session timeout expires, the broker removes the consumer from the group and initiates a rebalance. The default value is 30000.
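
As a rough illustration of the fields above, the following Java sketch shows how they could correspond to standard Apache Kafka consumer property names, and how the Consumer Count rule could be evaluated. The mapping is an assumption, because this section does not describe how the plug-in passes these values to the Kafka client. GeneralTabSketch, consumerProperties, and effectiveConsumerCount are hypothetical names, StringDeserializer is only an example choice, and rejecting an out-of-range Consumer Count with an exception is assumed behavior. Fetch Timeout is left out because the table does not tie it to a specific consumer property.

```java
import java.util.Properties;

// Hypothetical helper, not part of the plug-in: shows how the General tab
// fields could map to standard Apache Kafka consumer property names.
final class GeneralTabSketch {

    // Builds consumer properties using the default values listed in the table.
    static Properties consumerProperties(String groupId) {
        Properties props = new Properties();
        props.setProperty("group.id", groupId);              // Group Id
        // Example deserializer classes; any class implementing the
        // deserializer interface could be configured instead.
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("fetch.min.bytes", "1");           // Fetch Min Bytes
        props.setProperty("fetch.max.wait.ms", "500");       // Fetch Max Wait
        props.setProperty("heartbeat.interval.ms", "3000");  // Heartbeat Interval
        props.setProperty("session.timeout.ms", "30000");    // Session Timeout
        return props;
    }

    // Consumer Count rule from the table: 0 means one consumer per partition,
    // and the value may not exceed the partition count.
    // Throwing on an out-of-range value is an assumption of this sketch.
    static int effectiveConsumerCount(int configured, int partitionCount) {
        if (configured < 0 || configured > partitionCount) {
            throw new IllegalArgumentException(
                    "Consumer Count must be between 0 and " + partitionCount);
        }
        return configured == 0 ? partitionCount : configured;
    }

    public static void main(String[] args) {
        Properties props = consumerProperties("example-group");
        System.out.println(props.getProperty("session.timeout.ms"));
        System.out.println(effectiveConsumerCount(0, 6));
    }
}
```

With these defaults the heartbeat interval is one tenth of the session timeout, so several heartbeats can be missed before the broker removes the consumer from the group.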

Description

On the Description tab, you can enter a short description for the activity.

Field Literal Value/ Process Property/ Module Property? Description
Description None A description of the activity.

Advanced

The following table describes the Advanced tab of the ReceiveMessage activity.

Field Literal Value/ Process Property/ Module Property? Description
Sequence Key None XPath expression that specifies which processes must run in sequence. Process instances with sequencing keys that evaluate to the same value are executed sequentially, in the order in which the process instances were created.
Custom Job Id None This field can contain an XPath expression that specifies a custom ID for the process instance.
Enable Auto Commit None
  • Select this check box to auto commit the message record.
  • Deselect the check box to enable Manual Commit for the message record.
Note: Use the Confirm activity from the General Activities palette for manual commit.
AutoCommit Interval Yes Interval in milliseconds at which the consumer offsets are auto-committed to Kafka when the auto-commit mode is enabled. The default value is 5000.
AutoOffset Reset Yes Required. Specifies what the consumer does when there is no initial offset or the current offset is out of range.
  • Earliest: resets the offset to the earliest offset.
  • Latest: resets the offset to the latest offset.
  • None: throws an exception to the consumer.
Properties None The name and value of each additional consumer property.
Note: All additional consumer properties can be configured here. For example, to set the maximum number of records per poll, add a property named max.poll.records and specify its value.
Interceptors None The Interceptor class intercepts the received messages that are published to a topic or channel before they are consumed. You can add and delete interceptor classes by using the corresponding icons. The interceptor classes are executed in the order in which you specify them, and you can change the order of execution by using the reorder icons.

Note: To add user-defined properties to the interceptor class, you must specify those properties in the Properties field.
Note: The Kafka plug-in supports fault tolerance. If errors occur, you can use the Properties field to add the correct property name and value.
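
To make the Advanced tab settings more concrete, the following Java sketch shows how they could translate into standard Apache Kafka consumer properties. This is an assumption about the mapping, not a description of the plug-in's implementation. AdvancedTabSketch and consumerProperties are hypothetical names, and applying the entries from the Properties field last, so that they can override other values, is also assumed.

```java
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper, not part of the plug-in: maps the Advanced tab
// settings to standard Apache Kafka consumer property names.
final class AdvancedTabSketch {

    static Properties consumerProperties(boolean enableAutoCommit,
                                         String autoOffsetReset,
                                         Map<String, String> extraProperties,
                                         List<String> interceptorClasses) {
        Properties props = new Properties();

        // Enable Auto Commit: when false, offsets are committed manually.
        props.setProperty("enable.auto.commit", Boolean.toString(enableAutoCommit));
        if (enableAutoCommit) {
            // AutoCommit Interval, using the default from the table.
            props.setProperty("auto.commit.interval.ms", "5000");
        }

        // AutoOffset Reset: Kafka expects lowercase earliest, latest, or none.
        props.setProperty("auto.offset.reset",
                autoOffsetReset.toLowerCase(Locale.ROOT));

        // Interceptors: a comma-separated list; list order is execution order.
        if (!interceptorClasses.isEmpty()) {
            props.setProperty("interceptor.classes",
                    String.join(",", interceptorClasses));
        }

        // Properties field: additional consumer properties such as
        // max.poll.records. Applied last in this sketch (an assumption).
        extraProperties.forEach(props::setProperty);
        return props;
    }
}
```

For example, passing a map that contains max.poll.records with the value 100 adds that entry to the resulting properties alongside the auto-commit and offset-reset settings.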

Conversation

You can initiate the conversation here. Click the Add New Conversation button to initiate multiple conversations.

Output

The following table describes the fields on the Output tab.

Field Type Description
KafkaReceiverMessageOutput complex The complete output for the ReceiveMessage activity.
topic string The topic name.
Partition number The sequence ID of the partition.
offset number The sequence ID assigned to each record within the partition.
Key string Specified key of the incoming record.
Message string Message received through Kafka.
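
For readers who think in code, the output structure above can be pictured as a simple value type. The following Java class is only a sketch: KafkaReceiverMessageOutputSketch and its position method are hypothetical, and representing the two number fields as int and long is an assumption. The activity itself exposes these fields as process data, not as a Java object.

```java
// Hypothetical Java view of the activity output described in the table.
final class KafkaReceiverMessageOutputSketch {
    final String topic;    // topic: the topic name
    final int partition;   // Partition: sequence ID of the partition
    final long offset;     // offset: sequence ID of the record in the partition
    final String key;      // Key: key of the incoming record
    final String message;  // Message: message received through Kafka

    KafkaReceiverMessageOutputSketch(String topic, int partition, long offset,
                                     String key, String message) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.key = key;
        this.message = message;
    }

    // Topic, partition, and offset together identify one record, which is
    // useful for logging or for tracking manual commits.
    String position() {
        return topic + "-" + partition + "@" + offset;
    }
}
```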