Kafka Get Messages Activity

The Kafka Get Messages activity consumes messages from one or more Kafka topics. It returns the available messages according to the configured number of poll iterations and the Max Poll Records value. The activity can also seek a partition offset to the beginning, the end, or a custom offset value.

General

On the General tab, specify the required parameters before using this activity. The General tab has the following fields:

Field Literal Value/ Process Property/ Module Property? Description
Name None The name to be displayed as the label for the activity in the process.
Kafka Connection Yes The Kafka connection resource for communicating with a Kafka server instance.
Group ID Yes The group ID for the consumer group.
Topic Names Yes The name of the topic where the Kafka cluster stores streams of records. To specify multiple topics, separate the topic names with a semicolon (;).

For example, topicName1;topicName2

Assign Custom Partition None Select this check box to enter partition IDs manually and override Kafka's default partition assignment behavior.
Partition IDs Yes The ID, or range of IDs, of the partitions from which messages are received. The default is 0. Separate multiple partition IDs with a comma (,).

For example,

Single Topic and Single Partition

Partition IDs = 0

Single Topic and Multiple Partitions
  • Partition IDs = 0,1,2 or, to specify a range,
  • Partition IDs = 0-2

Multiple Topics and Multiple Partitions

For two topics:
  • Partition IDs = 0,1,2;0,4 or
  • Partition IDs = 0-2;0,1,2 or
  • Partition IDs = 1-2;0-3
Note: This field is enabled only when the Assign Custom Partition check box is selected.
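
The Partition IDs syntax above can be illustrated with a small parser. This is only a sketch of how the examples read, not the plug-in's implementation: the function name parse_partition_ids is hypothetical, and it assumes that the semicolon-separated partition groups pair up with the topic names by position.

```python
from typing import Dict, List


def parse_partition_ids(topic_names: str, partition_ids: str) -> Dict[str, List[int]]:
    """Map each topic to its partition list (hypothetical helper, for illustration)."""
    topics = [t.strip() for t in topic_names.split(";") if t.strip()]
    groups = [g.strip() for g in partition_ids.split(";")]
    # Assumption: one partition group per topic, matched by position.
    if len(topics) != len(groups):
        raise ValueError("expected one partition group per topic")

    assignment: Dict[str, List[int]] = {}
    for topic, group in zip(topics, groups):
        partitions: List[int] = []
        for item in group.split(","):
            item = item.strip()
            if "-" in item:
                # A range such as 0-2 expands to 0, 1, 2 (both ends included).
                start, end = (int(x) for x in item.split("-", 1))
                if start > end:
                    raise ValueError("invalid range: " + item)
                partitions.extend(range(start, end + 1))
            else:
                partitions.append(int(item))
        assignment[topic] = partitions
    return assignment


print(parse_partition_ids("topicName1;topicName2", "0-2;0,4"))
```

Under these assumptions, the value 0-2;0,4 assigns partitions 0, 1, and 2 of the first topic and partitions 0 and 4 of the second topic.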
Use Registry No This check box is available when Avro Schema is selected in the Key Deserializer or Value Deserializer field.

This check box enables you to use the Avro Schema with the Schema Registry.

Avro Encoding Type Yes This field is available when the Avro Schema is selected in the Key Deserializer or Value Deserializer field and the Use Registry check box is not enabled.
Three types of encoding are available in the dropdown:
  • JSON Encoding
  • Binary Encoding
  • Single Object Encoding

For more information about the Avro encoding types, see the Apache Avro documentation.

Key Deserializer No The class for the key that implements the deserializer interface.
Key Avro Schema File Yes This field is available when Avro Schema is selected in the Key Deserializer field and the Use Registry check box is not enabled.

Specifies the path to the .avsc file that contains the Avro schema to be used. Select the file through the resource picker.

Value Deserializer No The class for the value that implements the deserializer interface.
Value Avro Schema File Yes This field is available when Avro Schema is selected in the Value Deserializer field and the Use Registry check box is not enabled.

Specifies the path to the .avsc file that contains the Avro schema to be used. Select the file through the resource picker.

Poll Iterations Yes The number of times the Kafka Get Messages activity executes the poll() method.

The default value is 3.

Fetch Timeout (msec) Yes The time in milliseconds spent waiting in poll if data is not available in the buffer. Used as an input to the poll() method.

The default value is 10000.

Max Poll Records Yes

The maximum number of records returned by a single poll() call.

The default value is 500.
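
The interaction of Poll Iterations, Fetch Timeout, and Max Poll Records can be sketched as a simple loop. The example below uses a hypothetical in-memory StubConsumer instead of a real Kafka client, and it assumes that every poll iteration runs; it illustrates the field descriptions above and is not the activity's actual implementation.

```python
from collections import deque


class StubConsumer:
    """Hypothetical in-memory stand-in for a Kafka consumer (not a real client API)."""

    def __init__(self, messages, max_poll_records=500):
        self._buffer = deque(messages)
        self._max_poll_records = max_poll_records

    def poll(self, timeout_ms):
        # A real consumer would wait up to timeout_ms when no data is buffered;
        # this stub returns immediately with whatever it has.
        batch = []
        while self._buffer and len(batch) < self._max_poll_records:
            batch.append(self._buffer.popleft())
        return batch


def get_messages(consumer, poll_iterations=3, fetch_timeout_ms=10000):
    """Call poll() once per iteration and collect everything that comes back."""
    records = []
    for _ in range(poll_iterations):
        records.extend(consumer.poll(fetch_timeout_ms))
    return records


consumer = StubConsumer(range(2000))
print(len(get_messages(consumer)))  # 3 iterations x 500 records = 1500
```

Under these assumptions, the default values cap one execution of the activity at 3 × 500 = 1500 records, and an idle topic keeps the activity waiting in poll() for at most about 3 × 10000 ms = 30 seconds.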

Description

On the Description tab, you can enter a short description for the activity.

Field Literal Value/ Process Property/ Module Property? Description
Description None A description of the activity.

Advanced

The following table describes the Advanced tab of the Get Messages activity.

Field Literal Value/ Process Property/ Module Property? Description
Manual Commit None When selected, the offsets of the consumed messages are committed using commitSync() when the confirm activity executes.
When cleared, the offsets of the consumed messages are committed using commitSync() after all poll iterations have executed.
Note: Because auto commit depends heavily on the number of poll iterations and on auto.commit.interval.ms, enable.auto.commit is set to false by default.
AutoOffset Reset Yes This required field specifies what to do when there is no initial offset or the current offset is out of range.
  • Earliest: resets the offset to the earliest offset.
  • Latest: resets the offset to the latest offset.
  • None: throws an exception to the consumer.

The default value is earliest.

Isolation Level No This field lets you control how to consume messages written transactionally. The Isolation Levels are of two types:
  • read_uncommitted
  • read_committed
Properties Yes The name and value of each additional consumer property.
Note:
  • All additional consumer properties can be configured here. For example, to set the maximum number of poll records, add a property named max.poll.records and set its value.
  • Only module properties of type string are supported.
Interceptors None

An interceptor class intercepts messages received from a topic before they are consumed. Use the icons in this field to add or delete interceptor classes. The interceptor classes are executed in the order in which you specify them; use the reordering icons to change the order of execution.

Note: To add user-defined properties to the interceptor class, you must specify those properties in the Properties field.
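
The AutoOffset Reset options described in the table above can be sketched as a small decision function. The names resolve_start_offset and NoOffsetForPartitionError are hypothetical, and the offsets are plain integers supplied by the caller; the sketch only shows the documented policy, not how the plug-in or the Kafka client implements it.

```python
class NoOffsetForPartitionError(Exception):
    """Hypothetical error standing in for the exception raised under the None policy."""


def resolve_start_offset(committed, log_start, log_end, auto_offset_reset="earliest"):
    """Pick the offset at which consumption starts for one partition."""
    # A committed offset that is still within the partition's range is used as-is.
    if committed is not None and log_start <= committed <= log_end:
        return committed

    # Otherwise there is no initial offset or it is out of range: apply the policy.
    policy = auto_offset_reset.lower()
    if policy == "earliest":
        return log_start
    if policy == "latest":
        return log_end
    if policy == "none":
        raise NoOffsetForPartitionError("no valid offset and reset policy is none")
    raise ValueError("unknown reset policy: " + auto_offset_reset)


print(resolve_start_offset(None, 5, 20))            # no initial offset, earliest
print(resolve_start_offset(None, 5, 20, "latest"))  # no initial offset, latest
print(resolve_start_offset(12, 5, 20, "latest"))    # valid committed offset wins
```

The reset policy only applies when no usable offset exists; a valid committed offset always takes precedence in this sketch.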

Input

The following table describes the fields on the Input tab of the Get Messages activity.

Field Data Type Description
ConsumerConfig
TopicNames String The name of the topic where the Kafka cluster stores streams of records. To specify multiple topics, separate the topic names with a semicolon (;).

For example, topicName1;topicName2

Partition IDs Number The ID, or range of IDs, of the partitions from which messages are received. The default is 0. Separate multiple partition IDs with a comma (,).

For example,

Single Topic and Single Partition

Partition IDs = 0

Single Topic and Multiple Partitions
  • Partition IDs = 0,1,2 or, to specify a range,
  • Partition IDs = 0-2

Multiple Topics and Multiple Partitions

For two topics:
  • Partition IDs = 0,1,2;0,4 or
  • Partition IDs = 0-2;0,1,2 or
  • Partition IDs = 1-2;0-3
Note: This field is enabled only when the Assign Custom Partition check box is selected.
SeekOffset
Topic String The name of a topic.
Note: Topic can be empty when seekTo is set to Beginning or End. The seek operation is then applied to all assigned partitions, including partitions of multiple topics.
Partition ID String The ID of a partition.
Note: Partition ID is mandatory when seekTo is set to Offset. Partition ID can be empty when seekTo is set to Beginning or End; the seek operation is then applied to all assigned partitions. Otherwise, the seek is applied to the specified partition.
seekTo String The seek operation to perform.

Valid values are Beginning, Offset, and End.

offset Number The offset number. Used when seekTo is set to Offset.
Additional properties
  • Key String The key for an additional property.
  • Value String The value for the specified key in the additional properties.
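
The SeekOffset rules above can be summarized in a short validation sketch. The function plan_seek and its tuple-based result are hypothetical. The sketch also makes two assumptions that the table does not state outright: that Topic is required when seekTo is Offset, and that a non-empty Topic narrows a Beginning or End seek to that topic.

```python
VALID_SEEK_TO = ("Beginning", "Offset", "End")


def plan_seek(assigned, seek_to, topic="", partition_id="", offset=None):
    """Return (topic, partition, target) tuples describing the seeks to perform.

    assigned is a list of (topic, partition) pairs currently assigned to the consumer.
    """
    if seek_to not in VALID_SEEK_TO:
        raise ValueError("seekTo must be one of " + ", ".join(VALID_SEEK_TO))

    if seek_to == "Offset":
        # Partition ID is mandatory for Offset; requiring Topic too is an assumption.
        if not topic or partition_id == "" or offset is None:
            raise ValueError("Offset seek needs a topic, a partition ID, and an offset")
        return [(topic, int(partition_id), int(offset))]

    # Beginning or End: empty Topic / Partition ID means "all assigned partitions".
    return [
        (t, p, seek_to)
        for (t, p) in assigned
        if (not topic or t == topic)
        and (partition_id == "" or p == int(partition_id))
    ]


assigned = [("a", 0), ("a", 1), ("b", 0)]
print(plan_seek(assigned, "Beginning"))
print(plan_seek(assigned, "Offset", topic="b", partition_id="0", offset=42))
```

With empty Topic and Partition ID, a Beginning or End seek covers every assigned partition across all topics, which matches the notes in the table.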

Output

The following table describes the fields on the Output tab.

Field Type Description
KafkaGetMessageOutput complex The complete output for the Get Messages activity.
Records complex Set of Kafka Messages returned by Kafka Get Messages activity.
topic string The topic name.
Partition number The partition ID.
offset number The sequence ID assigned to each record within the partition.
Key String Specified key of the incoming record.
Message String Message received through Kafka.
Headers String Headers received through Kafka.
  • Key String Key received through Kafka.
  • Value String Value received through Kafka.

Fault

The Fault tab has the following exceptions:

  • KafkaPluginException

Each exception has the following fields:

Field Type Description
msg string The error message description returned by the plug-in.
msgCode string The error code returned by the plug-in.