Kafka SendMessage Activity

The Kafka SendMessage activity is used to send or publish messages to consumers through Kafka brokers.

General

In the General tab, you can specify the required parameters before you use this activity. The General tab contains the following fields:

Field Literal Value/Module Property? Description
Name None The name to be displayed as the label for the activity in the process.
Kafka Connection Yes The Kafka connection resource provides the connection details to communicate with a Kafka server Instance.
Topic Name Yes Provide the topic name where the Kafka cluster stores streams of records.
Assign Custom Partition None This is a check box to select if Partition ID needs to be entered. You can select the check box to override Kafka's default partition assignment behavior.
Partition ID Yes Sequence ID of the custom partition to which the message is sent. The default value is 0.
Note: This field is enabled only if Assign Custom Partition is checked.
Key Serializer Yes This is a serializer class for key that implements the serializer interface.
Value Serializer Yes Value for the serializer interface.
Acks Yes When writing to Kafka, producers can choose whether they wait for the message to be acknowledged by 0, 1 or all replicas. This controls the durability of records that are sent. This configuration controls the criteria under which producer requests are considered complete.
  • acks=0. The producer does not wait for any acknowledgment from the Kafka server. The sent record is added to buffer and considered sent. In this scenario, no guarantee can be made that server has received the record, and retries do not take any effect.
  • acks=1. The record is added to the log. In this case if the leader has written the record to its local log, it responds without waiting for follower’s acknowledgment.
  • acks=all. An acknowledgment of the record is sent if the data is committed by all the in-sync replicas. This is the strongest available guarantee.
Buffer Memory Yes This is total amount of memory available to the producer to buffer records waiting to be sent to the server.

The default value is 33554432.

Compression Type Yes If enabled, data will be compressed by the producer, written in compressed format on the server, and decompressed by the consumer. The default is none.
Retries Yes Specifies the number of retries that are made by a client to resend any record in event of a transient error.

Description

In the Description tab, you can enter a short description for the activity.

Field Literal Value/Process Property/Module Property? Description
Description None A description of the activity.

Advanced

Specify the Batch Size, Client ID, Linger, Max Request Size, and Properties.

Field Literal Value/ Module Property? Description
Batch Size None Records are batched together by producer whenever multiple records are sent to the same partition. No attempt is made to batch the records larger than this size.

The default value is 16384.

Client ID Yes A String client ID is passed to the server while making the requests to track the source of the requests.
Linger Yes You can set linger.ms to something greater than 0 to instruct the producer to wait up to that number of milliseconds before sending a request in hope that more records arrive to fill up the same batch. The default behavior is to send messages immediately even if there is additional space in the buffer.
Max Request Size Yes Limits the number of record batches the producer sends in a single request to avoid sending huge requests.

The default value is 1048576.

Properties None Provide the properties name and value for the producer.
Note: Kafka plug-in supports Fault Tolerance. In case of any errors, you can use Properties to add the correct name and value.

Input

The following table describes the fields in the Input tab of the SendMessage activity.

Field Data Type Description
ProducerConfig (All fields in this section are optional.)
Topic String Topic name where Kafka cluster stores streams of records.
Partition ID Number Sequence ID of the partition to which the message is sent. Sequence ID of the partition can be entered here or in the Partition ID field under General tab.
Note: The Assign Custom Partition check box under General tab must be selected to enter a value here.
Batch Size Number Batch size detail to batch the records sent to same partition.
Client ID String Client ID passed to the server while making the request.
Linger.ms Number Time in millisecond to add artificial delay while sending the message from producer.
Max Request Size Number Maximum size of a request in bytes. Use it to limit the number of record batches the producer sends in a single request to avoid sending huge requests.
Additional properties
  • Key
  • Value

String

  • Provide key for additional properties.
  • Provide value for specified key in additional properties.
Messages
  • Key
  • Message
String
  • Optional. Key for the message to be sent.
  • Required. The actual message as value for the specified key.

Output

The following table describes the fields in the Output tab of the SendMessage activity.

Output Item Data Type Description
KafkaSendMessageOutput complex The complete output of the SendMessage activity.
result complex Information about the content of the sent and failed messages.
status string Status of the message sent by a producer.
SendSuccess complex Information about the content of the sent message.
topic string The topic name for publishing the message.
offset number The sequence ID number assigned to each record within a partition.
Partition number The sequence ID of the partition to which a record is sent within a topic.
SendFailed complex Information about the content of the failed message.
errorCode string Displays the error code.
errorMessage string Displays the error message.

Fault

The Fault tab has the following exceptions:

  • KafkaCreatedProducerException
  • KafkaPluginException

Each exception has the following fields:

Field Type Description
msg string The error message description returned by the plug-in.
msgCode string The error code returned by the plug-in.