Apache Flume External Adapters

Two Flume Adapters

The TIBCO StreamBase© Adapters for Apache Flume allow StreamBase to integrate with Apache Flume as either a Flume sink or a Flume source. The sink and source do not run as separate external processes; they are loaded by, and run inside, the Apache Flume agent process.

StreamBase provides two Flume-related adapters:

  • Flume External Input Adapter

  • Flume External Output Adapter

The Flume adapter installation kit includes a sample application. The sample appears in StreamBase Studio's Load StreamBase Sample dialog only after you install the Flume External Adapter kit.

It is possible to install the Flume adapter kit on machines that do not have StreamBase installed. On UNIX platforms, you must also install the StreamBase client JAR file, as described below in the Installation section.

The adapters are also certified by Cloudera Inc. for use with the Flume services of their CDH 5 product.

Introduction to the Flume Adapters

The Flume Input adapter acts as a Flume sink that consumes Flume events and enqueues them to StreamBase Server. Conversely, the Output adapter acts as a Flume source which dequeues messages from StreamBase Server and emits corresponding Flume events.

Because Flume events have no predefined format, the adapters convert Flume events to and from StreamBase tuples using a plug-in model that loads the code responsible for the conversion.

The adapters support Apache Flume versions 1.1.0 and above.

Connection To StreamBase Server

The Input adapter runs as a Flume sink node. This means it connects to a Flume channel and receives Flume events. As it receives such events it converts them to tuples and sends them to a StreamBase Server. Conversely, the Output adapter runs as a Flume source node, receiving tuples from the StreamBase Server and emitting Flume events on the configured Flume channel.

The Flume channel to which the adapter connects is determined by the settings in the Flume configuration file specified by the -f switch when invoking flume-ng. Please refer to the Flume documentation for details on the format of this file.

The StreamBase Server and stream to which the adapter connects as well as other adapter-specific settings are determined by the following parameters, also specified in the Flume configuration file. The supported parameters are:

Parameter Name | Required? | Expected Format | Description
sbserver-uri | Yes, unless sbserver-hostname and sbserver-port are specified | Valid StreamBase URI | The URI describing the StreamBase Server to which to connect.
sbserver-hostname | Only if sbserver-uri is NOT specified | Valid host name or IP address | The machine hosting the StreamBase Server to which to connect.
sbserver-port | Only if sbserver-uri is NOT specified | Valid IP port number | The port on which StreamBase Server is listening.
stream-name | Yes | String | The name of the stream on which StreamBase Server is expecting incoming tuples (when using the Input adapter) or on which StreamBase Server will be emitting tuples (for the Output adapter).
converter | No | Fully-qualified Java class name | The name of the class responsible for converting between Flume events and StreamBase tuples. For the Input adapter (Flume sink) this class is expected to implement the com.streambase.sb.adapter.flume.StreamBaseSink.Converter interface; for the Output adapter (Flume source) the class is expected to implement the com.streambase.sb.adapter.flume.StreamBaseSource.Converter interface. (See below for more information on these interfaces and how to implement them.) If this setting is omitted, the default converter is used; this converter assumes the payload of the Flume event is a simple UTF-8 string.
buffer-size | No | Positive integer | (Input adapter only; ignored for Output adapters.) Specifies the number of tuples to buffer before enqueuing them to the stream. This is useful in high-traffic scenarios to reduce the number of network round trips and to maximize throughput on the StreamBase side.
flush-interval | No | Positive integer | (Input adapter only; ignored for Output adapters. Also ignored if buffer-size has not been specified.) Sets the maximum number of milliseconds to wait before enqueuing buffered tuples.
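The interplay between buffer-size and flush-interval can be illustrated with a minimal sketch. This is not the adapter's implementation: the BatchBuffer class, its method names, and the use of an explicit clock value are all hypothetical, chosen only to show a batch being sent either when it is full or when its oldest entry has waited too long.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch of size- and time-bounded batching; not the adapter's code. */
class BatchBuffer<T> {
    private final int bufferSize;        // plays the role of buffer-size
    private final long flushIntervalMs;  // plays the role of flush-interval
    private final Consumer<List<T>> sender;
    private final List<T> pending = new ArrayList<>();
    private long oldestPendingAt;        // time at which the current batch was started

    BatchBuffer(int bufferSize, long flushIntervalMs, Consumer<List<T>> sender) {
        this.bufferSize = bufferSize;
        this.flushIntervalMs = flushIntervalMs;
        this.sender = sender;
    }

    /** Adds one item and sends the batch as soon as it is full. */
    void add(T item, long nowMs) {
        if (pending.isEmpty()) {
            oldestPendingAt = nowMs;
        }
        pending.add(item);
        if (pending.size() >= bufferSize) {
            flush();
        }
    }

    /** Called periodically: sends a partial batch once it has waited long enough. */
    void tick(long nowMs) {
        if (!pending.isEmpty() && nowMs - oldestPendingAt >= flushIntervalMs) {
            flush();
        }
    }

    /** Sends whatever is pending in a single call, then starts a new batch. */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        sender.accept(new ArrayList<>(pending));
        pending.clear();
    }
}
```

With a buffer size of 50 and a flush interval of 100, a burst of 50 items is sent in one call, while a trickle of a few items is sent no later than about 100 milliseconds after the first of them arrived, provided tick() is called regularly.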

Additionally, custom parameters can be specified in the Flume configuration file for use by the StreamBaseSink.Converter or StreamBaseSource.Converter implementer. These parameters are passed along in the form of a Flume context object (org.apache.flume.Context). See Event Translation below for more information.
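As an illustration of how the connection parameters and custom parameters might be read, the following sketch uses a plain java.util.Map as a stand-in for the Flume context object. The AdapterSettings class and its methods are hypothetical. Two further assumptions are made: that sbserver-uri takes precedence when both forms are given, and that a host and port combine into a URI of the form shown in the configuration examples (sb://host:port/).

```java
import java.util.Map;

/** Hypothetical helper; a Map stands in for org.apache.flume.Context. */
class AdapterSettings {

    /** Returns the server URI, built from host and port when no URI is given. */
    static String resolveUri(Map<String, String> params) {
        String uri = params.get("sbserver-uri");
        if (uri != null && !uri.trim().isEmpty()) {
            return uri.trim(); // assumption: the URI wins if both forms are present
        }
        String host = params.get("sbserver-hostname");
        String port = params.get("sbserver-port");
        if (host == null || port == null) {
            throw new IllegalArgumentException(
                "Specify sbserver-uri, or both sbserver-hostname and sbserver-port");
        }
        int portNumber = Integer.parseInt(port.trim());
        if (portNumber < 1 || portNumber > 65535) {
            throw new IllegalArgumentException("Invalid port: " + port);
        }
        return "sb://" + host.trim() + ":" + portNumber + "/";
    }

    /** Reads a custom converter parameter, falling back to a default value. */
    static String custom(Map<String, String> params, String key, String defaultValue) {
        String value = params.get(key);
        return value == null ? defaultValue : value;
    }
}
```

In a real converter the same lookup would go through the Context passed to init(), for example with Context.getString(key, defaultValue).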

Connection Logic

When a Flume node is configured to load a StreamBase adapter as the sink or source, once it is launched it tries to connect to the configured StreamBase Server and subscribe to the configured stream therein. If the connection attempt fails for any reason, another attempt is made after a short timeout period. After a successful connection is established, if it is lost for any reason, the adapter again tries to reconnect until successful. Flume events received by the Input adapter while the connection to StreamBase Server is down are dropped, and the Flume system progressively throttles the incoming flow of events until a connection to StreamBase Server can be re-established.
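The retry behavior described above can be sketched as a simple loop. This is an illustration only: the Reconnector class is hypothetical, the connection attempt is represented by a generic Callable, and the delay between attempts is a parameter because the adapter's actual timeout is not documented here.

```java
import java.util.concurrent.Callable;

/** Hypothetical sketch of a connect-until-successful loop; not the adapter's code. */
class Reconnector {

    /** Calls connect repeatedly, pausing between failures, until it succeeds. */
    static <T> T connectWithRetry(Callable<T> connect, long retryDelayMs) {
        while (true) {
            try {
                return connect.call();
            } catch (Exception e) {
                // The attempt failed for some reason: wait, then try again.
                try {
                    Thread.sleep(retryDelayMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting to reconnect", ie);
                }
            }
        }
    }
}
```

The same loop would be re-entered whenever an established connection is lost, so that the adapter keeps trying until StreamBase Server is reachable again.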

Event Translation

The Input adapter must convert incoming Flume events to StreamBase tuples, and the Output adapter must perform the opposite conversion, from tuples to Flume events. Since the body of a Flume event carries a payload of arbitrary format, the adapter must provide total flexibility in the conversion mechanism to handle this payload. To this end, the adapters use a plug-in mechanism to perform custom conversions. For example, you can provide a custom converter to consume Flume events produced by an Apache Avro Flume source (or, on the Output side, to produce such events destined for an Avro Flume sink).

Custom converters for use by the Input adapter are expected to implement the com.streambase.sb.adapter.flume.StreamBaseSink.Converter interface. The definition of this interface is as follows:

/**
 * {@link StreamBaseSink} uses pluggable converters which must implement this interface.
 * An instance of the converter will be created using its default public constructor,
 * then the {@link #init(Context, Schema)} method will be called to perform initialization
 * of the instance. This call is guaranteed to occur before the first call to {@link #convert(Event, Tuple)}.
 */
public static interface Converter
{
  /**
   * This method can be used to initialize the object. It is guaranteed to be called
   * before the first call to {@link #convert(Event, Tuple)}.
   * @param context the flume {@link Context} used by this sink. This can be used
   * by the converter instance to access custom parameters if desired.
   * @param schema the StreamBase {@link Schema} of the stream used by this sink.
   * This can be useful to perform schema validation, and to implement field-caching
   * strategies for more efficient conversions. 
   * @throws StreamBaseException
   */
  public void init(Context context, Schema schema) throws StreamBaseException;
        
  /**
   * Converts the Flume {@link Event} to a StreamBase {@link Tuple}.
   * @param event the Flume event to convert.
   * @param tuple the StreamBase tuple into which to place the event's contents.
   * @return true if the conversion was successful, false otherwise.
   * @throws StreamBaseException
   */
  public boolean convert(Event event, Tuple tuple) throws StreamBaseException;       
}

Similarly, custom converters for use by the Output adapter are expected to implement the com.streambase.sb.adapter.flume.StreamBaseSource.Converter interface. The definition of this interface is as follows:

/**
 * {@link StreamBaseSource} uses pluggable converters which must implement this interface.
 * An instance of the converter will be created using its default public constructor,
 * then the {@link #init(Context, Schema)} method will be called to perform initialization
 * of the instance. This call is guaranteed to occur before the first call to {@link #convert(Tuple, Event)}.
 */
public static interface Converter
{
  /**
   * This method can be used to initialize the object. It is guaranteed to be called
   * before the first call to {@link #convert(Tuple, Event)}.
   * @param context the flume {@link Context} used by this source. This can be used
   * by the converter instance to access custom parameters if desired.
   * @param schema the StreamBase {@link Schema} of the stream used by this source.
   * This can be useful to perform schema validation, and to implement field-caching
   * strategies for more efficient conversions. 
   * @throws StreamBaseException
   */
  public void init(Context context, Schema schema) throws StreamBaseException;
        
  /**
   * Converts the StreamBase {@link Tuple} to a Flume {@link Event}.
   * @param tuple the StreamBase tuple to convert.
   * @param event the Flume event into which to place the tuple's contents.
   * @return true if the conversion was successful, false otherwise.
   * @throws StreamBaseException
   */
  public boolean convert(Tuple tuple, Event event) throws StreamBaseException;       
}

If no custom converter is specified in the Flume configuration file, the adapter assumes the payload of the Flume event is a simple byte buffer representing a UTF-8 string which is to be mapped to a tuple field of type String named body or Body.
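The default mapping can be pictured with the following sketch, which uses only standard Java types. The DefaultBodyMapping class is hypothetical and a list of field names stands in for the StreamBase schema. Checking for body before Body is an assumption, since the order is not specified above.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

/** Hypothetical sketch of the default body mapping; not the shipped converter. */
class DefaultBodyMapping {

    /** Returns the name of the body field in the schema, or null if there is none. */
    static String findBodyField(List<String> schemaFieldNames) {
        for (String candidate : new String[] {"body", "Body"}) {
            if (schemaFieldNames.contains(candidate)) {
                return candidate;
            }
        }
        return null;
    }

    /** Input direction: event payload bytes to the string field value. */
    static String decodeBody(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    /** Output direction: string field value to event payload bytes. */
    static byte[] encodeBody(String fieldValue) {
        return fieldValue.getBytes(StandardCharsets.UTF_8);
    }
}
```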

The adapter also ships with another ready-to-use set of sink and source converters (com.streambase.sb.adapter.flume.ByteArraySinkConverter and com.streambase.sb.adapter.flume.ByteArraySourceConverter, respectively), which treat the Flume event payload as an opaque byte array (mapped to a StreamBase field of type Blob named body or Body).

Flume Event Headers

For both the Input and Output adapters, the Converter interface implementer is responsible for the proper conversion of any Flume event headers as well as the body. One suggestion is to represent Flume event headers as a list of tuples in the StreamBase tuple, with the nested tuple containing a pair of string fields, one for the header's entry name and one for its value. This is how both the default converters and the ByteArray variants described above treat headers.

As a convenience to custom converter implementers a set of abstract classes (com.streambase.sb.adapter.flume.HeaderAwareSourceConverter and com.streambase.sb.adapter.flume.HeaderAwareSinkConverter) is provided, which implement support for headers in the manner suggested above. It is highly recommended to subclass from these "header-aware" converter classes when implementing your own converters. Note that both the String and Byte Array converters shipping with the adapters employ this mechanism.

Specifically, HeaderAware{Sink,Source}Converter looks for a field named either header or Header in the Schema passed to the converter's init() method. If such a field is found, the converter expects it to be of type List of Tuples. Also the nested tuple's Schema is expected to contain two string fields, named key (or Key) and value (or Value, or val or Val). If all those conditions are respected, this list is populated with the Flume event's header entries (for the Input adapter) or its contents are used to populate the outgoing Flume event (for the Output adapter).
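The field lookup and header mapping described above might look like the following sketch. It is written against plain Java collections rather than the StreamBase Schema and Tuple classes, so the HeaderMapping class, its method names, and the order in which the candidate field names are tried are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of header handling; a Map per row stands in for a nested tuple. */
class HeaderMapping {

    /** Returns the first candidate name present in the schema, or null. */
    static String firstPresent(List<String> fieldNames, String... candidates) {
        for (String candidate : candidates) {
            if (fieldNames.contains(candidate)) {
                return candidate;
            }
        }
        return null;
    }

    /** Input direction: Flume header map to a list of key/value rows. */
    static List<Map<String, String>> toRows(Map<String, String> headers,
                                            String keyField, String valueField) {
        List<Map<String, String>> rows = new ArrayList<>();
        for (Map.Entry<String, String> entry : headers.entrySet()) {
            Map<String, String> row = new LinkedHashMap<>();
            row.put(keyField, entry.getKey());
            row.put(valueField, entry.getValue());
            rows.add(row);
        }
        return rows;
    }

    /** Output direction: list of key/value rows back to a Flume header map. */
    static Map<String, String> toHeaders(List<Map<String, String>> rows,
                                         String keyField, String valueField) {
        Map<String, String> headers = new LinkedHashMap<>();
        for (Map<String, String> row : rows) {
            headers.put(row.get(keyField), row.get(valueField));
        }
        return headers;
    }
}
```

For a nested schema with fields Key and val, the lookups firstPresent(fields, "key", "Key") and firstPresent(fields, "value", "Value", "val", "Val") would select Key and val respectively.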

Converters that subclass HeaderAwareSourceConverter or HeaderAwareSinkConverter must observe the following rules:

  • In the call to init(), make a call to super.init() to allow the superclass to initialize and examine the schema for the presence of the header-specific fields, as well as to cache them for better performance.

  • In the call to convert(), make a call to super.convert() to allow the superclass to perform the conversion of the headers.
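The two rules above follow a common subclassing pattern, sketched here with stand-in types so that the example is self-contained. SketchEvent, HeaderAwareSketch, and BodyLengthConverter are hypothetical and deliberately simplified: the real base classes take a Flume Context and a StreamBase Schema in init(), and a Flume Event and a StreamBase Tuple in convert().

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Stand-in for a Flume event: string headers plus an opaque body. */
class SketchEvent {
    final Map<String, String> headers;
    final byte[] body;

    SketchEvent(Map<String, String> headers, byte[] body) {
        this.headers = headers;
        this.body = body;
    }
}

/** Stand-in for a header-aware base class; NOT the real HeaderAwareSinkConverter. */
abstract class HeaderAwareSketch {
    private String headerField; // cached by init(); null if the schema has no header field

    void init(List<String> schemaFields) {
        headerField = schemaFields.contains("header") ? "header"
                : schemaFields.contains("Header") ? "Header" : null;
    }

    boolean convert(SketchEvent event, Map<String, Object> tuple) {
        if (headerField != null) {
            tuple.put(headerField, new LinkedHashMap<String, String>(event.headers));
        }
        return true;
    }
}

/** Example subclass: adds the body length, and defers header handling to the superclass. */
class BodyLengthConverter extends HeaderAwareSketch {
    private boolean hasLengthField;

    @Override
    void init(List<String> schemaFields) {
        super.init(schemaFields); // rule 1: let the superclass inspect and cache the schema
        hasLengthField = schemaFields.contains("length");
    }

    @Override
    boolean convert(SketchEvent event, Map<String, Object> tuple) {
        if (!super.convert(event, tuple)) { // rule 2: let the superclass convert the headers
            return false;
        }
        if (hasLengthField) {
            tuple.put("length", event.body.length);
        }
        return true;
    }
}
```

Omitting either super call in this sketch would leave the header field unpopulated, which is the kind of silent omission the rules are meant to prevent.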

Using the Flume Adapter

Installation

The StreamBase Flume adapter needs two JAR files to function properly: $STREAMBASE_HOME/lib/sbflume.jar and $STREAMBASE_HOME/lib/sbclient.jar. Once you have completed installation of the StreamBase Flume adapter kit, the most expedient way to make the classes therein available to Flume is to copy these JAR files to flume_installdir/lib in Flume 1.1 or to a subdirectory of $FLUME_HOME/plugins.d/ as described in the section "Directory layout for plugins" in the Apache Flume User Guide:

$FLUME_HOME/plugins.d/streambase-flume/lib/sbflume.jar

$FLUME_HOME/plugins.d/streambase-flume/lib/ext/sbclient.jar

Installation in Cloudera CDH

If you plan to use the adapter within a Cloudera CDH cluster, the best way to deploy it is to place both JAR files together in a directory accessible to the cluster. Then, on the Cloudera Manager Configuration tab for the Flume service, add this directory to the "Plugin Directories" setting in the Agent Base Group category.

Configuration

Configuration of the adapter (source or sink) is accomplished by means of the Flume configuration file passed to the flume-ng program. Here is an example of such a configuration file, which defines a StreamBase source feeding a Logger sink:

# Define a memory channel called ch1 on host1
host1.channels.ch1.type = memory

# Define a StreamBase source called sb-source1 on host1 and tell it
# to connect to the StreamBase server at localhost:10000.
# On the Flume side, connect the source to channel ch1.
host1.sources.sb-source1.type = com.streambase.sb.adapter.flume.StreamBaseSource
host1.sources.sb-source1.channels = ch1
# StreamBase-specific settings
host1.sources.sb-source1.sbserver-uri = sb://localhost:10000/
host1.sources.sb-source1.stream-name = ToFlume
# The next line specifies the default converter class, so strictly speaking it's superfluous, but it is included for demonstration.
host1.sources.sb-source1.converter = com.streambase.sb.adapter.flume.StringSourceConverter
# The StringSourceConverter itself specified above also optionally takes a custom setting to specify which character encoding to use.
# Here again, the default behavior is to use the default charset but we're setting explicitly here for demonstration.
host1.sources.sb-source1.charset = UTF-8

# Define a logger sink that simply logs all events it receives
# and connect it to the other end of the same channel.
host1.sinks.log-sink1.type = logger
host1.sinks.log-sink1.channel = ch1

# Finally, now that we've defined all of our components, tell
# host1 which ones we want to activate.
host1.sources = sb-source1
host1.sinks = log-sink1
host1.channels = ch1

We can similarly define a NetCat source feeding a StreamBase sink:

# Define a memory channel called ch1 on host1
host1.channels.ch1.type = memory

# Define a source that listens on a socket, netcat-style, and issues one Flume event per line of text received.
host1.sources.nc-source1.type = netcat
host1.sources.nc-source1.bind = localhost
host1.sources.nc-source1.port = 41414
host1.sources.nc-source1.channels = ch1

# Define a StreamBase sink called sb-sink1 on host1 and tell it
# to connect to the StreamBase server at localhost:10000.
# On the Flume side, connect the sink to channel ch1.
host1.sinks.sb-sink1.type = com.streambase.sb.adapter.flume.StreamBaseSink
host1.sinks.sb-sink1.channel = ch1
# StreamBase-specific settings
host1.sinks.sb-sink1.sbserver-uri = sb://localhost:10000/
host1.sinks.sb-sink1.stream-name = FromFlume
# Batch 50 tuples before sending, but don't wait more than 100 milliseconds before sending what tuples we have.
host1.sinks.sb-sink1.buffer-size = 50
host1.sinks.sb-sink1.flush-interval = 100
# The next line specifies the default converter class, so strictly speaking it's superfluous, but it is included for demonstration.
host1.sinks.sb-sink1.converter = com.streambase.sb.adapter.flume.StringSinkConverter
# The StringSinkConverter specified above also optionally takes a custom setting to specify which character encoding to use.
# Here again, the default behavior is to use the default charset, but we're setting it explicitly here for demonstration.
host1.sinks.sb-sink1.charset = UTF-8

# Finally, now that we've defined all of our components, tell
# host1 which ones we want to activate.
host1.sources = nc-source1
host1.sinks = sb-sink1
host1.channels = ch1

Configuration in Cloudera CDH

If you plan to use the adapter in a Cloudera CDH cluster, the best way to configure the adapter is to use Cloudera Manager: navigate to your Flume service's Configuration tab and paste your configuration settings in the Configuration File edit box. Make sure the Agent Name setting on the same page points to the agent defined in your configuration (in the examples above, the agent name would be host1).

Execution

Assuming the adapter's JAR files have been placed in Flume's lib directory (or in a plugins.d subdirectory) as described above, and your configuration file has been correctly defined, all you need to do to use the adapter is:

  1. Have your StreamBase Server application running.

  2. Launch Flume 1.1 with a command line like the following:

    flume_installdir/bin/flume-ng node host1 -f myconfig.cfg -c myconfigdir
    

    For Flume 1.2 or later, use this command:

    flume_installdir/bin/flume-ng agent --name host1 -f myconfig.cfg -c myconfigdir
    

Execution in Cloudera CDH

The lifecycle of the Flume agent is controlled by the cluster. Simply making sure your properly-configured Flume service is running in the cluster will ensure the adapter is ready to operate.

Sample

A sample StreamBase application is included when you install the adapter. The sample demonstrates the use of both the Input and Output Flume adapters. It uses a feed simulation to generate text lines, which are emitted on one port destined for a Flume Output adapter. This adapter converts the text to a Flume event using the default String-based converter and sends it on to the Flume channel. A Flume Input adapter listens as the Flume sink on the other end of the channel and converts each received event into a line of text that is placed in a tuple and sent back to the StreamBase application. For further details, refer to the sample's README file.