HDFS File Reader Input Adapter

Introduction

The TIBCO StreamBase® File Reader For Apache Hadoop Distributed File System (HDFS) reads a text file and emits a tuple with a string field containing the entire contents of the file.

The adapter can be configured to read a file at start-up or in response to the receipt of a tuple on its control input port.

Note

When using the HDFS core libraries on Windows, you may see a console message similar to Failed to locate the winutils binary in the hadoop binary path. The Apache Hadoop project acknowledges that this is an error to be fixed in a future Hadoop release. In the meantime, you can either ignore the error message as benign, or you can download, build, and install the winutils executable as discussed on numerous Internet resources.

HDFS File Reader Properties

This section describes the properties you can set for this adapter, using the various tabs of the Properties view in StreamBase Studio.

General Tab

Name: Use this field to specify or change the component's name, which must be unique in the application. The name must contain only alphabetic characters, numbers, and underscores, and no hyphens or other special characters. The first character must be alphabetic or an underscore.

Adapter: A read-only field that shows the formal name of the adapter.

Class: A field that shows the fully qualified class name that implements the functionality of this adapter. Use this class name when loading the adapter in StreamSQL programs with the APPLY JAVA statement. You can right-click this field and select Copy from the context menu to place the full class name in the system clipboard.

Start with application: If this field is set to Yes or to a module parameter that evaluates to true, an instance of this adapter starts as part of the containing StreamBase Server. If this field is set to No or to a module parameter that evaluates to false, the adapter is loaded with the server, but does not start until you send an sbadmin resume command, or until you start the component with StreamBase Manager. With this option set to No or false, the adapter does not start even if the application as a whole is suspended and later resumed. The recommended setting is selected by default.

Enable Error Output Port: Select this check box to add an Error Port to this component. In the EventFlow canvas, the Error Port shows as a red output port, always the last port for the component. See Using Error Ports and Error Streams to learn about Error Ports.

Description: Optionally enter text to briefly describe the component's purpose and function. In the EventFlow canvas, you can see the description by pressing Ctrl while the component's tooltip is displayed.

Adapter Properties Tab

Property Data Type Default Description StreamSQL Property
Default File Name String None The default file name, used if no name is provided on the control input port. Choose a file by selecting from the list box or by browsing. DefaultFileName
User String None The default HDFS username if no username is provided on the control input port. DefaultUser
Read Default File at Startup check box Unselected If enabled, the default file is read at startup. ReadDefaultFileAtStartup
Use Default Charset check box Selected If selected, the Java platform default character set is used. If cleared, a valid character set name must be specified in the Character Set property. UseDefaultCharset
Character Set string None The name of the character set encoding that the adapter is to use to read input or write output. Charset
Enable Control Port check box Selected If enabled, a control input port is present for enqueuing tuples with the names of files to read. EnableControlPort
Enable Status Port check box Selected If enabled, a status output port is present on which tuples are emitted to convey important events within the adapter. EnableStatusPort
Read As Binary Data check box Unselected If enabled, the adapter reads the file as binary data into a single tuple with a blob field. BlobData
Extract Blob Compressed Contents check box Selected If enabled and the file is a compressed archive, the adapter attempts to extract a single compressed entry and outputs the result as a blob. If disabled, the file contents are emitted directly. ExtractZipBlobData
Serialization Format drop-down list None When reading in binary format, this is the serialization format to use. serializeType
One Tuple Per Line check box Unselected If enabled, a tuple is emitted for each line read from the file. If disabled, the entire file contents are emitted in a single tuple. OneTuplePerLine
Preserve New Lines check box Unselected If enabled, new lines read from the input file are preserved. PreserveNewLines
Line Separator String None If not preserving new lines from the input file, new lines are replaced with this string. LineSeparator
Pass Through Fields check box Unselected If enabled, the incoming tuple's fields are duplicated to the outgoing tuple. If enabled, you must specify which input field provides the incoming file name. PassThroughFields
Filename Field String None Determines which field of the incoming tuple will be used as the filename to process. Used when Pass Through Fields is checked. FilenameField
File Contents Field Name String FileContents The name of the field in the outgoing tuple that holds the file contents. May only be used when Pass Through Fields is selected. FileContentsFieldName
Replacement Data Field Name String None The name of the field in the incoming tuple that will supply the data for any parameter replacements in the file being read. Can only be used when Pass Through Fields is selected. See the Replacement Fields section below for further details. ReplacementDataFieldName
Log Level drop-down list INFO Controls the level of verbosity the adapter uses to send notifications to the console. This setting can be higher than the containing application's log level. If set lower, the system log level will be used. Available values, in increasing order of verbosity, are: OFF, ERROR, WARN, INFO, DEBUG, TRACE, and ALL. LogLevel

HDFS Tab

Property Data Type Description
Buffer Size (Bytes) int The size of the buffer to be used; if empty, the default is used.
Configuration Files Strings The HDFS configuration files to use when connecting to the HDFS file system; for example, 'core-defaults.xml' is the standard file to use.
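As a sketch, a minimal Hadoop client configuration file of the kind listed in the Configuration Files property might look like the following. The host and port in the fs.defaultFS value are placeholders; replace them with your own NameNode address:

```xml
<?xml version="1.0"?>
<!-- Hypothetical minimal client configuration; namenode.example.com:8020
     is a placeholder for your cluster's NameNode address. -->
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://namenode.example.com:8020</value>
  </property>
</configuration>
```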

Edit Schema Tab

The schema is used when you select either the Apache Avro or Parquet serialization format. You can click the Generate Schema link on this tab, which displays a dialog in which to enter the hdfs://, file://, or local file path from which to load the schema.

Serialize Format Data Types

Avro Data Type Map to StreamBase

StreamBase Data Type Avro Data Type
BLOB BYTES, FIXED, STRING, UNION[NULL, BYTES], UNION[NULL, FIXED], UNION[NULL, STRING]
BOOL BOOLEAN, UNION[NULL, BOOLEAN]
DOUBLE DOUBLE, FLOAT, UNION[NULL, DOUBLE], UNION[NULL, FLOAT]
INT INT, UNION[NULL, INT]
LIST ARRAY, UNION[NULL, ARRAY]
LIST<TUPLE<STRING key, ? value>> MAP, UNION[NULL, MAP]
LONG LONG, UNION[NULL, LONG]
STRING STRING, UNION[NULL, STRING]
TIMESTAMP STRING, LONG, DOUBLE, UNION[NULL, STRING], UNION[NULL, LONG], UNION[NULL, DOUBLE]
TUPLE RECORD, UNION[NULL, RECORD]
FUNCTION STRING, UNION[NULL, STRING]
CAPTURE * Not supported
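For illustration, a hypothetical Avro schema exercising several of the mappings above (nullable fields expressed as UNION[NULL, ...] types) might look like this; the record and field names are examples only:

```json
{
  "type": "record",
  "name": "Trade",
  "fields": [
    {"name": "symbol", "type": "string"},
    {"name": "quantity", "type": "int"},
    {"name": "price", "type": ["null", "double"], "default": null},
    {"name": "note", "type": ["null", "bytes"], "default": null}
  ]
}
```

Here symbol maps to a StreamBase STRING, quantity to INT, price to DOUBLE via UNION[NULL, DOUBLE], and note to BLOB via UNION[NULL, BYTES].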

Parquet Data Type Map to StreamBase

StreamBase Data Type Parquet Data Type
BLOB BINARY
BOOL BOOLEAN
DOUBLE DOUBLE, FLOAT
INT INT
LIST LIST
LIST<TUPLE<? key, ? value>> MAP
LONG LONG
STRING BINARY UTF8
TIMESTAMP BINARY UTF8, INT64, DOUBLE
TUPLE GROUP
FUNCTION BINARY UTF8
CAPTURE * Not supported

Concurrency Tab

Use the Concurrency tab to specify parallel regions for this instance of this component, or multiplicity options, or both. The Concurrency tab settings are described in Concurrency Options, and dispatch styles are described in Dispatch Styles.

Caution

Concurrency settings are not suitable for every application, and using these settings requires a thorough analysis of your application. For details, see Execution Order and Concurrency, which includes important guidelines for using the concurrency options.

Replacement Fields

Replacement fields are tokens in input files that take the form of StreamBase parameters, such as ${param-name}. The adapter replaces each such token at run time with data from its input tuple. When the adapter reads the file, it matches each token to a subfield of the same name from the input stream's tuple, and the value of that field is substituted for the token. All tokens are similarly processed before sending output.

All tokens must match the name of some field in the input tuple for the adapter to generate output. If any token does not match, StreamBase Server throws a StreamBase Exception: Unable to access a field from the replacement tuple while trying to parse item <token>.

Although the tokens take the form of StreamBase module parameters, they are scoped to the adapter instance, which has no capability to substitute StreamBase parameter values for tokens.

A token can access an array by indexing into it using the standard [0] bracket notation, such as ${ListField[0].SubField}.

Tokens can format field values by including standard Java format strings within braces, inserted at the end of the parameter definition. An example for an integer would be ${IntField{%05d}}. Formatting is supported for all data types except boolean fields; string fields accept width specifiers such as %12s, as shown in the example below. Use the standard java.text.SimpleDateFormat class rules for formatting timestamps.

The following table illustrates how tokens can format field values.

Data Type Token and Formatting Examples
String ${StringField}
Boolean ${BooleanField}
Int ${IntField} or formatted ${IntField{%05d}}
Long ${LongField} or formatted ${LongField{%05d}}
Double ${DoubleField} or formatted ${DoubleField{%+10.4f}}
Timestamp ${TimeField} or formatted ${TimeField{dd-MMM-yyyy hh:mm:ss}}
List ${ListField[0]} or lists in lists ${NestedListListField[0][0].Field}
Tuple ${TupleField}
Tuple, JSON-formatted ${TupleField{JSON}}
Tuple, JSON, including null fields ${TupleField{JSON|INCLUDE_NULL_FIELDS}}
Tuple, JSON, formatted as list ${TupleField{JSON|PREFER_LIST}}
Tuple, JSON, formatted as map ${TupleField{JSON|PREFER_MAP}}
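The format specifiers in these tokens follow standard java.util.Formatter rules, which you can try outside the adapter. A minimal sketch (the field names in the comments are illustrative):

```java
import java.text.SimpleDateFormat;
import java.util.Date;

// Sketch of how the adapter's token format strings behave, using the same
// standard Java formatting rules the tokens rely on.
public class TokenFormatDemo {
    public static void main(String[] args) {
        // ${IntField{%05d}} with value 123 -> zero-padded to width 5
        System.out.println(String.format("%05d", 123));        // 00123
        // ${DoubleField{%+5.1f}} with value 456.789 -> signed, one decimal
        System.out.println(String.format("%+5.1f", 456.789));  // +456.8
        // ${StringField{%12s}} with value "StreamBase" -> right-aligned, width 12
        System.out.println(String.format("%12s", "StreamBase"));
        // Timestamp tokens use java.text.SimpleDateFormat patterns instead:
        SimpleDateFormat fmt = new SimpleDateFormat("dd-MMM-yyyy hh:mm:ss");
        System.out.println(fmt.format(new Date())); // e.g. 15-Jan-2015 08:15:20
    }
}
```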

Example of Field Replacement

Given an input text file with the following content:

Line 1 has an int replacement field: ${intval1}
Line 2 has a double replacement field: ${doubleval1{%+5.1f}}
Line 3 has a list of doubles replacement field:${listval1{%6.2f}}
Line 4 has a string replacement field: ${stringval1{%12s}}
Line 5 has a JSON tuple replacement field ${tupleval1{JSON}}

a control port input schema like this:

FileName string
replacements (
    stringval1 string,
    doubleval1 double, 
    listval1 list(double), 
    intval1 int,
    tupleval1 (
        tint1 int,
        tts1 timestamp
    )
)

and an input tuple with these values (using a default file name):

FileName    = null,
replacements=(stringval1=StreamBase,
              doubleval1=456.789,
              listval1=[9.81,7.62,5.43],
              intval1=123,
              tupleval1=(
                   tint1=101,
                   tts1=2015-01-15 08:15:20
              )
)

The file reader outputs:

Line 1 has an int replacement field: 123
Line 2 has a double replacement field: +456.8
Line 3 has a list of doubles replacement field:[9.81, 7.62, 5.43]
Line 4 has a string replacement field:   StreamBase
Line 5 has a JSON tuple replacement field {"tint1":101,"tts1":"2015-01-15 08:15:20.000-0500"}

The order of fields in the schema does not have to match the order of tokens in the input file. The same token can appear any number of times. Each instance will receive the same replacement value.

Description of This Adapter's Ports

The File Reader adapter's ports are used as follows:

  • Control (input): Tuples enqueued on this port cause the adapter to read a text file. The schema for this port has the following field:

    • FileName, string. The name of the file to read. If null, the file name is taken from the adapter's Default File Name property.

    Note

    If Pass Through Fields is selected, then this schema can be anything, but you must specify the Filename Field.

  • Status (output): The adapter emits tuples from this port when significant events occur, such as when an attempt to read a file fails. The schema for this port has the following fields:

    • Type, string. Returns one of the following values to convey the type of event:

      • User Input

      • Read

    • Action, string. Returns an action associated with the event type:

      • Rejected

      • Error

      • Starting

      • Finished

    • Object, string. Returns an event type-specific value, such as the name of the file for which a read failed or the control input tuple that was rejected.

    • Message, string. Returns a human-readable description of the event.

  • Data (output): Tuples are emitted on this port when files are successfully read. The schema for this port contains the following fields:

    Note

    If Pass Through Fields is selected, then this schema instead matches your incoming schema with an additional field with the name specified in the File Contents Field Name parameter.

    • FileName, string. The name of the file that was read.

    • FileContents, string. The contents of the file.

Typechecking and Error Handling

The File Reader adapter uses typecheck messages to help you configure the adapter within your StreamBase application. In particular, the adapter generates typecheck messages for the following reasons:

  • The Control Input Port does not have exactly one field, of type string, to receive the name of the file to read.

  • The Default File Name property contains the name of an invalid file.

The adapter generates warning messages during runtime under various conditions, including:

  • A control tuple is received with a null value in its FileName field and a value for the adapter's Default File Name property has not been specified.

  • An error occurs attempting to read a file.

Suspend and Resume Behavior

When suspended, the adapter stops processing requests to read files.

When resumed, the adapter once again starts processing requests to read files.

Related Topics