The TIBCO StreamBase® Output Adapter for Apache Kafka Producer allows StreamBase applications to connect to an Apache Kafka Broker and to send messages to the broker on specific topics.
Messages can be sent in various formats such as tuple, string, blob, or a custom format provided by the end user. The schema of the input tuple determines the data types of the message and key. The key for each message can be specified in any of the same formats as the message data.
A partition number can be provided to indicate into which partition this message should be stored, or it can be left blank for the Kafka broker to manage partition placement. When a message is delivered to the broker, an output tuple is sent from the producer with the actual partition and offset for that message. This occurs when the Enable Pass-through Fields option is enabled. The Kafka broker guarantees the message order.
The Apache Kafka adapter suite is implemented against the version of the Kafka libraries listed on the Supported Configurations page.
This section describes the properties you can set for this adapter, using the various tabs of the Properties view in StreamBase Studio.
Name: Use this field to specify or change the component's name, which must be unique in the application. The name must contain only alphabetic characters, numbers, and underscores, and no hyphens or other special characters. The first character must be alphabetic or an underscore.
Adapter: A read-only field that shows the formal name of the adapter.
Class: A field that shows the fully qualified class name that implements the functionality of this adapter. Use this class name when loading the adapter in StreamSQL programs with the APPLY JAVA statement. You can right-click this field and select Copy from the context menu to place the full class name in the system clipboard.
Start with application: If this field is set to Yes or to a module parameter that evaluates to true, an instance of this adapter starts as part of the containing StreamBase Server. If this field is set to No or to a module parameter that evaluates to false, the adapter is loaded with the server, but does not start until you send an sbadmin resume command, or until you start the component with StreamBase Manager. With this option set to No or false, the adapter does not start even if the application as a whole is suspended and later resumed. The recommended setting is selected by default.
Enable Error Output Port: Select this check box to add an Error Port to this component. In the EventFlow canvas, the Error Port shows as a red output port, always the last port for the component. See Using Error Ports and Error Streams to learn about Error Ports.
Description: Optionally enter text to briefly describe the component's purpose and function. In the EventFlow canvas, you can see the description by pressing Ctrl while the component's tooltip is displayed.
Property | Type | Description
---|---|---
Brokers | string | A comma-separated list of address:port brokers. |
Connect On Startup | check box | When enabled, the adapter attempts to connect to a Kafka broker on startup. If not enabled, you must manually send a connect command; if not sent, any published tuples fail until connected. |
Enable Command Port | check box | When enabled, the command port allows input tuples to control the adapter. For more information see Command Port. |
Enable Status Port | check box | When enabled, the adapter sends informational data on the status port about various adapter states. For more information see Status Port. |
Enable Metrics Port | check box | When enabled, the adapter allows output for the Kafka connection metrics when a metrics command is input. For more information see Metrics Port. |
Enable Pass-through Fields | check box | When enabled, the adapter outputs the data tuple when the message has been successfully published to the Kafka broker. |
Log Level | drop-down list | Controls the level of verbosity the adapter uses to issue informational traces to the console. This setting is independent of the containing application's overall log level. Available values, in increasing order of verbosity, are: OFF, ERROR, WARN, INFO, DEBUG, TRACE, and ALL. |
Property | Type | Description
---|---|---
Serializer Class | string | Specifies the class that implements the org.apache.kafka.common.serialization.Serializer interface, used to serialize messages when Custom is the selected message format. The adapter passes a few values to the serializer's configure method, including the expected message schema (key schema) and an SLF4J logger (key logger); see the Custom Serializer example below.
Key Serializer Class | string | Specifies the class that implements the org.apache.kafka.common.serialization.Serializer interface, used to serialize keys when Custom is the selected key format. The values passed to the configure method are the same as for Serializer Class.
Compression Type | drop-down list | The compression type for all data generated by this producer instance. Compression is applied to full batches of data, so the efficacy of batching also affects the compression ratio (more batching means better compression). Available values: None, GZip, Snappy.
Client Id | string | The client ID to use when connecting to Kafka. |
Acks | integer or the string "all" | The number of acknowledgments the producer requires the leader to have received before considering a request complete. Allowed values are any non-negative integer, or the string all.
Buffer Memory | long | The total bytes of memory the producer can use to buffer records waiting to be sent to the server. The default value is 33,554,432. |
Retries | int | An integer specifying the number of attempts to retry lost connections to the broker. Setting a value greater than zero causes the client to resend any record whose send fails with a potentially transient error. |
Batch Size | int | Specifies the number of bytes to use for message batches; the default value is 16384. The producer attempts to batch records together into fewer requests whenever multiple records are being sent to the same partition. |
Linger (ms) | long | Specifies the upper bound on the delay for batching, in number of milliseconds. The default value is zero. |
Max Request Size | int | Specifies the maximum size of a request in number of bytes. This is also effectively a cap on the maximum record size. The default value is 1,048,576. |
Receive Buffer Bytes | int | Specifies the size of the TCP receive buffer to use when reading data, in number of bytes. The default value is 32,768. |
Send Buffer Bytes | int | Specifies the size of the TCP send buffer to use when sending data, in number of bytes. The default value is 131,072. |
Timeout (ms) | int | Specifies the maximum amount of time in milliseconds that the server is to wait for acknowledgments from followers to meet the acknowledgment requirements the producer has specified with the acks property. The default value is 30,000 (30 seconds). |
Block On Buffer Full | check box | Specifies what to do when the buffer memory is exhausted. If this control is selected, the adapter blocks and stops accepting new records; if cleared, the adapter throws errors.
Metadata Fetch Timeout (ms) | int | Specifies the maximum amount of time in milliseconds that the system blocks while waiting for a metadata fetch to succeed before throwing an exception back to the client. The default value is 60,000 (60 seconds). |
Metadata Max Timeout (ms) | int | Specifies the maximum amount of time in milliseconds that the system blocks while waiting for a metadata fetch to succeed before throwing an exception back to the client. The default value is 300,000 (5 minutes).
Reconnect Backoff (ms) | int | Specifies the amount of time in milliseconds to wait before attempting to reconnect to a given host when a connection fails. The default value is 10. |
Retry Backoff (ms) | int | Specifies the amount of time in milliseconds to wait before attempting to retry a failed produce request to a given topic partition. The default value is 100.
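These advanced options correspond to standard Apache Kafka producer settings. The following is a minimal sketch, not adapter source, showing how several of the same settings look when configured directly on the plain Kafka producer client; the broker address and topic name are hypothetical:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerSettingsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // Brokers (hypothetical address)
        props.put("acks", "all");                         // Acks
        props.put("retries", 0);                          // Retries
        props.put("batch.size", 16384);                   // Batch Size
        props.put("linger.ms", 0);                        // Linger (ms)
        props.put("buffer.memory", 33554432L);            // Buffer Memory
        props.put("compression.type", "gzip");            // Compression Type
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        // Send one record; the producer batches and compresses as configured.
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test-topic", "key", "value"));
        }
    }
}
```

An Acks value of all gives the strongest delivery guarantee at the cost of latency, while 0 does not wait for any acknowledgment.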
Use the Concurrency tab to specify parallel regions for this instance of this component, or multiplicity options, or both. The Concurrency tab settings are described in Concurrency Options, and dispatch styles are described in Dispatch Styles.
Caution
Concurrency settings are not suitable for every application, and using these settings requires a thorough analysis of your application. For details, see Execution Order and Concurrency, which includes important guidelines for using the concurrency options.
You can optionally enable the command input port for this adapter instance by means of the Enable Command Port property in the Adapter Properties page of the Properties view. Use the command port to send action commands to the adapter; a minimal client sketch follows the list below.

- command, string. The command to send to the adapter. The valid values are: connect, disconnect, metrics, updateBrokers.
  - connect — Connect to the specified Kafka broker.
  - disconnect — Disconnect from the broker.
  - metrics — If the metrics port is enabled, the adapter produces a list of Kafka metrics associated with the current connection. Most values are null until at least one message is sent.
  - updateBrokers — Update the list of brokers to which this adapter will connect. This command updates only the broker list; you must disconnect and reconnect for the new broker list to take effect.
- brokers (optional), list[tuple[string,int]]. A list of brokers to use with the updateBrokers command.
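The following is a minimal sketch, assuming a hypothetical deployed application whose command input stream is named KafkaCommandIn, that uses the StreamBase Java client API to send a connect command; adjust the URI and stream name for your application:

```java
import com.streambase.sb.Tuple;
import com.streambase.sb.client.StreamBaseClient;

public class SendConnectCommand {
    public static void main(String[] args) throws Exception {
        // Hypothetical server URI and stream name.
        StreamBaseClient client = new StreamBaseClient("sb://localhost:10000");
        try {
            // Build a tuple matching the command port schema described above.
            Tuple cmd = client.getSchemaForStream("KafkaCommandIn").createTuple();
            cmd.setField("command", "connect"); // or disconnect, metrics, updateBrokers
            client.enqueue("KafkaCommandIn", cmd);
        } finally {
            client.close();
        }
    }
}
```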
The data port is the default input port for the Kafka Producer adapter, and is always enabled. Use the data port to send message data to the connected Kafka broker; a publishing sketch follows the list below.

The default schema for the data input port is:

- topic, string. The topic for this message.
- message, string, tuple, blob, or custom. The message content.
- key (optional), string, tuple, blob, or custom. The key for this message.
- partition (optional), int. The partition where this message is stored. If not supplied, the broker determines the partition.
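The following publishing sketch enqueues a string-format message using the default schema above; the stream name KafkaDataIn and the topic orders are hypothetical:

```java
import com.streambase.sb.Tuple;
import com.streambase.sb.client.StreamBaseClient;

public class PublishMessage {
    public static void main(String[] args) throws Exception {
        // Hypothetical server URI and stream name.
        StreamBaseClient client = new StreamBaseClient("sb://localhost:10000");
        try {
            Tuple msg = client.getSchemaForStream("KafkaDataIn").createTuple();
            msg.setField("topic", "orders");        // required
            msg.setField("message", "hello Kafka"); // string-format message content
            // key and partition are optional; omit partition to let the
            // broker determine partition placement.
            client.enqueue("KafkaDataIn", msg);
        } finally {
            client.close();
        }
    }
}
```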
You can optionally enable the status output port for this adapter instance by means of the Enable Status Port property in the Adapter Properties page of the Properties view. Use the status port to retrieve status output messages from the adapter; a dequeue sketch follows the list below.

- type, string. The type of status information emitted. Status types are:
  - Error — This message relates to an error that occurred.
  - Warn — This message relates to a warning that the user should be aware of.
  - Info — This message relates to extra status information.
- action, string. Valid values are:
  - Send — This message relates to an attempt to send a message to the broker.
  - Connected — The adapter has successfully connected to the broker.
  - Connecting — The adapter is currently trying to connect to the broker.
  - Disconnecting — The adapter is currently disconnecting from the broker.
  - Update Brokers — The broker list is being updated.
  - Command — This message relates to an input command sent on the command port.
- object, string. This value may be null. If not null, it contains a value relevant to the status message.
- message, string. A formatted, human-readable message that explains the status message.
- time, timestamp. The time the status message occurred.
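The following is a minimal dequeue sketch, assuming a hypothetical status output stream named KafkaStatusOut exposed by the running application; it prints the type, action, and message fields of each status tuple:

```java
import com.streambase.sb.Tuple;
import com.streambase.sb.client.DequeueResult;
import com.streambase.sb.client.StreamBaseClient;

public class WatchKafkaStatus {
    public static void main(String[] args) throws Exception {
        // Hypothetical server URI and stream name.
        StreamBaseClient client = new StreamBaseClient("sb://localhost:10000");
        try {
            client.subscribe("KafkaStatusOut");
            while (true) {
                // Block until one or more status tuples arrive.
                DequeueResult result = client.dequeue();
                for (Tuple status : result.getTuples()) {
                    System.out.printf("%s %s: %s%n",
                            status.getString("type"),
                            status.getString("action"),
                            status.getString("message"));
                }
            }
        } finally {
            client.close();
        }
    }
}
```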
You can optionally enable the metrics output port for this adapter instance by means of the Enable Metrics Port property in the Adapter Properties page of the Properties view. Use the metrics port to retrieve metrics output messages sent from the adapter in response to a metrics command sent to its command port. A set of roughly 40 to 50 metrics messages is sent in response to this command. Most metrics values are null until at least one message has been sent through the currently connected broker.
When you select Custom for either the message or key serialization, you must provide a class that implements the org.apache.kafka.common.serialization.Serializer interface. For example:

```java
package com.streambase.sb.adapter.kafka;

import java.util.EnumSet;
import java.util.Map;

import org.slf4j.Logger;

import com.streambase.sb.Schema;
import com.streambase.sb.Tuple;
import com.streambase.sb.TupleJSONUtil;
import com.streambase.sb.TupleJSONUtil.Options;

public class DemoSerializer implements
        org.apache.kafka.common.serialization.Serializer<Tuple> {

    private static final String CONFIG_SCHEMA = "schema";
    private static final String CONFIG_LOGGER = "logger";

    private Schema schema;
    private Logger logger;

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // The adapter passes the expected schema and an SLF4J logger
        // to the serializer under these keys.
        if (configs.containsKey(CONFIG_SCHEMA)) {
            schema = (Schema) configs.get(CONFIG_SCHEMA);
        }
        if (configs.containsKey(CONFIG_LOGGER)) {
            logger = (Logger) configs.get(CONFIG_LOGGER);
        }
    }

    @Override
    public byte[] serialize(String topic, Tuple tuple) {
        // Convert the tuple to a JSON string, keeping null fields and
        // preferring map-style output, then send its bytes to the broker.
        EnumSet<Options> options = EnumSet.of(
                Options.INCLUDE_NULL_FIELDS, Options.PREFER_MAP);
        String jsonTuple = TupleJSONUtil.toJSONString(tuple, options, "");
        return jsonTuple.getBytes();
    }
}
```
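In this example, each tuple is converted to a JSON string with TupleJSONUtil, and the resulting bytes become the Kafka message payload. The schema and logger entries that the adapter supplies to configure can be used for validation and diagnostic logging. Note that the compiled serializer class must be available on the application's classpath so the adapter can load it by the name given in the Serializer Class or Key Serializer Class property.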