The TIBCO StreamBase® Input Adapter for Apache Kafka Consumer allows the system to consume data from an Apache Kafka broker.
Each message from the broker contains the topic to which the message was sent, as well as the message body, key, offset, and partition. When subscribing to a topic, you can request to start from a specific offset value, or supply a timestamp value; the system then determines the offset closest to the given time and produces messages from that point forward.
When requesting offsets by timestamp, two values have special meaning (illustrated in the sketch below):

- -1 means no history: start receiving only messages produced from this point forward.
- -2 means all history: receive all data from history, then all messages going forward.
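For reference, the following sketch shows how these values could be resolved to a starting offset using the plain Apache Kafka consumer API. This is an assumption-based illustration of the semantics, not the adapter's actual implementation; the helper method is hypothetical.

```java
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

// Illustrative sketch only: how -1, -2, and a real timestamp map to a
// starting offset in plain Kafka consumer terms. Assumes the partition
// has already been assigned to the consumer.
public class StartingOffsetSketch {
    static void seekToStart(KafkaConsumer<?, ?> consumer, TopicPartition tp, long time) {
        if (time == -1L) {
            // No history: begin at the latest offset and receive only new messages.
            consumer.seekToEnd(Collections.singletonList(tp));
        } else if (time == -2L) {
            // All history: begin at the earliest available offset.
            consumer.seekToBeginning(Collections.singletonList(tp));
        } else {
            // Resolve the closest offset at or after the given timestamp.
            Map<TopicPartition, OffsetAndTimestamp> resolved =
                    consumer.offsetsForTimes(Collections.singletonMap(tp, time));
            OffsetAndTimestamp oat = resolved.get(tp);
            if (oat != null) {
                consumer.seek(tp, oat.offset());
            } else {
                // No message at or after that time: fall back to the latest offset.
                consumer.seekToEnd(Collections.singletonList(tp));
            }
        }
    }
}
```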
The Apache Kafka adapter suite is implemented against the version of the Kafka libraries listed on the Supported Configurations page.
This section describes the properties you can set for this adapter, using the various tabs of the Properties view in StreamBase Studio.
Name: Use this field to specify or change the component's name, which must be unique in the application. The name must contain only alphabetic characters, numbers, and underscores, and no hyphens or other special characters. The first character must be alphabetic or an underscore.
Adapter: A read-only field that shows the formal name of the adapter.
Class: A field that shows the fully qualified class name that implements the functionality of this adapter. Use this class name when loading the adapter in StreamSQL programs with the APPLY JAVA statement. You can right-click this field and select Copy from the context menu to place the full class name in the system clipboard.
Start with application: If this field is set to Yes or to a module parameter that evaluates to true, an instance of this adapter starts as part of the containing StreamBase Server. If this field is set to No or to a module parameter that evaluates to false, the adapter is loaded with the server, but does not start until you send an sbadmin resume command, or until you start the component with StreamBase Manager. With this option set to No or false, the adapter does not start even if the application as a whole is suspended and later resumed. The recommended setting is selected by default.
Enable Error Output Port: Select this check box to add an Error Port to this component. In the EventFlow canvas, the Error Port shows as a red output port, always the last port for the component. See Using Error Ports and Error Streams to learn about Error Ports.
Description: Optionally enter text to briefly describe the component's purpose and function. In the EventFlow canvas, you can see the description by pressing Ctrl while the component's tooltip is displayed.
Property | Type | Description |
---|---|---|
Brokers | string | A comma-separated list of address:port Kafka brokers. The default value is localhost:9092. |
Enable Command Port | check box | When enabled, the command port allows input tuples to control the adapter. For more information see Command Port. The default state is cleared. |
Enable Status Port | check box | When enabled, the adapter sends informational data on the status port about various states of the adapter. For more information see Status Port. The default state is cleared. |
Log Level | drop-down list | Controls the level of verbosity the adapter uses to issue informational traces to the console. This setting is independent of the containing application's overall log level. Available values, in increasing order of verbosity, are: OFF, ERROR, WARN, INFO, DEBUG, TRACE, and ALL. |
Property | Type | Description |
---|---|---|
Deserializer Type | drop-down list | Specifies the type of deserializer to use when converting message data. Valid values are: String, Tuple, Blob, and Custom. Use Custom to supply your own Java class that implements org.apache.kafka.common.serialization.Deserializer. |
Deserializer Class | string | The Java class that implements org.apache.kafka.common.serialization.Deserializer, used when specifying Custom in the previous row. A few configuration values (schema, characterSet, useDefaultCharacterSet, and logger) are passed to the class's configure method; see the sample deserializer at the end of this section. |
Key Deserializer Type | drop-down list | Specifies the deserializer type to use when converting key data. Valid values are: String, Tuple, Blob, and Custom. Use Custom to supply your own Java class that implements org.apache.kafka.common.serialization.Deserializer. |
Key Deserializer Class | string | The Java class that implements org.apache.kafka.common.serialization.Deserializer, used when specifying Custom in the previous row. A few configuration values (schema, characterSet, useDefaultCharacterSet, and logger) are passed to the class's configure method; see the sample deserializer at the end of this section. |
Use Default Character Set | check box | Select this control to specify the use of the Java platform default character set. You can also leave the control clear and specify the Character Set property in the next row. The default state is selected. |
Character Set | string | When the Use Default Character Set property is clear, this control specifies the character set to use when converting strings. The default value is UTF-8. |
Default Buffer Size | int | Specifies the default buffer size in bytes to use when fetching data for a topic. This value can be overridden per topic when performing a subscribe on the control port. The default value is 65536. |
Default Fetch Size | int | Specifies the default fetch size in bytes to use when fetching records from a topic. This value can be overridden per topic when performing a subscribe on the control port. The default value is 100,000. |
Default Timeout | int | The default timeout in milliseconds to use when fetching records from a topic. This value can be overridden per topic when performing a subscribe on the control port. The default value is 100,000 (100 seconds). |
Max Failure Count | int | The maximum number of connection failures to report before removing a subscription to a topic. The default value is 5. |
Property | Type | Description |
---|---|---|
Output Schema | schema | The schema used to convert Kafka messages into tuples. |
Key Output Schema | schema | The schema used to convert Kafka message keys into tuples. |
Use the Concurrency tab to specify parallel regions for this instance of this component, or multiplicity options, or both. The Concurrency tab settings are described in Concurrency Options, and dispatch styles are described in Dispatch Styles.
Caution
Concurrency settings are not suitable for every application, and using these settings requires a thorough analysis of your application. For details, see Execution Order and Concurrency, which includes important guidelines for using the concurrency options.
You can optionally enable the command input port for this adapter instance by means of the Enable Command Port property in the Adapter Properties page of the Properties view. Use the command port to send action commands to the adapter. The command port schema contains the following fields; a conceptual sketch of these commands in plain Kafka consumer terms follows the list.
- command, string. The command to send to the adapter. Valid values are:
    - subscribe — Subscribe to a topic.
    - unsubscribe — Unsubscribe from a topic.
    - pause — Causes the adapter to stop reading data from all currently subscribed topics.
    - resume — If the adapter is currently paused, resumes reading from the already subscribed-to list of topics.
    - updateBrokers — Update the broker list to which this adapter connects. This command only updates the broker list; you must unsubscribe and resubscribe to the topics for the new broker list to take effect.
- topic, string. The topic to subscribe to or unsubscribe from.
- partition (optional), int. The partition to subscribe to. Leave null to use the default of 0.
- time (optional), long. The timestamp value from which to start the subscription: -1 means start from the latest offset, -2 means start from the beginning. If this value is null, -1 is assumed.
- offset (optional), long. The exact offset of the message to read when subscribing. This value overrides any time value supplied.
- bufferSize (optional), int. Overrides the default buffer size for the topic addressed in this command.
- fetchSize (optional), int. Overrides the default fetch size for the topic addressed in this command.
- timeout (optional), int. Overrides the default timeout for the topic addressed in this command.
- clientId (optional), string. The client ID to send with the subscription.
- brokers (optional), list[tuple[string,int]]. The list of brokers to use with the updateBrokers command.
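As an informal illustration (assumed behavior, not the adapter's implementation), the subscribe, pause, resume, and unsubscribe commands correspond conceptually to the following operations of the plain Apache Kafka consumer API; the topic name, partition, and offset below are hypothetical:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

// Conceptual sketch of the command port actions in plain Kafka consumer terms.
public class CommandPortSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");       // Brokers property / updateBrokers
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // subscribe: topic "orders", partition 0, explicit offset 1500
            TopicPartition tp = new TopicPartition("orders", 0);
            consumer.assign(Collections.singletonList(tp));
            consumer.seek(tp, 1500L);                            // offset overrides any time value

            consumer.pause(Collections.singletonList(tp));       // pause: stop reading
            consumer.resume(Collections.singletonList(tp));      // resume: continue reading

            consumer.unsubscribe();                              // unsubscribe from the topic
        }
    }
}
```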
The data port is the default output port for the Kafka Consumer adapter, and is always enabled. Use the data port to receive message data from the connected Apache Kafka broker.
The default schema for the data output port is (see the mapping sketch after this list):

- topic, string. The topic for this message.
- offset, long. The offset value at which this message was stored.
- message, string. The content of this message.
- partition, int. The partition where this message was stored.
- key, string, tuple, blob, or custom. The key for this message.
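As a rough illustration of this mapping (an assumption based on the schema above, not the adapter's code), each output field corresponds to an accessor on the Kafka ConsumerRecord the adapter receives:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Sketch of how a consumed Kafka record lines up with the default data port schema.
public class DataPortMappingSketch {
    static void printRecord(ConsumerRecord<String, String> record) {
        String topic   = record.topic();      // -> topic
        long offset    = record.offset();     // -> offset
        String message = record.value();      // -> message
        int partition  = record.partition();  // -> partition
        String key     = record.key();        // -> key (type depends on the Key Deserializer Type)
        System.out.printf("%s[%d]@%d key=%s message=%s%n", topic, partition, offset, key, message);
    }
}
```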
You can optionally enable the status output port for this adapter instance by means of the Enable Status Port property in the Adapter Properties page of the Properties view. Use the status port to retrieve status output messages from the adapter.
- type, string. The type of status information emitted. Status types are:
    - Error — This message relates to an error that occurred.
    - Warn — This message relates to a warning that the user should be aware of.
    - Info — This message relates to extra status information.
- action, string. Valid values are:
    - Send — This message relates to an attempt to send a message to the broker.
    - Connected — The adapter has successfully connected to the broker.
    - Connecting — The adapter is currently trying to connect to the broker.
    - Connection — Information about the current connection.
    - Disconnecting — The adapter is currently disconnecting from the broker.
    - Update Brokers — The broker list is being updated.
    - Command — This message relates to an input command sent on the command port.
- object, string. This value may be null. If not null, it contains a value relevant to the status message.
- message, string. A formatted, human-readable message that explains the status message.
- time, timestamp. The time the status message occurred.
When a class is supplied for either message or key deserialization, you must provide a class that implements the org.apache.kafka.common.serialization.Deserializer interface. The following is an example of such a class:
```java
package com.streambase.sb.adapter.kafka;

import java.io.UnsupportedEncodingException;
import java.util.Map;

import org.slf4j.Logger;

import com.streambase.sb.Schema;
import com.streambase.sb.StreamBaseException;
import com.streambase.sb.Tuple;
import com.streambase.sb.TupleJSONUtil;

public class DemoDeserializer
        implements org.apache.kafka.common.serialization.Deserializer<Tuple> {

    private static final String CONFIG_SCHEMA = "schema";
    private static final String CONFIG_CHARACTER_SET = "characterSet";
    private static final String CONFIG_USE_DEFAULT_CHARACTER_SET = "useDefaultCharacterSet";
    private static final String CONFIG_LOGGER = "logger";

    private Schema schema;
    private String characterSet;
    private boolean useDefaultCharacterSet = true;
    private Logger logger;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // The adapter passes its schema, character set settings, and logger here.
        schema = (Schema) configs.get(CONFIG_SCHEMA);
        characterSet = (String) configs.get(CONFIG_CHARACTER_SET);
        useDefaultCharacterSet = (Boolean) configs.get(CONFIG_USE_DEFAULT_CHARACTER_SET);
        logger = (Logger) configs.get(CONFIG_LOGGER);
    }

    @Override
    public Tuple deserialize(String topic, byte[] data) {
        // Convert the raw bytes to a JSON string, then populate a tuple from it.
        Tuple tuple = schema.createTuple();
        try {
            String tupleJSON = useDefaultCharacterSet
                    ? new String(data)
                    : new String(data, characterSet);
            logger.info("Deserializing tuple for topic '" + topic + "' from string: " + tupleJSON);
            TupleJSONUtil.setTupleFromJSONLoose(tuple, tupleJSON, "");
        } catch (UnsupportedEncodingException | StreamBaseException e) {
            logger.error("Error deserializing topic '" + topic + "': " + e.getMessage(), e);
        }
        return tuple;
    }

    @Override
    public void close() {
    }
}
```
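For comparison, a custom deserializer does not have to produce tuples. The following hypothetical sketch shows a minimal key deserializer built on the same interface, usable with the Key Deserializer Class property under the assumption that message keys are 8-byte big-endian longs:

```java
package com.streambase.sb.adapter.kafka;

import java.nio.ByteBuffer;
import java.util.Map;

// Hypothetical minimal key deserializer: decodes 8-byte big-endian long keys.
public class LongKeyDeserializer
        implements org.apache.kafka.common.serialization.Deserializer<Long> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // No configuration needed for this sketch.
    }

    @Override
    public Long deserialize(String topic, byte[] data) {
        if (data == null || data.length < Long.BYTES) {
            return null;  // missing or malformed key
        }
        return ByteBuffer.wrap(data).getLong();
    }

    @Override
    public void close() {
    }
}
```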