Kafka Send Message Activity

The Kafka Send Message activity is used to send or publish messages to consumers through Kafka brokers.

Kafka transactions can be applied to Kafka Send Message activity. To apply transactions:
  1. In the application process, right-click and create a group named local transactions.
  2. Add the Kafka Send Message activity inside the newly created local transaction group.
  3. On the General tab of the local transaction, select Transaction Transport as Kafka.
  4. Add one or more Kafka Send Message activities which participates in the transaction inside Local Transaction group. Configure Transaction ID and ensure Enable Idempotence is selected.
    Note: The Kafka Send Message activities (inside the transaction group) which are configured with the same shared resource and the Transaction ID participates in a transaction identified by the provided Transaction ID. In such cases, the Kafka producer instance is initialized in the first Kafka Send Message activity within the transaction group and the same producer instance is used by subsequent Kafka Send Message activities. The transaction gets committed or aborted as execution progress towards the end of transaction group.

    Ensure that multiple Kafka Send Message activities participating in the same transaction should be configured with identical configuration. For example, if the first Kafka Send activity is using a string value serializer, then all subsequent activities should be configured with the string value serializer. However, partition or topic name can be provided differently for each activity.

General

On the General tab, you can specify the required parameters before you use this activity. The General tab contains the following fields:

Field Literal Value/Module Property? Description
Name None The name to be displayed as the label for the activity in the process.
Kafka Connection Yes The Kafka connection resource provides the connection details to communicate with a Kafka server Instance.
Topic Name Yes Provide the topic name where the Kafka cluster stores streams of records.
Assign Custom Partition None This is a check box to select if Partition ID field has to be entered. You can select the check box to override Kafka default partition assignment behavior.
Partition ID Yes Sequence ID of the custom partition to which the message is sent. The default value is 0.
Note: This field is enabled only if Assign Custom Partition is checked.
Use Registry No This check box is available when Avro Schema is selected in the Key Serializer or Value Serializer field.

This check box enables you to use the Avro Schema with the Schema Registry.

Note: If a registry and any subject name strategy except TopicName strategy is used, provide the strategy name through the Additional properties.
Avro Encoding Type Yes This field is available when the Avro Schema is selected in the Key Serializer or Value Serializer field and the Use Registry check box is not enabled.
Three types of encoding are available in the dropdown:
  • JSON Encoding
  • Binary Encoding
  • Single Object Encoding

For more information on the Avro Encoding types, please refer to Avro documentation.

Key Serializer No This is a serializer class for key that implements the serializer interface.
Key Avro Schema Name Yes This field is available when Avro Schema is selected in the Key Serializer field and the Use Registry check box is enabled.

Provide the name and version of the avro schema to be used in the following format.

<<subject-name>> : <<version>>

Select Avro Schema No This button is available when Use Registry check box is enabled. This button selects the Avro Schema that is used for the serialization of the data.

On selecting the Select Avro Schema a wizard is displayed from where the user can select the stored schema and the version.

From the wizard, you can view and select the Subject and Version of the Avro schema and the selected avro schema and the sample json payload for that schema.

The Sample JSON payload can be used to create an XML Schema file using BW Schemas tool XML Schema File from JSON Payload. This XSD file can be used to map and create the JSON string input.

Key Avro Schema File Yes This field is available when Avro Schema is selected in the Key Serializer field and the Use Registry check box is not enabled.

Specifies the path to the .avsc file which contains the avro schema to be used. Select the file through the resource picker.

Sample Avro Data No This button is available when Use Registry check box is not enabled. On selecting the Sample Avro Data, a wizard is displayed. The wizard displays the avro schema and sample JSON payload for that schema.

The sample JSON payload can be used to create XML schema file using BW schema tool, XML Schema File from JSON Payload. This XSD file is used to map and create the JSON string input.

Value Serializer No Value for the serializer interface.
Value Avro Schema Name Yes This field is available when Avro Schema is selected in the Value Serializer field and the Use Registry check box is enabled. Provide the name and version of the avro schema to be used in the following format.

<<subject-name>> : <<version>>

Select Avro Schema No This button is available when Use Registry check box is enabled. This button selects the Avro Schema that is used for the serialization of the data.

On selecting the Select Avro Schema a wizard is displayed from where the user can select the stored schema and the version.

From the wizard, you can view and select the Subject and Version of the Avro schema and the selected avro schema and the sample json payload for that schema.

The Sample JSON payload can be used to create an XML Schema file using BW Schemas tool XML Schema File from JSON Payload. This XSD file can be used to map and create the JSON string input.

Value Avro Schema File Yes This field is available when the Avro Schema is selected in the Value Serializer field and the Use Registry check box is not enabled.

Specifies the path to the .avsc file which contains the avro schema to be used. Select the file through the resource picker.

Sample Avro Data No This button is available when Use Registry check box is not enabled. On selecting the Sample Avro Data, a wizard is displayed. The wizard displays the avro schema and sample JSON payload for that schema.

The sample JSON payload can be used to create XML schema file using BW schema tool, XML Schema File from JSON Payload. This XSD file is used to map and create the JSON string input.

Acks Yes When writing to Kafka, producers can choose whether they wait for the message to be acknowledged by 0, 1 or all replicas. This controls the durability of records that are sent. This configuration controls the criteria under which producer requests are considered complete.
  • acks=0. The producer does not wait for any acknowledgment from the Kafka server. The sent record is added to buffer and considered sent. In this scenario, no guarantee can be made that server has received the record, and retries do not take any effect.
  • acks=1. The record is added to the log. In this case if the leader has written the record to its local log, it responds without waiting for follower’s acknowledgment.
  • acks=all. An acknowledgment of the record is sent if the data is committed by all the in-sync replicas. This is the strongest available guarantee.
Buffer Memory Yes This is total amount of memory available to the producer to buffer records waiting to be sent to the server.

The default value is 33554432.

Compression Type Yes If enabled, data is compressed by the producer, written in compressed format on the server, and decompressed by the consumer. The default is none.
Retries Yes Specifies the number of retries that are made by a client to resend any record in event of a transient error.

Description

On the Description tab, you can enter a short description for the Send Message activity.

Field Literal Value/Process Property/Module Property? Description
Description None A description of the activity.

Advanced

The following table describes the fields on the Advanced tab of the Send Message activity.

Field Literal Value/ Module Property? Description
Batch Size None Records are batched together by producer whenever multiple records are sent to the same partition. No attempt is made to batch the records larger than this size.

The default value is 16384.

Client ID Yes A String client ID is passed to the server while making the requests to track the source of the requests.
Linger Yes You can set linger.ms to something greater than 0 to instruct the producer to wait up to that number of milliseconds before sending a request in hope that more records arrive to fill up the same batch. The default behavior is to send messages immediately even if there is additional space in the buffer.
Max Request Size Yes Limits the number of record batches the producer sends in a single request to avoid sending huge requests.

The default value is 1048576.

Override Transaction Behavior No When this check box is selected, the Send Message activity does not participate in the transaction even if it is a part of the transaction group.

The default value is a boolean value.

Transactional ID Yes This field is available only when the Override Transaction Behavior check box is unselected.

It is a unique String ID which defines the Kafka transaction. For more information on transactions, please refer to the Kafka documentation.

Enable Idempotence No This field enables exactly once semantics for the Kafka producer. In the producer activity when the value is set to true, exactly one copy of each of the message is written in the stream. If the value is set to false, the producer retries to connect because of broker failures and so on, that may write duplicate copies of the retried message in the stream.

The default value is false.

Properties Yes You can set the properties name and value for the producer.
Note:
  • All additional producer properties can be configured here. For example, to set buffer memory value, set property name as buffer.memory and its value.
  • Module properties of only type string is supported.
Interceptors No

The Interceptor class intercepts the sent message to modify or read it before publishing it to a topic or channel. You can add the interceptor classes using icon and delete the classes using icon. The interceptor classes are executed in the order in which you specify them. The order of execution can be managed using and icons.

Note: To add user-defined properties to the interceptor class, you must specify those property in the Properties field.

Input

The following table describes the fields on the Input tab of the Send Message activity.

Field Data Type Description
ProducerConfig (All fields in this section are optional.)
Topic String Topic name where Kafka cluster stores streams of records.
Partition ID Number Sequence ID of the partition to which the message is sent. Sequence ID of the partition can be entered here or in the Partition ID field under General tab.
Note: The Assign Custom Partition check box under General tab must be selected to enter a value here.
Batch Size Number Batch size detail to batch the records sent to same partition.
Client ID String Client ID passed to the server while making the request.
Linger.ms Number Time in millisecond to add artificial delay while sending the message from producer.
Max Request Size Number Maximum size of a request in bytes. Use it to limit the number of record batches the producer sends in a single request to avoid sending huge requests.
Additional properties
  • Key
  • Value
  • String
  • String
  • Provide key for additional properties.
  • Provide value for specified key in additional properties.
schemaRegistryConfig
KeySubjectNameVersion String Provide the name and version of the avro schema to be used in the following format:

<<subject-name>> : <<version>>

This field is available when Avro Schema is selected and the Use Registry check box is enabled.
subjectNameVersion String Provide the name and version of the avro schema to be used in the following format:

<<subject-name>> : <<version>>

This field is available when Avro Schema is selected and the Use Registry check box is enabled.
Messages
  • Key
  • Message
  • Headers
    • Key
    • Value
String
  • Optional. Key for the message to be sent.
  • Required. The actual message as value for the specified key.
  • Optional. User defined custom headers to be added in the outgoing message.
    • Provide key for the headers to be sent.
    • Provide value for specified key in headers.
      Note: The value is provided in base64binary
Note: Use -Dcom.tibco.plugin.kafka.new.producer JVM property to create a new producer for every job:
  • If the property is not set or set to false, only one KafkaProducer is created in init() method when the application starts and the producer gets closed when the application stops.
  • In the above case, topic from the Input tab is overridden. All other producer properties are derived from the General and Advanced tabs and the Input tab configurations are ignored.
  • If the property is set to true, then properties specified in the Input tab override the properties in the General tab and Advanced tab, and a new KafkaProducer is created for every job.

Output

The following table describes the fields on the Output tab of the Send Message activity.

Output Item Data Type Description
KafkaSendMessageOutput complex The complete output of the Send Message activity.
result complex Information about the content of the sent and failed messages.
status string Status of the message sent by a producer.
SendSuccess complex Information about the content of the sent message.
topic string The topic name for publishing the message.
offset number The sequence ID number assigned to each record within a partition.
Partition number The sequence ID of the partition to which a record is sent within a topic.
SendFailed complex Information about the content of the failed message.
errorCode string Displays the error code.
errorMessage string Displays the error message.

Fault

The Fault tab has the following exceptions:

  • KafkaCreatedProducerException
  • KafkaPluginException

Each exception has the following fields:

Field Type Description
msg string The error message description returned by the plug-in.
msgCode string The error code returned by the plug-in.