Kafka Send Message Activity
The Kafka Send Message activity is used to send or publish messages to consumers through Kafka brokers.
Kafka transactions can be applied to Kafka Send Message activity. To apply transactions:
- In the application process, right-click and create a group named local transactions.
- Add the Kafka Send Message activity inside the newly created local transaction group.
- On the General tab of the local transaction, select Transaction Transport as Kafka.
- Add one or more Kafka Send Message activities that participate in the transaction inside the Local Transaction group. Configure Transaction ID and ensure Enable Idempotence is selected.
Note: The Kafka Send Message activities (inside the transaction group) that are configured with the same shared resource and the Transaction ID participates in a transaction identified by the provided Transaction ID. In such cases, the Kafka producer instance is initialized in the first Kafka Send Message activity within the transaction group and the same producer instance is used by subsequent Kafka Send Message activities. The transaction gets committed or aborted as execution progresses toward the end of the transaction group.
Ensure that multiple Kafka Send Message activities participating in the same transaction should be configured with identical configuration. For example, if the first Kafka Send activity is using a string value serializer, then all subsequent activities should be configured with the string value serializer. However, partition or topic name can be provided differently for each activity.
General
On the General tab, you can specify the required parameters before you use this activity. The General tab contains the following fields:
Field | Literal Value/Module Property? | Description |
---|---|---|
Name | None | The name to be displayed as the label for the activity in the process. |
Kafka Connection | Yes | The Kafka connection resource provides the connection details to communicate with a Kafka server Instance. |
Topic Name | Yes | Provide the topic name where the Kafka cluster stores stream of records. |
Assign Custom Partition | None | This is a checkbox to select if the Partition ID field has to be entered. You can select the checkbox to override Kafka default partition assignment behavior. |
Partition ID | Yes | Sequence ID of the custom partition to which the message is sent. The default value is 0.
Note: This field is enabled only if the
Assign Custom Partition is checked.
|
Use Registry | No | This checkbox is available when
Avro Schema is selected in the
Key Serializer or
Value Serializer field.
This checkbox enables you to use the Avro Schema with the Schema Registry. Note: If a registry and any subject name strategy except the TopicName strategy is used, provide the strategy name through the
Additional properties.
|
Avro Encoding Type | Yes | This field is available when the Avro Schema is selected in the Key Serializer or Value Serializer field and the Use Registry checkbox is not enabled.
Three types of encoding are available in the dropdown:
For more information on the Avro Encoding types, please refer to Avro documentation. |
Key Serializer | No | This is a serializer class for the key that implements the serializer interface. |
Key Avro Schema Name | Yes | This field is available when
Avro Schema is selected in the
Key Serializer field and the
Use Registry checkbox is enabled.
Provide the name and version of the Avro schema to be used in the following format. <<subject-name>> : <<version>> |
Select Avro Schema | No | This button is available when the
Use Registry checkbox is enabled. This button selects the Avro Schema that is used for the serialization of the data.
On selecting the Select Avro Schema a wizard is displayed from where the user can select the stored schema and the version. From the wizard, you can view and select the Subject and Version of the Avro schema, the selected Avro schema, and the sample JSON payload for that schema. The Sample JSON payload can be used to create an XML Schema file using the BW Schemas tool XML Schema File from JSON Payload. This XSD file can be used to map and create the JSON string input. If you want to select the schema by using
Note: If the schema is selected by using
sha256Hash , the following format is used:
sha256hash : <<sha256hash value>> |
Sample Avro Data | No | This button is available when the
Use Registry checkbox is not enabled. On selecting the Sample Avro Data, a wizard is displayed. The wizard displays the Avro schema and sample JSON payload for that schema.
The sample JSON payload can be used to create an XML schema file using the BW schema tool, XML Schema File from JSON Payload. This XSD file is used to map and create the JSON string input. |
Value Serializer | No | Value for the serializer interface. |
Value Avro Schema Name | Yes | This field is available when
Avro Schema is selected in the
Value Serializer field and the
Use Registry checkbox is enabled. Provide the name and version of the Avro schema to be used in the following format.
<<subject-name>> : <<version>> |
Select Avro Schema | No | This button is available when the
Use Registry checkbox is enabled. This button selects the Avro Schema that is used for the serialization of the data.
On selecting the Select Avro Schema a wizard is displayed from where the user can select the stored schema and the version. From the wizard, you can view and select the Subject and Version of the Avro schema and the selected Avro schema and the sample JSON payload for that schema. The Sample JSON payload can be used to create an XML Schema file using the BW Schemas tool XML Schema File from JSON Payload. This XSD file can be used to map and create the JSON string input. If you want to select the schema by using
Note: If the schema is selected by using
sha256Hash , the following format is used:
sha256hash : <<sha256hash value>> |
Value Avro Schema File | Yes | This field is available when the Avro Schema is selected in the
Value Serializer field and the
Use Registry checkbox is not enabled.
Specifies the path to the .avsc file which contains the Avro schema to be used. Select the file through the resource picker. |
Sample Avro Data | No | This button is available when the
Use Registry checkbox is not enabled. On selecting the Sample Avro Data, a wizard is displayed. The wizard displays the Avro schema and sample JSON payload for that schema.
The sample JSON payload can be used to create XML schema file using the BW schema tool, XML Schema File from JSON Payload. This XSD file is used to map and create the JSON string input. |
Acks | Yes | When writing to Kafka, producers can choose whether they wait for the message to be acknowledged by 0, 1, or all replicas. This controls the durability of records that are sent. This configuration controls the criteria under which producer requests are considered complete.
|
Buffer Memory | Yes | This is the total amount of memory available to the producer to buffer records waiting to be sent to the server.
The default value is 33554432. |
Compression Type | Yes | If enabled, data is compressed by the producer, written in compressed format on the server, and decompressed by the consumer. The default is none. |
Retries | Yes | Specifies the number of retries that are made by a client to resend any record in event of a transient error. |
Description
On the Description tab, you can enter a short description for the Send Message activity.
Field | Literal Value/Process Property/Module Property? | Description |
---|---|---|
Description | None | A description of the activity. |
Advanced
The following table describes the fields on the Advanced tab of the Send Message activity.
Field | Literal Value/ Module Property? | Description |
---|---|---|
Batch Size | None | Records are batched together by the producer whenever multiple records are sent to the same partition. No attempt is made to batch the records larger than this size.
The default value is 16384. |
Client ID | Yes | A String client ID is passed to the server while making the requests to track the source of the requests. |
Linger | Yes | You can set linger.ms to something greater than 0 to instruct the producer to wait up to that number of milliseconds before sending a request in hope that more records arrive to fill up the same batch. The default behavior is to send messages immediately even if there is additional space in the buffer. |
Max Request Size | Yes | Limits the number of record batches the producer sends in a single request to avoid sending huge requests.
The default value is 1048576. |
Override Transaction Behavior | No | When this checkbox is selected, the Send Message activity does not participate in the transaction even if it is a part of the transaction group.
The default value is a boolean value. |
Transactional ID | Yes | This field is available only when the
Override Transaction Behavior checkbox is unselected.
It is a unique String ID that defines the Kafka transaction. For more information on transactions, please refer to the Kafka documentation. |
Enable Idempotence | No | This field enables exactly-once semantics for the Kafka producer. In the producer activity when the value is set to
true, exactly one copy of each of the messages is written in the stream. If the value is set to
false, the producer retries to connect because of broker failures and so on, which may write duplicate copies of the retried message in the stream.
The default value is false. |
Properties | Yes | You can set the name of the properties and value for the producer.
Note:
|
Interceptors | No |
The Interceptor class intercepts the sent message to modify or read it before publishing it to a topic or channel. You can add the interceptor classes using
Note: To add user-defined properties to the interceptor class, you must specify that property in the
Properties field.
|
Input
The following table describes the fields on the Input tab of the Send Message activity.
Field | Data Type | Description |
---|---|---|
ProducerConfig (All fields in this section are optional.) | ||
Topic | String | Topic name where Kafka cluster stores stream of records. |
Partition ID | Number | Sequence ID of the partition to which the message is sent. The Sequence ID of the partition can be entered here or in the
Partition ID field under the
General tab.
Note: The
Assign Custom Partition checkboxunder the
General tab must be selected to enter a value here.
|
Batch Size | Number | Batch size detail to batch the records sent to the same partition. |
Client ID | String | Client ID passed to the server while making the request. |
Linger.ms | Number | Time in milliseconds to add artificial delay while sending the message from the producer. |
Max Request Size | Number | Maximum size of a request in bytes. Use it to limit the number of record batches the producer sends in a single request to avoid sending huge requests. |
Additional properties | ||
|
|
|
schemaRegistryConfig | ||
KeySubjectNameVersion | String | Provide the name and version of the Avro schema to be used in the following format:
<<subject-name>> : <<version>> This field is available when Avro Schema is selected and the Use Registry checkbox is enabled. |
subjectNameVersion | String | Provide the name and version of the Avro schema to be used in the following format:
<<subject-name>> : <<version>> This field is available when Avro Schema is selected and the Use Registry checkboxis enabled. |
Messages | ||
|
String |
|
-Dcom.tibco.plugin.kafka.new.producer
JVM property to create a new producer for every job:
- If the property is not set or set to false, only one KafkaProducer is created in init() method when the application starts and the producer gets closed when the application stops.
- In the above case, the topic from the Input tab is overridden. All other producer properties are derived from the General and Advanced tabs and the Input tab configurations are ignored.
- If the property is set to true, then properties specified in the Input tab override the properties in the General tab and Advanced tab, and a new KafkaProducer is created for every job.
Output
The following table describes the fields on the Output tab of the Send Message activity.
Output Item | Data Type | Description |
---|---|---|
KafkaSendMessageOutput | complex | The complete output of the Send Message activity. |
result | complex | Information about the content of the sent and failed messages. |
status | string | Status of the message sent by a producer. |
SendSuccess | complex | Information about the content of the sent message. |
topic | string | The topic name for publishing the message. |
offset | number | The sequence ID number assigned to each record within a partition. |
Partition | number | The sequence ID of the partition to which a record is sent within a topic. |
SendFailed | complex | Information about the content of the failed message. |
errorCode | string | Displays the error code. |
errorMessage | string | Displays the error message. |
Fault
The Fault tab has the following exceptions:
- KafkaCreatedProducerException
- KafkaPluginException
Each exception has the following fields:
Field | Type | Description |
---|---|---|
msg | string | The error message description returned by the plug-in. |
msgCode | string | The error code returned by the plug-in. |