Kafka Receive Message Activity

The Kafka Receive Message activity is an event source activity that can be configured as a process starter in any TIBCO BusinessWorks process. It starts process execution when it receives a Kafka message event.

General

On the General tab, specify the required parameters before using this activity. The General tab has the following fields:

Field Literal Value/ Process Property/ Module Property? Description
Name None The name to be displayed as the label for the activity in the process.
Kafka Connection Yes The Kafka connection resource for communicating with a Kafka server instance.
Group ID Yes The group ID for the consumer group.
Topic Names Yes The name of the topic where the Kafka cluster stores streams of records. Multiple topic names are supported, separated by a semicolon (;).
Assign Custom Partition None Select this check box to enter partition IDs and override Kafka's default partition assignment behavior.
Partition IDs Yes The sequence ID, or range of IDs, of the partitions from which messages are received. The default is 0. Multiple partition IDs are supported, separated by a comma (,).

For example,

Single Topic and Single Partition:

Partition IDs = 0

Single Topic and Multiple Partitions:

Partition IDs = 0,1,2 or, to specify a range,

Partition IDs = 0-2

Multiple Topics and Multiple Partitions:

For two topics:

Partition IDs = 0,1,2;0,4 or

Partition IDs = 0-2;0,1,2 or

Partition IDs = 1-2;0-3

Note:
  • This field is enabled only if the Assign Custom Partition check box is selected.
  • For projects created with Kafka Plug-in version 6.1.0 or earlier, if the Partition IDs field in the Kafka Receive Message activity is configured by using a module or process property, the data type of the property must be changed from Integer to String.
Consumer Count Yes The number of KafkaConsumer instances started by the activity. The default value is 0, which creates and starts as many KafkaConsumer instances as there are partitions in the topic. The maximum value allowed is the number of partitions in the topic.
Use Registry No This check box is available when Avro Schema is selected in the Key Deserializer or Value Deserializer field.

This check box enables you to use the Avro Schema with the Schema Registry.

Avro Encoding Type Yes This field is available when the Avro Schema is selected in the Key Deserializer or Value Deserializer field and the Use Registry check box is not enabled.
Three types of encoding are available in the dropdown:
  • JSON Encoding
  • Binary Encoding
  • Single Object Encoding

For more information on the Avro encoding types, see the Avro documentation.

Key Deserializer No Class for the key that implements the deserializer interface.
Key Avro Schema File Yes This field is available when Avro Schema is selected in the Key Deserializer field and the Use Registry check box is not enabled.

Specifies the path to the .avsc file that contains the Avro schema to be used. Select the file through the resource picker.

Value Deserializer No Class for the value that implements the deserializer interface.
Value Avro Schema File Yes This field is available when Avro Schema is selected in the Value Deserializer field and the Use Registry check box is not enabled.

Specifies the path to the .avsc file that contains the Avro schema to be used. Select the file through the resource picker.

Fetch Timeout Yes Specifies the maximum time in milliseconds to get the metadata about the topic before a timeout occurs.

The default value is 1000.

Fetch Min Bytes Yes The minimum amount of data that the server sends in response to a fetch request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive.
Fetch Max Wait Yes The maximum amount of time in milliseconds that the server blocks before answering a fetch request if there is not enough data to immediately satisfy the requirement given by fetch.min.bytes.

The default value is 500.

Heartbeat Interval Yes Time in milliseconds between heartbeats to the consumer coordinator. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when consumers join or leave a group.

The default value is 3000.

Session Timeout Yes The consumer sends periodic heartbeats to the broker to indicate its liveness. If the broker receives no heartbeats before this session timeout expires, it removes the consumer from the group and initiates a rebalance.

The default value is 30000.
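
As an illustration of the Partition IDs syntax described above, the following is a minimal Python sketch of how such a string could be interpreted. It is not the plug-in's implementation. The function name parse_partition_ids is hypothetical, and the sketch assumes that ranges are inclusive and that each semicolon-separated segment corresponds, in order, to one entry in Topic Names.

```python
from typing import List


def parse_partition_ids(spec: str) -> List[List[int]]:
    """Return one sorted list of partition IDs per topic segment.

    Assumptions (not confirmed by the plug-in documentation):
    ranges such as "0-2" are inclusive, and segments separated by ";"
    line up with the topics in the same order.
    """
    per_topic = []
    for segment in spec.split(";"):          # one segment per topic
        ids = set()
        for item in segment.split(","):      # single IDs or ranges
            item = item.strip()
            if "-" in item:
                start_text, end_text = item.split("-", 1)
                start, end = int(start_text), int(end_text)
                if start > end:
                    raise ValueError("invalid range: %r" % item)
                ids.update(range(start, end + 1))
            else:
                ids.add(int(item))           # raises ValueError if not a number
        per_topic.append(sorted(ids))
    return per_topic


if __name__ == "__main__":
    # Two topics: partitions 1..2 for the first, 0..3 for the second.
    print(parse_partition_ids("1-2;0-3"))
```

Under these assumptions, "0,1,2" and "0-2" describe the same three partitions of a single topic, which matches the single-topic examples given for the Partition IDs field.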

Description

On the Description tab, you can enter a short description for the activity.

Field Literal Value/ Process Property/ Module Property? Description
Description None A description of the activity.

Advanced

The following table describes the Advanced tab of the Receive Message activity.

Field Literal Value/ Process Property/ Module Property? Description
Sequence Key None XPath expression that specifies which processes must run in sequence. Process instances with sequencing keys that evaluate to the same value are executed sequentially in the order in which the process instances were created.
Custom Job Id None This field can contain an XPath expression that specifies a custom ID for the process instance.
Enable Auto Commit None
  • Select this check box to auto commit the message record.
  • Deselect the check box to enable Manual Commit for the message record.
Note: Use the Confirm activity from the General Activities palette for manual commit.
AutoCommit Interval Yes Interval in milliseconds at which the consumer offsets are auto-committed to Kafka when the auto-commit mode is enabled.

The default value is 5000.

AutoOffset Reset Yes Specifies what to do when there is no initial offset or the current offset is out of range. This field is required.
  • Earliest: resets the offset to the earliest offset.
  • Latest: resets the offset to the latest offset.
  • None: throws an exception to the consumer.

The default value is latest.

Isolation Level No This field lets you control how to consume messages written transactionally. Two isolation levels are available:
  • read_uncommitted
  • read_committed
Properties Yes The property names and values.
Note:
  • All additional consumer properties can be configured here. For example, to set the maximum number of poll records, set the property name to max.poll.records and specify its value.
  • Only module properties of type String are supported.
Interceptors None

The Interceptor class intercepts the received messages that are published to a topic or channel before they are consumed. You can add interceptor classes by using the add icon and delete them by using the delete icon. The interceptor classes are executed in the order in which you specify them. The order of execution can be managed by using the move up and move down icons.

Note: To add user-defined properties to the interceptor class, you must specify those properties in the Properties field.
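
To make the relationship between the Advanced tab fields and the underlying consumer settings more concrete, here is a minimal Python sketch that assembles a property map. It assumes that the fields correspond to the standard Kafka consumer configuration keys enable.auto.commit, auto.commit.interval.ms, auto.offset.reset, and isolation.level; this page does not confirm that mapping. The function name build_consumer_properties is hypothetical, and the defaults for auto commit and isolation level are the Kafka client defaults rather than documented plug-in defaults.

```python
from typing import Dict, Optional

VALID_OFFSET_RESET = {"earliest", "latest", "none"}
VALID_ISOLATION_LEVELS = {"read_uncommitted", "read_committed"}


def build_consumer_properties(
    group_id: str,
    enable_auto_commit: bool = True,            # assumed default (Kafka client default)
    auto_commit_interval_ms: int = 5000,        # documented default
    auto_offset_reset: str = "latest",          # documented default
    isolation_level: str = "read_uncommitted",  # assumed default (Kafka client default)
    extra: Optional[Dict[str, str]] = None,     # entries from the Properties field
) -> Dict[str, str]:
    """Build a string-to-string map of consumer properties."""
    if auto_offset_reset not in VALID_OFFSET_RESET:
        raise ValueError("unsupported auto.offset.reset: %r" % auto_offset_reset)
    if isolation_level not in VALID_ISOLATION_LEVELS:
        raise ValueError("unsupported isolation.level: %r" % isolation_level)

    props = {
        "group.id": group_id,
        "enable.auto.commit": str(enable_auto_commit).lower(),
        "auto.offset.reset": auto_offset_reset,
        "isolation.level": isolation_level,
    }
    # The commit interval only matters when auto commit is enabled.
    if enable_auto_commit:
        props["auto.commit.interval.ms"] = str(auto_commit_interval_ms)
    # Additional properties are kept as strings, in line with the note
    # that only String module properties are supported.
    props.update(extra or {})
    return props


if __name__ == "__main__":
    print(build_consumer_properties("orders-group", extra={"max.poll.records": "100"}))
```

In this sketch, entries from the Properties field are applied last, so an additional property with the same key as one of the dedicated fields would override it. Whether the plug-in resolves such conflicts the same way is not stated here.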

Conversation

You can initiate the conversation here. Click the Add New Conversation button to initiate multiple conversations.

Output

The following table describes the fields on the Output tab.

Field Type Description
KafkaReceiverMessageOutput complex The complete output for the Receive Message activity.
topic string The topic name.
Partition number The sequence ID of the partition.
offset number The sequence ID assigned to each record within the partition.
Key string The specified key of the incoming record.
Message string The message received through Kafka.
Headers string The headers received through Kafka:
  • Key: the header key received through Kafka
  • Value: the header value received through Kafka