Kafka Get Messages Activity

The Kafka Get Messages activity consumes messages from Kafka Topics. It returns available messages based on specified poll iterations and max poll records configuration. The activity also allows you to seek partition offset to the beginning, LastOffset, end, or custom offset value.

General

On the General tab, specify the required parameters before using this activity. The General tab has the following fields:

Field Literal Value/ Process Property/ Module Property? Description
Name None The name to be displayed as the label for the activity in the process.
Kafka Connection Yes The Kafka connection resource for communicating with a Kafka server instance.
Group ID Yes The group ID for the consumer group.
Topic Names Yes The topic name where the Kafka cluster stores the stream of the record. Multiple topic names are supported and these topic names can be separated using a semicolon (;).

For example, topicName1;topicName2

Assign Custom Partition None This is a checkbox to select if the Partition ID needs to be entered. You can select the checkbox to override Kafka's default partition assignment behavior.
Partition IDs Yes Sequence ID or range of the partition to which the message is received. Default is 0. Multiple partition IDs are supported using a comma separator (,).

For example,

Single Topic and Single Partition

Partition IDs = 0

Single Topic and Multiple Partitions

  • Partition IDs = 0,1,2 or for specifying range
  • Partition IDs = 0-2

Multiple Topics and Multiple Partitions

For two topics:

  • Partition IDs = 0,1,2;0,4 or
  • Partition IDs = 0-2;0,1,2 or
  • Partition IDs = 1-2;0-3

Note: This field is enabled only if the Assign Custom Partition field is checked.
Use Registry No This checkbox is available when Avro Schema is selected in the Key Deserializer or Value Deserializer field.

This checkbox enables you to use the Avro Schema with the Schema Registry.

Avro Encoding Type Yes This field is available when the Avro Schema is selected in the Key Deserializer or Value Deserializer field and the Use Registry checkbox is not enabled.

Three types of encoding are available in the dropdown:

  • JSON Encoding
  • Binary Encoding
  • Single Object Encoding

For more information on the Avro Encoding types, please refer to Avro documentation.

Key Deserializer No Class for the key that implements the serializer interface.
Key Reader Avro Schema   This field is available when Avro Schema is selected in the Key Deserializer field and the Use Registry checkbox is not enabled.

Specifies the path to the .avsc file that contains the avro schema to be used to read the message. Select the file through the resource picker.

Key Writer Avro Schema  

This field is available when Avro Schema is selected in the Key Deserializer field and the Use Registry checkbox is not enabled.

Specifies the path to the .avsc file that contains the avro schema used to publish the message. Select the file through the resource picker.

Value Deserializer No Value for the serializer interface.
Value Reader Avro Schema File Yes This field is available when Avro Schema is selected in the Value Deserializer field and the Use Registry checkbox is not enabled.

Specifies the path to the .avsc file which contains the avro schema to be used. Select the file through the resource picker provided.

Value Writer Avro Schema File Yes

This field is available when Avro Schema is selected in the Value Deserializer field and the Use Registry checkbox is not enabled.

Specifies the path to the .avsc file which contains the avro schema used to publish the message. Select the file through the resource picker provided.

Poll Iterations Yes The number of times Kafka Get Message activity executes the poll() method.

The default value is 3.

Fetch Timeout (msec) Yes The time in milliseconds spent waiting in the poll if data is not available in the buffer. Used as an input to the poll() method.

The default value is 10000.

Max Poll Records Yes

Maximum number of poll records returned in a single poll() operation.

The default value is 500.

Description

On the Description tab, you can enter a short description of the activity.

Field Literal Value/ Process Property/ Module Property? Description
Description None A description of the activity.

Advanced

The following table describes the Advanced tab of the Get Messages activity.

Field Literal Value/ Process Property/ Module Property? Description
Manual Commit None When selected, the offsets of the consumed messages are committed during confirm activity execution using commitSync().

When unselected, the offsets of the consumed messages are committed using commitSync() post-execution of all poll iterations.

Note: Since the number of poll iterations and auto.commit.interval.ms are major factors for auto-commit to work, enable.auto.commit is set to false by default.

Continue on Serialization Error Yes When selected the activity will ignore serialization errors and continue processing further messages. The error information will be available in the ErrorInfo output section.
AutoOffset Reset Yes This is required and selected when there is no initial offset or the offset is out of range.
  • Earliest: resets the offset to the earliest offset.
  • Latest: resets the offset to the latest offset.
  • None: throws an exception to the consumer.

The default value is the earliest.

Isolation Level No This field lets you control how to consume messages written transactionally. The Isolation Levels are of two types:

  • read_uncommitted
  • read_committted

Properties Yes The properties name and value.
Note:
  • All additional consumer properties can be configured here. For example, to set the max poll records value, set the property name as max.poll.records and its value.
  • Module properties of only type string are supported.
Interceptors None

The Interceptor class intercepts the received messages that are published to a topic or channel before being consumed. You can add the interceptor classes using icon and delete the classes using icon. The interceptor classes are executed in the order in which you specify them. The order of execution can be managed using and icons.

Note: To add user-defined properties to the interceptor class, you must specify that property in the Properties field.

Input

The following table describes the fields on the Input tab of the Get Messages activity.

Field Data Type Description
ConsumerConfig
TopicNames String The topic name where the Kafka cluster stores the stream of the record. Multiple topic names are supported and these topic names can be separated using a semicolon (;).

For example, topicName1;topicName2

Partition IDs Number Sequence ID or range of the partition to which the message is received. Default is 0. Multiple partition IDs are supported using a comma separator (,).

For example,

Single Topic and Single Partition

Partition IDs = 0

Single Topic and Multiple Partitions

  • Partition IDs = 0,1,2 or for specifying range
  • Partition IDs = 0-2

Multiple Topics and Multiple Partitions

For two topics:

  • Partition IDs = 0,1,2;0,4 or
  • Partition IDs = 0-2;0,1,2 or
  • Partition IDs = 1-2;0-3

Note: This field is enabled only if Assign Custom Partition field is checked.
SeekOffset  
Topic String Provide the name of a topic.
Note: Topic can be empty when SeekTo is set to Beginning, LastOffset, or End and seek operation is applied on all assigned (including multiple topics) partitions
Partition ID String Provide a partition ID.
Note: Partition ID is mandatory when SeekTo is set to Offset. Partition ID can be empty when SeekTo is set to Beginning, LastOffset, or End and seek operation is applied on all assigned partitions, otherwise seek is applied to specified partition.
seekTo String Provide seek operation to perform.

Valid values are - Beginning, Offset, End and LastOffset.

Note: Use the LastOffset value to fetch the last record present on a topic.
offset Number Provide an offset number. Used when SeekTo is set to Offset.
Additional properties
  • Key
  • Value
  • String
  • String
  • Provide a key for additional properties.
  • Provide value for the specified key in additional properties.

Output

The following table describes the fields on the Output tab.

Field Type Description
KafkaGetMessageOutput complex The complete output for the GetMessage activity.
Records complex Set of Kafka Messages returned by Kafka Get Messages activity.
topic string The topic name.
Partition number The partition ID.
offset number The sequence ID assigned to each record within the partition.
Key String Specified key of the incoming record.
Message String Message received through Kafka.
Headers
  • Key
  • Value
String Headers received through Kafka
  • Key received through Kafka
  • Value received through Kafka
errorInfo
  • keyErrorStackTrace
  • keyErrorPayload
  • valueErrorStackTrace
  • valueErrorPayload
Complex Error Information. Available when Continue On Error is enabled.
  • Avro Key serialization error stack trace
  • Message payload
  • Avro Value serialization error stack trace
  • Message payload

Fault

The Fault tab has the following exceptions:

  • KafkaPluginException

Each exception has the following fields:

Field Type Description
msg string The error message description returned by the plug-in.
msgCode string The error code returned by the plug-in.