Kafka Receive Message Activity

The Kafka Receive Message activity is an event source activity. You can configure it as a process starter in any TIBCO BusinessWorks process. It starts process execution when it receives a Kafka message event.

Note: Configure the com.tibco.bw.kafka.receiver.flowLimitControl system property in milliseconds to resolve errors in the Confirm activity when using ManualAcknowledgement and FlowLimit for the Kafka Receive Message activity.

General

On the General tab, specify the required parameters before using this activity. The General tab has the following fields:

Field Literal Value/ Process Property/ Module Property? Description
Name None The name to be displayed as the label for the activity in the process.
Kafka Connection Yes The Kafka connection resource for communicating with a Kafka server instance.
Group ID Yes

The group ID for the consumer group.

Note: The valid characters for Group ID are ASCII alphanumeric characters, period (.), underscore (_), and hyphen (-).
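
As a rough illustration of the character rule in the note above, the following Python sketch checks a candidate group ID before it is entered in the field. The helper name `is_valid_group_id` is hypothetical and not part of the plug-in, and rejecting an empty string is an assumption rather than documented behavior.

```python
import re

# Hypothetical helper, not part of the plug-in. It accepts only the characters
# listed in the note: ASCII letters, digits, period, underscore, and hyphen.
_GROUP_ID_PATTERN = re.compile(r"[A-Za-z0-9._-]+")


def is_valid_group_id(group_id: str) -> bool:
    """Return True if group_id is non-empty and uses only the allowed characters."""
    # Assumption: an empty group ID is treated as invalid.
    return _GROUP_ID_PATTERN.fullmatch(group_id) is not None
```

For example, `is_valid_group_id("orders-consumer_1.v2")` passes, while a value containing a space does not.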
Topic Names Yes The name of the topic in which the Kafka cluster stores the stream of records. Multiple topic names are supported; separate them with a semicolon (;).
Assign Custom Partition None Select this checkbox to enter partition IDs and override Kafka's default partition assignment behavior.
Partition IDs Yes The sequence ID, or range of IDs, of the partitions from which messages are received. The default is 0. Multiple partition IDs are supported; separate them with a comma (,).

For example,

Single Topic and Single Partition:

Partition IDs = 0

Single Topic and Multiple Partitions:

Partition IDs = 0, 1, 2

or, to specify a range,

Partition IDs = 0-2

Multiple Topics and Multiple Partitions:

For two topics -

Partition IDs = 0, 1, 2; 0, 4 or

Partition IDs = 0-2; 0, 1, 2 or

Partition IDs = 1-2; 0-3

Note:
  • This field is enabled only if the Assign Custom Partition field is checked.
  • For projects created with Kafka Plug-in version 6.1.0 or earlier, if the PartitionID field in the Kafka Receive Message activity is configured by using a module or process property, the data type of the property must be changed from Integer to String.
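
The Partition IDs syntax shown in the examples can be read as follows: semicolons separate the entries for each topic, commas separate individual partitions, and a hyphen denotes an inclusive range. The Python sketch below parses a value under that reading. The function name `parse_partition_ids` is hypothetical, and the assumption that the semicolon-separated groups follow the order of the Topic Names field is not confirmed by this page.

```python
def parse_partition_ids(value: str) -> list[list[int]]:
    """Parse a Partition IDs string into one sorted partition list per topic.

    Assumed reading of the examples: ';' separates topics, ',' separates
    entries, and 'a-b' is an inclusive range.
    """
    topics = []
    for topic_part in value.split(";"):
        partitions = set()
        for entry in topic_part.split(","):
            entry = entry.strip()
            if not entry:
                raise ValueError(f"empty partition entry in {value!r}")
            if "-" in entry:
                # Range form such as "0-2" expands to 0, 1, 2.
                start_text, end_text = entry.split("-", 1)
                start, end = int(start_text), int(end_text)
                if start > end:
                    raise ValueError(f"invalid range {entry!r}")
                partitions.update(range(start, end + 1))
            else:
                partitions.add(int(entry))
        topics.append(sorted(partitions))
    return topics
```

Under these assumptions, `parse_partition_ids("1-2; 0-3")` yields one list for each of the two topics.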

Consumer Count Yes The number of Kafka consumer instances started by the activity. The default value is 0, which creates and starts as many KafkaConsumer instances as there are partitions in the topic. The maximum value allowed is the number of partitions in the topic.
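
The Consumer Count rule can be sketched in Python as shown below. The function name `effective_consumer_count` is hypothetical, and raising an error for an out-of-range value is an assumption; this page does not state how the plug-in reacts to a value above the maximum.

```python
def effective_consumer_count(consumer_count: int, partition_count: int) -> int:
    """Return how many consumer instances would be started for one topic."""
    if partition_count < 1:
        raise ValueError("a topic has at least one partition")
    # Assumption: values outside 0..partition_count are rejected.
    if consumer_count < 0 or consumer_count > partition_count:
        raise ValueError("Consumer Count must be between 0 and the partition count")
    # 0 is the default and means one consumer per partition.
    return partition_count if consumer_count == 0 else consumer_count
```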
Use Registry No This checkbox is available when Avro Schema is selected in the Key Deserializer or Value Deserializer field.

This checkbox enables you to use the Avro Schema with the Schema Registry.

Avro Encoding Type Yes This field is available when the Avro Schema is selected in the Key Deserializer or Value Deserializer field and the Use Registry checkbox is not enabled.

Three types of encoding are available in the dropdown:

  • JSON Encoding
  • Binary Encoding
  • Single Object Encoding

For more information on the Avro encoding types, see the Avro documentation.

Key Deserializer No

Class for the key that implements the deserializer interface.

Key Reader Avro Schema Yes This field is available when Avro Schema is selected in the Key Deserializer field and the Use Registry checkbox is not enabled.

Specifies the path to the .avsc file that contains the Avro schema to be used to read the message. Select the file through the resource picker.

Key Writer Avro Schema Yes This field is available when Avro Schema is selected in the Key Deserializer field and the Use Registry checkbox is not enabled.

Specifies the path to the .avsc file that contains the Avro schema used to publish the message. Select the file through the resource picker.

Value Deserializer No Class for the value that implements the deserializer interface.
Value Reader Avro Schema Yes This field is available when Avro Schema is selected in the Value Deserializer field and the Use Registry checkbox is not enabled.

Specifies the path to the .avsc file, which contains the Avro schema to be used to read the message. Select the file through the resource picker provided.

Value Writer Avro Schema Yes

This field is available when Avro Schema is selected in the Value Deserializer field and the Use Registry checkbox is not enabled.

Specifies the path to the .avsc file, which contains the Avro schema used to publish the message. Select the file through the resource picker provided.

Fetch Timeout Yes Specifies the maximum time in milliseconds to get the metadata about the topic before a timeout occurs.

The default value is 1000.

Fetch Min Bytes Yes The minimum amount of data that the server would send on receiving a fetch request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive.
Fetch Max Wait Yes The maximum amount of time, in milliseconds, that the server blocks before answering a fetch request if there is not enough data to immediately satisfy the requirement given by fetch.min.bytes.

The default value is 500.

Heartbeat Interval Yes Time in milliseconds between heartbeats to the consumer coordinator. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when consumers join or leave the group.

The default value is 3000.

Session Timeout Yes The timeout, in milliseconds, used to detect consumer failures. The consumer sends periodic heartbeats to indicate its liveness to the broker. If the broker receives no heartbeats before this session timeout expires, it removes the consumer from the group and initiates a rebalance.

The default value is 30000.

Max Poll Records Yes

Maximum number of records returned in a single poll() call.

The default value is 500.

Note: If the TIBCO ActiveMatrix BusinessWorks™ flowlimit property is used, it must be equal to the value of the Max Poll Records field.
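
The fetch and timing fields above appear to correspond to standard Kafka consumer settings (the Fetch Max Wait description names fetch.min.bytes directly). The Python sketch below collects the documented defaults under the standard Kafka property names and checks two constraints: the Kafka requirement that heartbeat.interval.ms be lower than session.timeout.ms, and the note that the flow limit must equal Max Poll Records. The mapping from field to property name and the helper `check_receiver_settings` are assumptions for illustration, not the plug-in's implementation.

```python
# Defaults as documented on the General tab, keyed by the standard Kafka
# consumer property that each field is ASSUMED to correspond to.
DOCUMENTED_DEFAULTS = {
    "fetch.min.bytes": 1,
    "fetch.max.wait.ms": 500,
    "heartbeat.interval.ms": 3000,
    "session.timeout.ms": 30000,
    "max.poll.records": 500,
}


def check_receiver_settings(overrides=None, flow_limit=None):
    """Merge overrides into the defaults and return (settings, problems)."""
    settings = {**DOCUMENTED_DEFAULTS, **(overrides or {})}
    problems = []
    # Kafka requires the heartbeat interval to be lower than the session timeout.
    if settings["heartbeat.interval.ms"] >= settings["session.timeout.ms"]:
        problems.append("heartbeat.interval.ms must be lower than session.timeout.ms")
    # Per the note above: a flow limit, if used, must equal Max Poll Records.
    if flow_limit is not None and flow_limit != settings["max.poll.records"]:
        problems.append("flow limit must equal max.poll.records")
    return settings, problems
```

Calling `check_receiver_settings({"max.poll.records": 100}, flow_limit=50)` reports the flow limit mismatch, while the defaults alone report no problems.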

Description

On the Description tab, you can enter a short description of the activity.

Field Literal Value/ Process Property/ Module Property? Description
Description None A description of the activity.

Advanced

The following table describes the Advanced tab of the Receive Message activity.

Field Literal Value/ Process Property/ Module Property? Description
Sequence Key None An XPath expression that specifies which processes must run in sequence. Process instances whose sequencing keys evaluate to the same value run sequentially, in the order in which the process instances were created.
Custom Job Id None This field can contain an XPath expression that specifies a custom ID for the process instance.
Enable Auto Commit None
  • Select this checkbox to auto-commit the message record.
  • Deselect the checkbox to enable Manual Commit for the message record.
Note: Use the Confirm activity from the General Activities palette for manual commit.

If an error occurs at the Confirm activity, the default behavior of the Receiver is to move forward to receive any new incoming messages.

However, if you want to reprocess the failed commit messages, set the system property com.tibco.bw.kafka.receiver.RetryFailedCommit to true.

Enable Batch Output None Select this checkbox to enable batch output mode. The activity returns all the records received during the Kafka consumer poll operation.
Note: When auto-commit is disabled, the activity commits the offsets for the entire batch when the Confirm activity runs.
Continue on Serialization Error None When selected, the activity ignores serialization errors and continues processing further messages. The error information is available in the ErrorInfo output section.
AutoCommit Interval Yes Interval in milliseconds at which the consumer offsets are auto-committed to Kafka when the auto-commit mode is enabled.

The default value is 5000.

AutoOffset Reset Yes This required field specifies what happens when there is no initial offset or the current offset is out of range.
  • Earliest: resets the offset to the earliest offset.
  • Latest: resets the offset to the latest offset.
  • None: throws an exception to the consumer.

The default value is Latest.
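
The three reset options can be illustrated with a simplified Python model of how a starting offset is chosen for one partition. This is a sketch of the general Kafka semantics, not the plug-in's code; the function name `resolve_start_offset` and the choice of exception types are assumptions.

```python
def resolve_start_offset(committed, earliest, latest, reset_policy="latest"):
    """Return the offset at which consumption starts for one partition.

    committed: last committed offset for the group, or None if there is none.
    earliest, latest: bounds of the offsets currently available in the partition.
    """
    # A committed offset inside the available range is used as-is.
    if committed is not None and earliest <= committed <= latest:
        return committed
    # Otherwise there is no initial offset, or it is out of range.
    if reset_policy == "earliest":
        return earliest
    if reset_policy == "latest":
        return latest
    if reset_policy == "none":
        raise LookupError("no valid offset and the reset policy is 'none'")
    raise ValueError(f"unknown reset policy: {reset_policy!r}")
```

With no committed offset, the default policy starts from the latest offset, so only messages that arrive afterwards are received.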

Isolation Level No This field controls how messages that were written transactionally are consumed. Two isolation levels are available:

  • read_uncommitted
  • read_committed

Properties Yes The name and value of each additional property.
Note:
  • You can configure additional consumer properties here. For example, to set the max poll records value, set the property name as max.poll.records and its value.
  • Only module properties of type String are supported.
Interceptors None

The Interceptor class intercepts the received messages that are published to a topic or channel before they are consumed. You can add and delete interceptor classes by using the corresponding icons. The interceptor classes are run in the order in which you specify them, and you can change the order of execution by using the reorder icons.

Note: To add user-defined properties to the interceptor class, you must specify that property in the Properties field.

Conversation

You can initiate the conversation here. Click the Add New Conversation icon to initiate multiple conversations.

Output

The following table describes the fields on the Output tab.

Field Type Description
KafkaReceiverMessageOutput complex The complete output for the Receive Message activity.
Records complex A set of Kafka messages.
topic string The topic name.
Partition number The sequence ID of the partition.
offset number The sequence ID assigned to each record within the partition.
Key string The specified key of the incoming record.
Message string The message received through Kafka.
Headers string Headers received through Kafka.
  • Key: the header key received through Kafka.
  • Value: the header value received through Kafka.
errorInfo complex Error information. Available when Continue on Serialization Error is enabled.
  • keyErrorStackTrace: the Avro key serialization error stack trace.
  • keyErrorPayload: the message payload.
  • valueErrorStackTrace: the Avro value serialization error stack trace.
  • valueErrorPayload: the message payload.

Fault

The Fault tab has the following exception:

  • KafkaEventSourceFaultException

The exception has the following fields:

Field Type Description
msg string The error message description returned by the plug-in.
msgCode string The error code returned by the plug-in.