Kafka and the Kafka Streams Channel

To send and receive messages from a Kafka broker, use the TIBCO BusinessEvents Kafka channel. The Kafka channel converts incoming Kafka messages to BusinessEvents events and transforms BusinessEvents events into outgoing Kafka messages.

Also, you can process input Kafka Streams by using the Kafka Streams channel. The Kafka Streams channel destination can be configured to use stream processors to create stream processing topologies. After the stream topology processes the incoming stream, a stream record is converted to a SimpleEvent that triggers the relevant rules.
Note: Kafka Streams is an in-only channel; that is, it cannot be used to send messages. To send messages to Kafka, use the Kafka channel and its destinations.

Before You Begin

Before setting up the Kafka or the Kafka Streams channel, read the Kafka documentation to become familiar with its concepts, architecture, demos, APIs, and so on. The following table lists the documentation URLs for Kafka and Kafka Streams.

Channel Type    Documentation URL
Kafka           Kafka Documentation
Kafka Streams   Kafka Streams Documentation

Kafka Channel Serializers

The Kafka channel provides the following serializers to handle payloads:

KafkaMapSerializer
The KafkaMapSerializer serializer (com.tibco.cep.driver.kafka.serializer.KafkaMapSerializer) serializes and deserializes a BusinessEvents event along with its payload into (or from) a HashMap. The KafkaMapSerializer is used to send and receive events between BusinessEvents instances. For incoming messages, the serializer converts the byte sequence to an event and its payload. For outgoing events, the serializer converts the event and its payload into a HashMap encoded as bytes.
KafkaJsonSerializer
The KafkaJsonSerializer serializer (com.tibco.cep.driver.kafka.serializer.KafkaJsonSerializer) serializes and deserializes a BusinessEvents event along with its payload into (or from) JSON. For incoming messages, the KafkaJsonSerializer decodes the text from the message as a JSON string and deserializes it to an event. For outgoing events, the serializer converts the event and its payload into a JSON string. The KafkaJsonSerializer serializer is useful for processing or sending messages between BusinessEvents and external systems.
KafkaXmlSerializer
The KafkaXmlSerializer serializer (com.tibco.cep.driver.kafka.serializer.KafkaXmlSerializer) serializes and deserializes a BusinessEvents event along with its payload into (or from) an XML string that does not have any qualifiers. For incoming messages, the KafkaXmlSerializer decodes the text from the message as an XML string and deserializes it to an event. For outgoing events, the serializer converts the event and its payload into an XML string. The KafkaXmlSerializer serializer is useful for processing or sending messages between BusinessEvents and external systems.

KafkaTextPayloadSerializer
The KafkaTextPayloadSerializer serializer (com.tibco.cep.driver.kafka.serializer.KafkaTextPayloadSerializer) serializes and deserializes only the payload of a BusinessEvents event into (or from) text. This serializer ignores the event properties even if they are defined. For incoming messages, the KafkaTextPayloadSerializer reads the text from the message and deserializes it to an event. For outgoing events, the serializer converts the event payload into a Kafka message value.
Note: If you migrate a project from an earlier version and the project contains KafkaStringPayloadSerializer, the project still works in this version. However, the KafkaStringPayloadSerializer serializer is not available for new Kafka channels.
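
The payload-only behavior described for KafkaTextPayloadSerializer can be illustrated with a small plain-Java model. This is a sketch and not the TIBCO implementation: the SketchEvent class, the method names, and the use of UTF-8 are all assumptions made for illustration, and no BusinessEvents or Kafka classes are used.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a payload-only serializer; not a TIBCO class.
final class TextPayloadSketch {

    /** Minimal event model: named properties plus a text payload. */
    static final class SketchEvent {
        final Map<String, Object> properties = new LinkedHashMap<>();
        final String payload;

        SketchEvent(String payload) {
            this.payload = payload;
        }
    }

    /** Outgoing direction: only the payload becomes the message value. */
    static byte[] toMessageValue(SketchEvent event) {
        // Assumption: UTF-8 encoding. Properties are deliberately not written.
        return event.payload.getBytes(StandardCharsets.UTF_8);
    }

    /** Incoming direction: the message text becomes the payload; no properties are set. */
    static SketchEvent fromMessageValue(byte[] value) {
        return new SketchEvent(new String(value, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        SketchEvent out = new SketchEvent("order-42 shipped");
        out.properties.put("orderId", 42); // defined, but not part of the message value

        SketchEvent in = fromMessageValue(toMessageValue(out));

        System.out.println(in.payload);              // the payload text round-trips
        System.out.println(in.properties.isEmpty()); // the property did not travel
    }
}
```

The point of the model is that a receiver must get everything it needs from the payload text, because property values set on the outgoing event are not transmitted.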

Kafka Streams Channel Serializers

The Kafka Streams channel provides all the serializers that are present in the Kafka channel plus the KeyValueSerializer.

KeyValueSerializer
KeyValueSerializer (com.tibco.cep.driver.kafkastreams.serializer.KeyValueSerializer) deserializes a Kafka Streams record (with primitive key and value types) into a SimpleEvent (set as Default Event). The event must have two properties named RECORD_KEY and RECORD_VALUE whose data types match the key and value of the stream record. For example, the output of the Count transformation can be deserialized into an event with RECORD_KEY as String and RECORD_VALUE as Long. The KeyValueSerializer is used to receive events in TIBCO BusinessEvents instances.
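
The mapping from a stream record to the RECORD_KEY and RECORD_VALUE properties can be sketched in plain Java. The following model is illustrative only: it uses neither Kafka Streams nor BusinessEvents classes, the class and method names are hypothetical, and the count() helper is a simplified stand-in for a Count transformation that produces only final totals, whereas a real stream would also deliver intermediate updates.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the record-to-event mapping; not a TIBCO or Kafka class.
final class KeyValueRecordSketch {

    /** Stand-in for a Count transformation: occurrences per key, in first-seen order. */
    static Map<String, Long> count(List<String> keys) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String key : keys) {
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    /** Maps one (key, value) record onto the two event properties named in the text. */
    static Map<String, Object> toEventProperties(String key, Long value) {
        Map<String, Object> event = new LinkedHashMap<>();
        event.put("RECORD_KEY", key);     // String property, matches the record key type
        event.put("RECORD_VALUE", value); // Long property, matches the record value type
        return event;
    }

    /** Converts every counted record into its event-property form. */
    static List<Map<String, Object>> toEvents(Map<String, Long> counts) {
        List<Map<String, Object>> events = new ArrayList<>();
        counts.forEach((key, value) -> events.add(toEventProperties(key, value)));
        return events;
    }

    public static void main(String[] args) {
        Map<String, Long> counts = count(Arrays.asList("login", "logout", "login"));
        for (Map<String, Object> event : toEvents(counts)) {
            System.out.println(event);
        }
    }
}
```

The property types in the event definition have to match the record types. In this sketch that means a String key and a Long value, as in the Count example above.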

Adding a New Channel

See the following topics for more information about new Kafka and Kafka Streams channels:

Sample TIBCO BusinessEvents Applications

The following sample TIBCO BusinessEvents applications are provided with TIBCO BusinessEvents for Kafka and Kafka Streams channels to help you understand these channels better:

Channel Type    Sample TIBCO BusinessEvents Application
Kafka           BE_HOME/examples/standard/KafkaChannel
Kafka Streams   BE_HOME/examples/standard/KafkaStreamsChannel

Catalog Functions

You can send and receive events with a payload through the Kafka channel by using Event.sendEvent() and Event.routeTo(). Currently, Kafka does not support synchronous request-reply messaging; therefore, the functions Event.requestEvent() and Event.replyEvent() are not supported.

Kafka and Kafka Streams Properties

You can specify Kafka and Kafka Streams properties in the CDD file by adding the appropriate prefix to the property name. By using the same prefix, you can also override an existing Kafka or Kafka Streams property that is exposed through the TIBCO BusinessEvents Studio channel editor. For example, to specify the default.replication.factor Kafka property, use be.channel.kafka.default.replication.factor.
Channel Type    Prefix                                         Description
Kafka           be.channel.kafka                               Used for Kafka properties that are applicable for the Kafka channel and all its destinations.
Kafka           be.channel.kafka.<destination_name>            Used for Kafka properties that are applicable for only the specified destination of the Kafka channel.
Kafka Streams   be.channel.kafka.streams                       Used for Kafka Streams properties that are applicable for the Kafka Streams channel and all its destinations.
Kafka Streams   be.channel.kafka.streams.<destination_name>    Used for Kafka Streams properties that are applicable for only the specified destination of the Kafka Streams channel.
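
As a rough illustration of how the prefixed names are composed, the following Java sketch builds CDD property names from the prefixes in the table. The helper class and its methods are hypothetical and are not part of TIBCO BusinessEvents. The destination name orders is made up, and max.poll.records and num.stream.threads are used only as examples of standard Kafka and Kafka Streams properties. Joining the destination prefix and the property name with a dot is an assumption based on the channel-level example above.

```java
// Hypothetical helper; only the two prefixes are taken from the table above.
final class CddPropertyNames {

    static final String KAFKA_PREFIX = "be.channel.kafka";
    static final String KAFKA_STREAMS_PREFIX = "be.channel.kafka.streams";

    /** Name of a property that applies to the channel and all its destinations. */
    static String forChannel(String prefix, String property) {
        return prefix + "." + property;
    }

    /** Name of a property that applies to one destination only. */
    static String forDestination(String prefix, String destinationName, String property) {
        // Assumption: the property name follows the destination prefix after a dot.
        return prefix + "." + destinationName + "." + property;
    }

    public static void main(String[] args) {
        // Channel-wide Kafka property (the example given in the text).
        System.out.println(forChannel(KAFKA_PREFIX, "default.replication.factor"));
        // Kafka property for a single, hypothetical destination named "orders".
        System.out.println(forDestination(KAFKA_PREFIX, "orders", "max.poll.records"));
        // Channel-wide Kafka Streams property.
        System.out.println(forChannel(KAFKA_STREAMS_PREFIX, "num.stream.threads"));
    }
}
```

The resulting strings are the names to enter as properties in the CDD file, each with the value that you want the underlying Kafka or Kafka Streams client to receive.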