Kafka Get Messages Activity
The Kafka Get Messages activity consumes messages from Kafka topics. It returns the available messages according to the configured Poll Iterations and Max Poll Records values. The activity can also seek a partition offset to the beginning, the last offset (LastOffset), the end, or a custom offset value.
General
On the General tab, specify the required parameters before using this activity. The General tab has the following fields:
Field | Literal Value/ Process Property/ Module Property? | Description |
---|---|---|
Name | None | The name to be displayed as the label for the activity in the process. |
Kafka Connection | Yes | The Kafka connection resource for communicating with a Kafka server instance. |
Group ID | Yes | The group ID for the consumer group. |
Topic Names | Yes | The name of the topic in which the Kafka cluster stores the stream of records. To specify multiple topics, separate the names with a semicolon (;). For example, topicName1;topicName2 |
Assign Custom Partition | None | Select this checkbox to enter partition IDs manually and override Kafka's default partition assignment behavior. |
Partition IDs | Yes | The ID, or range of IDs, of the partitions from which messages are received. The default is 0. To specify multiple partition IDs, separate them with a comma (,).
For example:
Single Topic and Single Partition: Partition IDs = 0
Single Topic and Multiple Partitions:
Multiple Topics and Multiple Partitions (for two topics):
Note: This field is enabled only if the Assign Custom Partition checkbox is selected. |
Use Registry | No | This checkbox is available when Avro Schema is selected in the Key Deserializer or Value Deserializer field. Select it to use the Avro schema with the Schema Registry. |
Avro Encoding Type | Yes | This field is available when Avro Schema is selected in the Key Deserializer or Value Deserializer field and the Use Registry checkbox is not selected. The dropdown offers three encoding types. For more information on the Avro encoding types, refer to the Avro documentation. |
Key Deserializer | No | The class for the key that implements the deserializer interface. |
Key Reader Avro Schema | | This field is available when Avro Schema is selected in the Key Deserializer field and the Use Registry checkbox is not selected. Specifies the path to the .avsc file that contains the Avro schema used to read the message. Select the file through the resource picker. |
Key Writer Avro Schema | | This field is available when Avro Schema is selected in the Key Deserializer field and the Use Registry checkbox is not selected. Specifies the path to the .avsc file that contains the Avro schema used to publish the message. Select the file through the resource picker. |
Value Deserializer | No | The class for the value that implements the deserializer interface. |
Value Reader Avro Schema File | Yes | This field is available when Avro Schema is selected in the Value Deserializer field and the Use Registry checkbox is not selected. Specifies the path to the .avsc file that contains the Avro schema to be used. Select the file through the resource picker. |
Value Writer Avro Schema File | Yes | This field is available when Avro Schema is selected in the Value Deserializer field and the Use Registry checkbox is not selected. Specifies the path to the .avsc file that contains the Avro schema used to publish the message. Select the file through the resource picker. |
Poll Iterations | Yes | The number of times the Kafka Get Messages activity executes the poll() method. The default value is 3. |
Fetch Timeout (msec) | Yes | The time, in milliseconds, that a poll waits if data is not available in the buffer. This value is passed as input to the poll() method. The default value is 10000. |
Max Poll Records | Yes | The maximum number of records returned by a single poll() call. The default value is 500. |
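The interaction between Topic Names, Poll Iterations, Fetch Timeout, and Max Poll Records can be illustrated with a small sketch. It is not the plug-in's implementation: the names `parse_topic_names` and `PollSettings` are hypothetical, and the bounds assume that the activity simply accumulates the records of each poll() call and that each poll waits at most the fetch timeout.

```python
from dataclasses import dataclass
from typing import List


def parse_topic_names(raw: str) -> List[str]:
    """Split a semicolon-separated Topic Names value, ignoring blank entries."""
    return [name.strip() for name in raw.split(";") if name.strip()]


@dataclass(frozen=True)
class PollSettings:
    # Defaults are the ones listed in the General tab table.
    poll_iterations: int = 3
    fetch_timeout_ms: int = 10_000
    max_poll_records: int = 500

    def max_records(self) -> int:
        """Upper bound on records returned by one execution (assumption:
        records from every poll() call are accumulated)."""
        return self.poll_iterations * self.max_poll_records

    def max_wait_ms(self) -> int:
        """Rough upper bound on waiting time when no data arrives (assumption:
        each poll() blocks for at most the fetch timeout)."""
        return self.poll_iterations * self.fetch_timeout_ms


topics = parse_topic_names("topicName1;topicName2")
defaults = PollSettings()
```

With the default values, one execution returns at most 3 x 500 = 1500 records and, under these assumptions, waits about 30 seconds at most when the topics are empty.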
Description
On the Description tab, you can enter a short description of the activity.
Field | Literal Value/ Process Property/ Module Property? | Description |
---|---|---|
Description | None | A description of the activity. |
Advanced
The following table describes the Advanced tab of the Get Messages activity.
Field | Literal Value/ Process Property/ Module Property? | Description |
---|---|---|
Manual Commit | None | When selected, the offsets of the consumed messages are committed with commitSync() when the confirm activity runs. When cleared, the offsets of the consumed messages are committed with commitSync() after all poll iterations have completed. Note: Because auto-commit depends on both the number of poll iterations and auto.commit.interval.ms, enable.auto.commit is set to false by default. |
Continue on Serialization Error | Yes | When selected, the activity ignores serialization errors and continues processing subsequent messages. The error information is available in the errorInfo section of the output. |
AutoOffset Reset | Yes | This field is required. It applies when there is no initial offset or the offset is out of range. The default value is earliest. |
Isolation Level | No | This field controls how messages that were written transactionally are consumed. Two isolation levels are available. |
Properties | Yes | The property names and values. |
Interceptors | None | The interceptor class intercepts received messages that are published to a topic or channel before they are consumed. You can add interceptor classes to this field. Note: To add user-defined properties to the interceptor class, you must specify those properties in the Properties field. |
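The two commit behaviors described for Manual Commit can be sketched as follows. This is an illustration only: `FakeConsumer`, `get_messages`, and `confirm` are hypothetical in-memory stand-ins, not the Kafka client API or the plug-in's code, and offsets are simplified to a single counter.

```python
class FakeConsumer:
    """Hypothetical in-memory stand-in for a Kafka consumer (not a real client)."""

    def __init__(self, batches):
        self._batches = list(batches)
        self.position = 0       # offset of the next record to read
        self.committed = None   # last committed offset; None if never committed

    def poll(self):
        batch = self._batches.pop(0) if self._batches else []
        self.position += len(batch)
        return batch

    def commit_sync(self):
        self.committed = self.position


def get_messages(consumer, poll_iterations, manual_commit):
    records = []
    for _ in range(poll_iterations):
        records.extend(consumer.poll())
    if not manual_commit:
        # Manual Commit cleared: commit once, after all poll iterations.
        consumer.commit_sync()
    return records


def confirm(consumer):
    # Manual Commit selected: offsets are committed by a later confirm step.
    consumer.commit_sync()


auto = FakeConsumer([["a", "b"], ["c"], []])
auto_records = get_messages(auto, poll_iterations=3, manual_commit=False)

manual = FakeConsumer([["a", "b"], ["c"], []])
manual_records = get_messages(manual, poll_iterations=3, manual_commit=True)
committed_before_confirm = manual.committed
confirm(manual)
```

In the manual case, nothing is committed until `confirm` runs, so a failure between the two steps would leave the consumed messages uncommitted and available for redelivery.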
Input
The following table describes the fields on the Input tab of the Get Messages activity.
Field | Data Type | Description |
---|---|---|
ConsumerConfig | ||
TopicNames | String | The name of the topic in which the Kafka cluster stores the stream of records. To specify multiple topics, separate the names with a semicolon (;). For example, topicName1;topicName2 |
Partition IDs | Number | The ID, or range of IDs, of the partitions from which messages are received. The default is 0. To specify multiple partition IDs, separate them with a comma (,).
For example:
Single Topic and Single Partition: Partition IDs = 0
Single Topic and Multiple Partitions:
Multiple Topics and Multiple Partitions (for two topics):
Note: This field is enabled only if the Assign Custom Partition checkbox is selected. |
SeekOffset | ||
Topic | String | The name of a topic. Note: Topic can be empty when seekTo is set to Beginning, LastOffset, or End. The seek operation is then applied to all assigned partitions, including those of multiple topics. |
Partition ID | String | A partition ID. Note: Partition ID is mandatory when seekTo is set to Offset. Partition ID can be empty when seekTo is set to Beginning, LastOffset, or End, in which case the seek operation is applied to all assigned partitions. Otherwise, the seek is applied to the specified partition. |
seekTo | String | The seek operation to perform. Valid values are Beginning, Offset, End, and LastOffset. Note: Use the LastOffset value to fetch the last record present on a topic. |
offset | Number | An offset number. Used when seekTo is set to Offset. |
Additional properties | ||
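The SeekOffset rules in the table above can be expressed as a small validation sketch. The function `validate_seek` and its return values are hypothetical and only restate the notes in the table. The check that an offset must be supplied when seekTo is Offset is an assumption, because the table says only that the offset is used in that case.

```python
SEEK_TARGETS = {"Beginning", "Offset", "End", "LastOffset"}


def validate_seek(seek_to, topic="", partition_id="", offset=None):
    """Return the scope a seek would apply to, or raise ValueError.

    Returns "partition" when a specific partition is targeted and
    "all-assigned" when the seek applies to every assigned partition.
    """
    if seek_to not in SEEK_TARGETS:
        raise ValueError("seekTo must be one of: " + ", ".join(sorted(SEEK_TARGETS)))

    if seek_to == "Offset":
        if partition_id == "":
            raise ValueError("Partition ID is mandatory when seekTo is Offset")
        if offset is None:
            # Assumption: a custom seek is meaningless without an offset value.
            raise ValueError("offset is required when seekTo is Offset")
        return "partition"

    # Beginning, End, LastOffset: Topic and Partition ID may be left empty.
    return "partition" if partition_id != "" else "all-assigned"
```

For example, `validate_seek("LastOffset")` targets all assigned partitions, while `validate_seek("Offset", topic="topicName1", partition_id="0", offset=42)` targets a single partition.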
Output
The following table describes the fields on the Output tab.
Field | Type | Description |
---|---|---|
KafkaGetMessageOutput | complex | The complete output of the Get Messages activity. |
Records | complex | The set of Kafka messages returned by the Kafka Get Messages activity. |
topic | string | The topic name. |
Partition | number | The partition ID. |
offset | number | The sequence ID assigned to each record within the partition. |
Key | string | The key of the incoming record. |
Message | string | The message received through Kafka. |
Headers | string | The headers received through Kafka. |
errorInfo | complex | Error information. Available when Continue on Serialization Error is enabled. |
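The relationship between the records and errorInfo can be sketched as follows. This is a hedged illustration, not the plug-in's behavior in detail: JSON parsing stands in for the configured deserializer, and the function name `deserialize_records` and the shape of each error entry are hypothetical.

```python
import json


def deserialize_records(raw_values, continue_on_error):
    """Deserialize each raw value; collect failures instead of stopping
    when continue_on_error is True."""
    messages = []
    error_info = []
    for index, raw in enumerate(raw_values):
        try:
            messages.append(json.loads(raw))
        except ValueError as exc:  # json.JSONDecodeError is a ValueError
            if not continue_on_error:
                raise
            # Hypothetical error entry; the real errorInfo structure may differ.
            error_info.append({"index": index, "error": str(exc)})
    return messages, error_info


messages, error_info = deserialize_records(
    ['{"id": 1}', "not json", '{"id": 2}'],
    continue_on_error=True,
)
```

With the option enabled, the malformed second value is skipped and reported, and the two valid messages are still returned. With the option disabled, the same input raises on the second value.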
Fault
The Fault tab has the following exceptions:
- KafkaPluginException
Each exception has the following fields:
Field | Type | Description |
---|---|---|
msg | string | The error message description returned by the plug-in. |
msgCode | string | The error code returned by the plug-in. |