Kafka Receive Message Activity
Kafka Receive Message activity is an event source activity that can be configured as a process starter in any TIBCO BusinessWorks process. It starts the process execution on receiving a Kafka message event.
General
On the General tab, specify the required parameters before using this activity. The General tab has the following fields:
Field | Literal Value/ Process Property/ Module Property? | Description |
---|---|---|
Name | None | The name to be displayed as the label for the activity in the process. |
Kafka Connection | Yes | The Kafka connection resource for communicating with a Kafka server instance. |
Group ID | Yes | The group ID for the consumer group. |
Topic Names | Yes | The topic name where the Kafka cluster stores the stream of the record. Multiple topic names are supported and these can be separated by a semicolon (;) |
Assign Custom Partition | None | This is a check box to select if aPartition ID needs to be entered. You can select the check box to override Kafka's default partition assignment behavior. |
Partition IDs | Yes | Sequence ID or range of the partition to which the message is sent. Default is 0. Multiple partition IDs are supported and these can be separated using a comma (,)
For example, Single Topic and Single Partition: Partition IDs = 0 Single Topic and Multiple Partitions: Partition IDs = 0,1,2 or for specifying range Partition IDs = 0-2 Multiple Topics and Multiple Partitions: For two topics - Partition IDs = 0,1,2;0,4 or Partition IDs = 0-2;0,1,2 or Partition IDs = 1-2;0-3 Note:
|
Consumer Count | Yes | The Consumer Count specifies the number of KafkaConsumer instances started by the activity. The default value is 0 which will create and start KafkaConsumer instances equal to the number of partitions in the topic. The maximum value allowed for Consumer Count is equal to the number of partitions in the topic. |
Use Registry | No | This check box is available when
Avro Schema is selected in the
Key Deserializer or
Value Deserializer field.
This check box enables you to use the Avro Schema with the Schema Registry. |
Avro Encoding Type | Yes | This field is available when the Avro Schema is selected in the
Key Deserializer or
Value Deserializer field and the
Use Registry check box is not enabled.
Three types of encoding are available in the dropdown:
For more information on the Avro Encoding types, please refer to Avro documentation. |
Key Deserializer | No | Class for the key that implements the serializer interface. |
Key Reader Avro Schema | Yes | This field is available when
Avro Schema is selected in the
Key Deserializer field and the
Use Registry check box is not enabled.
Specifies the path to the .avsc file that contains the Avro schema to be used to read the message. Select the file through the resource picker. |
Key Writer Avro Schema | Yes | This field is available when
Avro Schema is selected in the
Key Deserializer field and the
Use Registry check box is not enabled.
Specifies the path to the .avsc file that contains the Avro schema used to publish the message. Select the file through the resource picker. |
Value Deserializer | No | Value for the serializer interface. |
Value Reader Avro Schema | Yes | This field is available when
Avro Schema is selected in the
Value Deserializer field and the
Use Registry check box is not enabled.
Specifies the path to the .avsc file which contains the Avro schema to be used to read the message. Select the file through the resource picker provided. |
Value Writer Avro Schema | Yes |
This field is available when Avro Schema is selected in the Value Deserializer field and the Use Registry check box is not enabled. Specifies the path to the .avsc file which contains the Avro schema used to publish the message. Select the file through the resource picker provided. |
Fetch Timeout | Yes | Specifies the maximum time in milliseconds to get the metadata about the topic before a timeout occurs.
The default value is 1000. |
Fetch Min Bytes | Yes | The minimum amount of data that the server would send on receiving a fetch request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. |
Fetch Max Wait | Yes | The maximum amount of time that the server would block before answering a fetch request if there is not sufficient data to satisfy the requirement given by
fetch.min.bytes immediately.The default value is 500. |
Heartbeat Interval | Yes | Time in milliseconds between heartbeats to the consumer. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing and information when consumers join or leave a group.
The default value is 3000. |
Session Timeout | Yes | The consumer sends periodic heartbeats to the server indicating about its liveness to the broker. If no heartbeats are received by a broker before the expiration of this session timeout, the broker removes this consumer from the group and initiates a rebalance.
The default value is 30000. |
Description
On the Description tab, you can enter a short description of the activity.
Field | Literal Value/ Process Property/ Module Property? | Description |
---|---|---|
Description | None | A description of the activity. |
Advanced
The following table describes the Advanced tab of the Receive Message activity.
Field | Literal Value/ Process Property/ Module Property? | Description |
---|---|---|
Sequence Key | None | XPath expression that specifies which processes must run in sequence. Process instances with sequencing keys that evaluate to the same value are executed sequentially in the sequence the process instance was created. |
Custom Job Id | None | This field can contain an XPath expression that specifies a custom ID for the process instance. |
Enable Auto Commit | None |
Note: Use the Confirm activity from the
General Activities palette for manual commit.
|
Enable Batch Output | None | Select this check box to enable batch output mode. The activity returns all the records received during the Kafka consumer poll operation.
Note: When auto-commit is disabled, the activity commits offset for the entire batch during confirm activity execution.
|
Continue on Serialization Error | None | When selected the activity ignores serialization errors and continues processing further messages. The error information is available in the ErrorInfo output section. |
AutoCommit Interval | Yes | Interval in milliseconds at which the consumer offsets are auto-committed to Kafka when the auto-commit mode is enabled.
The default value is 5000. |
AutoOffset Reset | Yes | This is required and selected when there is no initial offset or the offset is out of range.
The default value is the latest. |
Isolation Level | No | This field lets you control how to consume messages written transactionally. The Isolation Levels are of two types:
|
Properties | Yes | The properties name and value.
Note:
|
Interceptors | None |
The Interceptor class intercepts the received messages that are published to a topic or channel before being consumed. You can add the interceptor classes using
Note: To add user-defined properties to the interceptor class, you must specify that property in the
Properties field.
|
Conversation
You can initiate the conversation here. Click the
Add New Conversation icon to initiate multiple conversations.
Output
The following table describes the fields on the Output tab.
Field | Type | Description |
---|---|---|
KafkaReceiverMessageOutput | complex | The complete output for the Receive Message activity. |
Records | complex | A Set of Kafka messages. |
topic | string | The topic name. |
Partition | number | The sequence ID of the partition. |
offset | number | The sequence ID assigned to each record within the partition. |
Key | String | Specified key of the incoming record. |
Message | String | Message received through Kafka. |
Headers
|
String | Headers received through Kafka
|
errorInfo
|
Complex | Error Information. Available when
Continue On Error is enabled.
|
Fault
The Fault tab has the following exceptions:
- KafkaEventSourceFaultException
Each exception has the following fields:
Field | Type | Description |
---|---|---|
msg | string | The error message description returned by the plug-in. |
msgCode | string | The error code returned by the plug-in. |