The TIBCO StreamBase® Input Adapter for Apache Kafka Consumer allows the system to consume data from an Apache Kafka broker.
Each message from the broker contains the topic that the message was sent to, as well as the message, key, offset, and partition. When subscribing to a topic, you can request to start from a specific offset value, or you can supply a timestamp value. For a timestamp, the system finds the offset closest to the given time and produces messages from that point forward.
When requesting offsets by timestamp, two values have special meaning:
- -1 means no history: receive only messages from this point forward.
- -2 means all history: receive all data from history, then all messages going forward.
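The two special timestamp values can be pictured with a small sketch. This is not adapter code: the class `StartTimeDemo`, the enum `StartPosition`, and the method `classify` are hypothetical names used only for illustration, and the sketch assumes that any value other than -1 or -2 is treated as a timestamp to be matched to the closest offset.

```java
/** Hypothetical helper: classifies the timestamp supplied with a subscription request. */
final class StartTimeDemo {

    /** Where a subscription begins reading. Illustrative names, not adapter constants. */
    enum StartPosition { LATEST, EARLIEST, AT_TIMESTAMP }

    static final long NO_HISTORY = -1L;   // only messages from this point forward
    static final long ALL_HISTORY = -2L;  // all history, then all new messages

    static StartPosition classify(long time) {
        if (time == NO_HISTORY) {
            return StartPosition.LATEST;
        }
        if (time == ALL_HISTORY) {
            return StartPosition.EARLIEST;
        }
        // Assumption: every other value is a timestamp resolved to the closest offset.
        return StartPosition.AT_TIMESTAMP;
    }

    public static void main(String[] args) {
        System.out.println(classify(-1L));
        System.out.println(classify(-2L));
        System.out.println(classify(1700000000000L));
    }
}
```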
The Apache Kafka adapter suite is implemented against the version of the Kafka libraries listed on the Supported Configurations page.
This section describes the properties you can set for this adapter, using the various tabs of the Properties view in StreamBase Studio.
Name: Use this required field to specify or change the name of this instance of this component, which must be unique in the current EventFlow module. The name must contain only alphabetic characters, numbers, and underscores, and no hyphens or other special characters. The first character must be alphabetic or an underscore.
Adapter: A read-only field that shows the formal name of the adapter.
Start options: This field provides a link to the Cluster Aware tab, where you configure the conditions under which this adapter starts.
Enable Error Output Port: Select this check box to add an Error Port to this component. In the EventFlow canvas, the Error Port shows as a red output port, always the last port for the component. See Using Error Ports to learn about Error Ports.
Description: Optionally enter text to briefly describe the component's purpose and function. In the EventFlow Editor canvas, you can see the description by pressing Ctrl while the component's tooltip is displayed.
Property | Type | Description |
---|---|---|
Brokers | string | A comma-separated list of address:port[config=value|config=value] Kafka brokers. The default value is localhost:9092. This value can also be an Azure Event Hubs connection string. The config=value section of the broker list lets you specify advanced configuration directly in the list. For example, if you require a security.protocol and a sasl.mechanism, you can specify a broker list such as: test.com:9093[security.protocol=SASL_SSL,sasl.mechanism=PLAIN],test2.com:9093 |
Enable Command Port | check box | When enabled, the command port allows input tuples to control the adapter. For more information see Command Port. The default state is cleared. |
Enable Status Port | check box | When enabled, the adapter sends informational data on the status port about various states of the adapter. For more information see Status Port. The default state is cleared. |
Enable Passthrough Fields | check box | When enabled, the adapter outputs the same tuple that started the consumer operation for each message that comes from the broker for the subscribed topic. |
Output Latest Offset On Subscribe | check box | When enabled, the adapter outputs a status message with the latest offset for the subscription's topic and partition in the object field of the status message, and the action is Offset. If you subscribe without specifying a partition, you may receive multiple latest offset messages, one for each partition that is assigned. |
Output Beginning Offset On Subscribe | check box | When enabled, the adapter outputs a status message with the beginning offset for the subscription's topic and partition in the object field of the status message, and the action is Offset. If you subscribe without specifying a partition, you may receive multiple beginning offset messages, one for each partition that is assigned. |
Enable Metrics Port | check box | When enabled, the adapter allows output for the Kafka connection metrics when a metrics command is input. For more information see Metrics Port. |
Output High Water Mark | check box | When enabled, the adapter will output the current high water mark value with each data tuple. |
Log Level | INFO | Controls the level of verbosity the adapter uses to issue informational traces to the console. This setting is independent of the containing application's overall log level. Available values, in increasing order of verbosity, are: OFF, ERROR, WARN, INFO, DEBUG, TRACE. |
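
To make the Brokers format concrete, here is a minimal parsing sketch. It is not the adapter's parser: `BrokerListDemo` and its methods are hypothetical. Because the format string above separates options with `|` while the example uses a comma inside the brackets, the sketch assumes either separator may appear inside the brackets, and it treats only commas outside brackets as broker separators.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical parser for a broker list such as host:port[key=value|key=value],host2:port. */
final class BrokerListDemo {

    /** Returns address -> per-broker config, preserving the order of the list. */
    static Map<String, Map<String, String>> parse(String brokers) {
        Map<String, Map<String, String>> result = new LinkedHashMap<>();
        int depth = 0;   // greater than 0 while inside [...]
        int start = 0;   // start index of the current entry
        for (int i = 0; i <= brokers.length(); i++) {
            boolean atEnd = i == brokers.length();
            char c = atEnd ? ',' : brokers.charAt(i);
            if (!atEnd && c == '[') {
                depth++;
            } else if (!atEnd && c == ']') {
                depth--;
            } else if (c == ',' && (depth == 0 || atEnd)) {
                String entry = brokers.substring(start, i).trim();
                if (!entry.isEmpty()) {
                    addEntry(entry, result);
                }
                start = i + 1;
            }
        }
        return result;
    }

    private static void addEntry(String entry, Map<String, Map<String, String>> result) {
        int open = entry.indexOf('[');
        String address = open < 0 ? entry : entry.substring(0, open).trim();
        Map<String, String> config = new LinkedHashMap<>();
        if (open >= 0) {
            int close = entry.lastIndexOf(']');
            String body = entry.substring(open + 1, close > open ? close : entry.length());
            // Assumption: options inside the brackets are separated by '|' or ','.
            for (String pair : body.split("[|,]")) {
                int eq = pair.indexOf('=');
                if (eq > 0) {
                    config.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
                }
            }
        }
        result.put(address, config);
    }

    public static void main(String[] args) {
        System.out.println(parse(
            "test.com:9093[security.protocol=SASL_SSL,sasl.mechanism=PLAIN],test2.com:9093"));
    }
}
```

The bracket-depth counter is what keeps the comma between the two options from being mistaken for the comma between the two brokers.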
Property | Type | Description |
---|---|---|
Perform Commit | check box | If enabled, the adapter performs a commit call for each batch of messages received. |
Poll Wait MS | int | The time, in milliseconds, spent waiting in poll if data is not available in the buffer. If 0, returns immediately with any records that are available currently in the buffer, else returns empty. This value must not be negative. |
Deserializer Type | drop-down list | Specifies the type of Deserializer to use when converting message data. Valid values are: Avro, Blob, String, Tuple, and Custom. Use Custom to supply your own Java class that implements org.apache.kafka.common.serialization.Deserializer. |
Deserializer Class | string | The Java class that implements org.apache.kafka.common.serialization.Deserializer, used when specifying Custom in the previous row. There are a few values that are passed to the |
Deserializer Properties | string key:value | A set of key-value properties that is passed to the custom Deserializer through its configure method. |
Key Deserializer Type | drop-down list | Specifies the Deserializer type to use when converting key data. Valid values are: Avro, Blob, String, Tuple, and Custom. Use Custom to supply your own Java class that implements org.apache.kafka.common.serialization.Deserializer. |
Key Deserializer Class | string | The Java class that implements org.apache.kafka.common.serialization.Deserializer, used when specifying Custom in the previous row. There are a few values that are passed to the |
Key Deserializer Properties | string key:value | A set of key-value properties that is passed to the custom key Deserializer through its configure method. |
Use Default Character Set | check box | Select this control to specify the use of the Java platform default character set. You can also leave the control clear and specify the Character Set property in the next row. The default state is selected. |
Character Set | string | When the Use Default Character Set property is clear, this control specifies the character set to use when converting strings. The default value is UTF-8. |
Max Failure Count | integer | The maximum number of connection failures to report before removing a subscription to a topic. The default value is 5. |
Advanced Config | String Array | This set of key-value strings allows you to configure any Kafka consumer property. NOTE: If the clientId field of the input tuple is not null, it overrides any client ID entry supplied here. |
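
The interaction between Use Default Character Set and Character Set can be sketched as follows. This only illustrates the rule described in the table, using standard Java APIs; the class `CharsetDemo` and its `decode` method are hypothetical and not part of the adapter.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that mirrors the two character set properties. */
final class CharsetDemo {

    static String decode(byte[] data, boolean useDefaultCharacterSet, String characterSet) {
        // When the check box is selected, the platform default wins and the name is ignored.
        Charset charset = useDefaultCharacterSet
                ? Charset.defaultCharset()
                : Charset.forName(characterSet);
        return new String(data, charset);
    }

    public static void main(String[] args) {
        byte[] latin1 = "caf\u00e9".getBytes(StandardCharsets.ISO_8859_1);
        // Decoding with the matching character set restores the original text.
        System.out.println(decode(latin1, false, "ISO-8859-1").equals("caf\u00e9"));
    }
}
```

Relying on the platform default can give different results on different hosts, so naming the character set explicitly is usually the more predictable choice when producers and consumers run on different machines.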
Property | Type | Description |
---|---|---|
Output Schema | schema | The schema used to convert Kafka messages into tuples. |
Key Output Schema | schema | The schema used to convert Kafka message keys into tuples. |
Use the settings in this tab to allow this operator or adapter to start and stop based on conditions that occur at runtime in a cluster with more than one node. During initial development of the fragment that contains this operator or adapter, and for maximum compatibility with TIBCO Streaming releases before 10.5.0, leave the Cluster start policy control in its default setting, Start with module.
Cluster awareness is an advanced topic that requires an understanding of StreamBase Runtime architecture features, including clusters, quorums, availability zones, and partitions. See Cluster Awareness Tab Settings on the Using Cluster Awareness page for instructions on configuring this tab.
Use the Concurrency tab to specify parallel regions for this instance of this component, or multiplicity options, or both. The Concurrency tab settings are described in Concurrency Options, and dispatch styles are described in Dispatch Styles.
Caution
Concurrency settings are not suitable for every application, and using these settings requires a thorough analysis of your application. For details, see Execution Order and Concurrency, which includes important guidelines for using the concurrency options.
You can optionally enable the command input port for this adapter instance by means of the Enable Command Port property in the Adapter Properties page of the Properties view. Use the command port to send action commands to the adapter.
- command, string. The command to send to the adapter. Valid values are:
  - subscribe: Subscribe to a topic and partition at a specific offset or time given by the input tuple.
  - unsubscribe: Unsubscribe from a topic and partition.
  - pause: Suspend fetching from the requested partitions. If the input tuple's topic and partition are null, all current subscriptions are suspended. If they are not null, only the supplied topic and partition are suspended.
  - resume: Resume requested partitions that have been paused with pause. If the input tuple's topic and partition are null, all current subscriptions are resumed. If they are not null, only the supplied topic and partition are resumed.
  - updateBrokers: Update the broker list to which this adapter connects. This command only updates the broker list. You must resubscribe to the topics for the new broker list to take effect.
  - metrics: If the metrics port is enabled, the adapter produces a list of Kafka metrics associated with the current consumer.
- topic, string. The topic to subscribe to or unsubscribe from. This value is also used optionally for pause and resume.
- pattern, string. The pattern can be used instead of topic to subscribe to topics based on a regular expression. The topic and partition fields are ignored if this field is not empty.
- partition (optional), integer. The partition to subscribe to or unsubscribe from. If this value is null, a consumer group is created and this subscriber is assigned one or more partitions by the server. This value is also used optionally for pause and resume.
- time (optional), long. The timestamp long value to start the subscription from. -1 means start from the latest offset, and -2 means start from the beginning. If this value is null and offset is also null, no seek operation is performed. If offset is not null and is greater than or equal to zero, this value is ignored. If both time and offset are missing or null, the latest offset is assumed.
- offset (optional), long. The exact offset to start from when subscribing. If not null, this value overrides any time value supplied. If this value is null and time is also null, no seek operation is performed.
- clientId (optional), string. The client ID to send with the subscription.
- brokers (optional), list[tuple[string host,int port]]. List of brokers to use with the updateBrokers command.
- advancedConfig (optional), list[tuple[string key,string value]]. A list of consumer configuration properties to set per subscription. Configuration settings are applied in this order: first the defaults, second the advanced configuration from the adapter properties, which overwrites any defaults, and third the values from the input tuple, which overwrite any existing values.
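The precedence described for advancedConfig amounts to layering three maps, with later layers winning. The sketch below shows only that ordering. `ConfigMergeDemo` and `merge` are hypothetical names, and the property keys and values in `main` are ordinary Kafka consumer settings chosen as sample data, not the adapter's actual defaults.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration of the three-layer configuration precedence. */
final class ConfigMergeDemo {

    static Map<String, String> merge(Map<String, String> defaults,
                                     Map<String, String> adapterAdvancedConfig,
                                     Map<String, String> tupleAdvancedConfig) {
        Map<String, String> merged = new LinkedHashMap<>(defaults);  // 1. defaults
        merged.putAll(adapterAdvancedConfig);                        // 2. adapter properties overwrite defaults
        merged.putAll(tupleAdvancedConfig);                          // 3. input tuple overwrites everything
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> defaults = new LinkedHashMap<>();
        defaults.put("auto.offset.reset", "latest");      // sample values only
        defaults.put("max.poll.records", "500");

        Map<String, String> fromProperties = new LinkedHashMap<>();
        fromProperties.put("max.poll.records", "100");

        Map<String, String> fromTuple = new LinkedHashMap<>();
        fromTuple.put("max.poll.records", "10");
        fromTuple.put("group.id", "demo-group");

        System.out.println(merge(defaults, fromProperties, fromTuple));
    }
}
```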
The data port is the default output port for the Kafka Consumer adapter, and is always enabled. Use the data port to receive message data from the connected Apache Kafka broker.
The default schema for the data output port is:
- topic, string. The topic for this message.
- offset, long. The offset value where this message was stored.
- message, string, tuple, blob, or custom. The content of this message.
- partition, integer. The partition where this message was stored.
- consumerName, string. The fully qualified name of the consumer that consumed this message. This name can optionally be passed to the Consumer Commit adapter to commit specific messages.
- key, string, tuple, blob, or custom. The key for this message.
- passThrough, tuple. If pass through fields are enabled, this field is the control tuple that was input to start the subscription to this topic.
You can optionally enable the status output port for this adapter instance by means of the Enable Status Port property in the Adapter Properties page of the Properties view. Use the status port to retrieve status output messages from the adapter.
- type, string. The type of status information emitted. Status types are:
  - Error: This message relates to an error that occurred.
  - Warn: This message relates to a warning that the user should be aware of.
  - Info: This message relates to extra status information.
- action, string. Valid values are:
  - Subscribe: This message relates to subscribing to a topic and partition. If the type is Info, it contains a subscription-successful message. If the type is Warn, it contains information about why the subscription may not have succeeded. If no partition is given in the subscribe command, this status is given for the overall topic as well as for the initial assignment of topics given by the server.
  - Unsubscribe: This message relates to unsubscribing from a topic and partition. If the type is Info, it contains an unsubscribe-successful message. If the type is Warn, it contains information about why unsubscribing was not successful.
  - Rebalance Assinged: This message relates to partition assignments given by the server. After the initial assignment, any change in partitions produces assignment status messages that report each assigned partition.
  - Rebalance Revoked: This message relates to partition assignments given by the server. After the initial assignment, any change in partitions produces assignment status messages that report each revoked partition.
  - UpdateBrokers: This message relates to updating brokers. If the type is Warn, the message contains information about why this action may not have been performed correctly.
  - Command: This message relates to an input command sent on the command port.
  - Convert: This message relates to errors that occur when converting messages into the StreamBase tuple schema.
  - Fetch: This message contains information from trying to find a topic and partition's offset.
  - Process: This message contains information from trying to process a message for a subscription.
  - Pause: This message contains information about pausing a subscription.
  - Resume: This message contains information about resuming a subscription.
  - Offset: This message contains the current offset of a topic and partition when a subscription happens and the option Output Latest Offset On Subscribe is enabled.
  - HighWaterMark: This message contains the current high water mark level in the object field and is sent when the current read offset requested is less than the current high water mark for a topic and partition.
- object, string. This value may be null. If not null, it contains a value relevant to the status message.
- message, string. A formatted, human-readable message that explains the status message.
- time, timestamp. The time the status message occurred.
You can optionally enable the metrics output port for this adapter instance by means of the Enable Metrics Port property in the Adapter Properties page of the Properties view. Use the metrics port to retrieve metrics output messages sent from the adapter in response to a metrics command sent to its command port. Roughly 40 to 50 metrics messages are sent in response to each command.
When a custom class is supplied for either message or key deserialization, it must implement the org.apache.kafka.common.serialization.Deserializer interface. The following is an example of such a class:
```java
package com.streambase.sb.adapter.kafka;

import java.io.UnsupportedEncodingException;
import java.util.Map;

import org.slf4j.Logger;

import com.streambase.sb.Schema;
import com.streambase.sb.StreamBaseException;
import com.streambase.sb.Tuple;
import com.streambase.sb.TupleJSONUtil;

public class DemoDeserializer implements org.apache.kafka.common.serialization.Deserializer<Tuple> {

    private static final String CONFIG_SCHEMA = "schema";
    private static final String CONFIG_CHARACTER_SET = "characterSet";
    private static final String CONFIG_USE_DEFAULT_CHARACTER_SET = "useDefaultCharacterSet";
    private static final String CONFIG_LOGGER = "logger";

    private Schema schema;
    private String characterSet;
    private boolean useDefaultCharacterSet = true;
    private Logger logger;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        schema = (Schema) configs.get(CONFIG_SCHEMA);
        characterSet = (String) configs.get(CONFIG_CHARACTER_SET);
        useDefaultCharacterSet = (Boolean) configs.get(CONFIG_USE_DEFAULT_CHARACTER_SET);
        logger = (Logger) configs.get(CONFIG_LOGGER);
    }

    @Override
    public Tuple deserialize(String topic, byte[] data) {
        Tuple tuple = schema.createTuple();
        try {
            String tupleJSON = useDefaultCharacterSet ? new String(data) : new String(data, characterSet);
            logger.info("Deserializing tuple for topic '" + topic + "' from string: " + tupleJSON);
            TupleJSONUtil.setTupleFromJSONLoose(tuple, tupleJSON, "");
        } catch (UnsupportedEncodingException | StreamBaseException e) {
            logger.error("Error deserializing topic '" + topic + "': " + e.getMessage(), e);
        }
        return tuple;
    }

    @Override
    public void close() {
    }
}
```