Using the Spark MLlib Model Evaluator Operator

Introduction

The TIBCO StreamBase® Operator for Spark/MLlib Model Evaluator enables StreamBase applications to execute numerical models generated with Apache Spark MLlib (http://spark.apache.org/mllib).

Spark is a distributed processing framework for Big Data. Spark MLlib was created to meet the growing importance of statistical and machine learning techniques. MLlib provides training and execution pipelines that transform one data frame into another. The processing steps are usually hidden from the user and may include operations such as factorization, dimensionality reduction, and filtering.

Pipelines hide complex logic behind a simple interface. The downside of Spark models is that they are optimized for data frame processing, so scoring a single sample incurs significant overhead. The recommended design is therefore to collect samples and process them in batches.
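
The batching recommendation can be illustrated with a minimal sketch in plain Python. It is not StreamBase or Spark code: the BatchCollector class and its max_size parameter are hypothetical names chosen for illustration. The sketch buffers single samples and releases them as one list, which could then be sent to the operator as a frame of type list(tuple).

```python
from typing import Any, Dict, List, Optional

Sample = Dict[str, Any]


class BatchCollector:
    """Buffers single samples and releases them as one batch (hypothetical helper)."""

    def __init__(self, max_size: int) -> None:
        if max_size < 1:
            raise ValueError("max_size must be at least 1")
        self.max_size = max_size
        self._buffer: List[Sample] = []

    def add(self, sample: Sample) -> Optional[List[Sample]]:
        """Add one sample; return a full batch once max_size is reached, else None."""
        self._buffer.append(sample)
        if len(self._buffer) >= self.max_size:
            return self.flush()
        return None

    def flush(self) -> List[Sample]:
        """Return whatever is buffered (possibly empty) and reset the buffer."""
        batch, self._buffer = self._buffer, []
        return batch
```

A real application would probably also flush on a timer so that a partially filled batch does not wait indefinitely; that policy is left out of the sketch.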

The operator processes input data given as a tuple or a list of tuples. The tuple schema corresponds to the input parameters of the pipeline. For each model the operator generates output data that matches the defined output schema. Depending on the input data, the output can be a single tuple or a list of tuples. In Spark MLlib, the pipeline may change the number and ordering of rows.

Note that, unlike in H2O or PMML, a single pipeline may implement several models at the same time.

Defining models dynamically lets you attach additional metadata to the deployed models. The metadata is attached to the model result, which allows the event flow to act on model attributes. Examples of attributes include a champion/challenger flag and a category for propensity scoring.

The Spark MLlib model evaluator is implemented using the version of the Spark MLlib libraries described on the Supported Configurations page.

To use the Spark MLlib operator, you must include a compatible Spark assembly library in the runtime environment.

All model operators support an arbitrary number of simultaneous models, and the ability to score single samples as well as data frames.

Operator Properties

This section describes the properties you can set for this operator, using the various tabs of the Properties view in StreamBase Studio.

General Tab

Name: Use this field to specify or change the component's name, which must be unique in the application. The name must contain only alphabetic characters, numbers, and underscores, and no hyphens or other special characters. The first character must be alphabetic or an underscore.
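
The naming rule above can be expressed as a regular expression. The following is a minimal sketch, assuming that "alphabetic" means ASCII letters, which this page does not state; the function name is_valid_component_name is hypothetical and is not part of any StreamBase API.

```python
import re

# First character: letter or underscore; remaining characters: letters, digits, underscores.
# Assumption: only ASCII letters count as alphabetic.
_NAME_PATTERN = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def is_valid_component_name(name: str) -> bool:
    """Return True if the whole name satisfies the rule described above."""
    return _NAME_PATTERN.fullmatch(name) is not None
```

The check covers only the character rules; uniqueness within the application still has to be verified separately.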

Operator: A read-only field that shows the formal name of the operator.

Start with application: If this field is set to Yes or to a module parameter that evaluates to true, an instance of this operator starts as part of the containing StreamBase Server. If this field is set to No or to a module parameter that evaluates to false, the operator is loaded with the server, but does not start until you send an sbadmin resume command, or until you start the component with StreamBase Manager. With this option set to No or false, the operator does not start even if the application as a whole is suspended and later resumed. The recommended setting is selected by default.

Enable Error Output Port: Select this check box to add an Error Port to this component. In the EventFlow canvas, the Error Port shows as a red output port, always the last port for the component. See Using Error Ports and Error Streams to learn about Error Ports.

Description: Optionally enter text to briefly describe the component's purpose and function. In the EventFlow canvas, you can see the description by pressing Ctrl while the component's tooltip is displayed.

Operator Properties Tab

Property | Type | Description
Control Port | check box | Enables dynamic reconfiguration of the model list. Enabling the control port also enables a control output port, which reports the status of each model loading request. The control port supports all-or-nothing semantics: either the full list loads successfully and replaces the currently deployed models, or the operator reports failure.
Status Port | check box | Enables failure notifications. If scoring fails, the failure is emitted to the status port together with the original input tuple.
Timing Info | check box | Enables fine-grained timing information. The operator collects the effective times of input conversion, model evaluation, and output conversion. Times are in nanoseconds.
Log Level | drop-down list | Controls the level of verbosity the operator uses to issue informational traces to the console. This setting is independent of the containing application's overall log level. Available values, in increasing order of verbosity, are: OFF, ERROR, WARN, INFO, DEBUG, TRACE, and ALL.
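
The three phases measured by the Timing Info option can be pictured with the following sketch. It is only an illustration in plain Python: the function timed_score, its callable parameters, and the timing key names are hypothetical and are not the field names the operator emits.

```python
import time
from typing import Any, Callable, Dict, Tuple


def timed_score(
    raw: Any,
    to_model_input: Callable[[Any], Any],
    evaluate: Callable[[Any], Any],
    to_output: Callable[[Any], Any],
) -> Tuple[Any, Dict[str, int]]:
    """Run the three scoring phases and time each one in nanoseconds."""
    timings: Dict[str, int] = {}

    start = time.perf_counter_ns()
    model_input = to_model_input(raw)          # input conversion
    timings["inputConversionNanos"] = time.perf_counter_ns() - start

    start = time.perf_counter_ns()
    raw_score = evaluate(model_input)          # model evaluation
    timings["evaluationNanos"] = time.perf_counter_ns() - start

    start = time.perf_counter_ns()
    output = to_output(raw_score)              # output conversion
    timings["outputConversionNanos"] = time.perf_counter_ns() - start

    return output, timings
```

Keeping the phases separate makes it possible to tell whether time is spent in the model itself or in converting tuples to and from data frames.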

Models Tab

Property | Type | Description
Model serialization type | enumeration | The serialization format of the models, one of:
  • Simple - simple Java serialization of the pipeline object, saved in an object sequence file.

  • Pipeline - MLWritable serialization, compatible with the Spark 1.6+ API.

Before Spark 1.6, there was no persistence API for models and pipelines. Spark 1.6.0 introduced the MLWritable API, which allows you to save and read models using standard calls. In release 1.6.0, however, most MLlib components do not implement MLWritable.
Model URLs | name/value pairs | List of models specified at design time. Each model consists of a name and a URL pointing to the model definition.

Schemas

Property | Type | Description
Result Data Schema | schema | Anticipated schema for model output. Only fields defined in the schema are used in the output tuple. When scoring multiple rows, include identification fields, because Spark may reorder records.

Concurrency Tab

Use the Concurrency tab to specify parallel regions for this instance of this component, or multiplicity options, or both. The Concurrency tab settings are described in Concurrency Options, and dispatch styles are described in Dispatch Styles.

Caution

Concurrency settings are not suitable for every application, and using these settings requires a thorough analysis of your application. For details, see Execution Order and Concurrency, which includes important guidelines for using the concurrency options.

Data Input Port

The data port is the default input port for the model operator. It is always enabled. Use the data port to execute model scoring.

The default schema for the data input port is:

  • frame, tuple or list(tuple). Samples to be scored by the deployed models.

    The tuple structure contains primitive fields (int, long, double, string or boolean) with names corresponding to model input fields. Each field in the input structure is represented as a column in the input data frame. A list of doubles is also supported and rendered as a vector type.

  • *. Arbitrary pass-through parameters.

Unrecognized fields are passed through transparently. The frame field is not propagated; the scores field is not allowed.
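
The field rules for the data input port can be sketched as follows, with a Python dict standing in for a StreamBase tuple. The function split_input and the sample field names (age, income, requestId) are hypothetical; only frame and scores come from the schema above.

```python
from typing import Any, Dict, Tuple

InputTuple = Dict[str, Any]


def split_input(input_tuple: InputTuple) -> Tuple[Any, Dict[str, Any]]:
    """Separate the frame to be scored from the pass-through fields."""
    if "scores" in input_tuple:
        raise ValueError("the scores field is not allowed on the data input port")
    if "frame" not in input_tuple:
        raise ValueError("the frame field is required")
    frame = input_tuple["frame"]  # a single tuple (dict) or a list of tuples
    # Everything except frame is passed through unchanged.
    passthrough = {k: v for k, v in input_tuple.items() if k != "frame"}
    return frame, passthrough


# Example input: two samples to score plus one pass-through field.
example = {
    "frame": [{"age": 41, "income": 52000.0}, {"age": 29, "income": 38000.0}],
    "requestId": "r-001",
}
frame, passthrough = split_input(example)
```

Here frame holds the two samples and passthrough holds only requestId, which mirrors the rule that the frame field itself is not propagated.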

Scores Output Port

The scores port provides a list of model evaluation results.

The schema for the scores output port is:

  • scores, list(tuple). List of records, one for each currently deployed model.

  • scores.modelName, string. Name of the model defined in the Model URLs or provided via control port.

  • scores.modelUrl, string. URL defining the model configured in the Model URLs or provided via control port.

  • scores.modelData, blob. Binary data defining the model, if binary data was used to load the model.

  • scores.score, tuple or list(tuple). The type depends on the type of the frame input. If the input is a list of tuples, the operator may not preserve the ordering.

  • scores.*. Arbitrary parameters provided during model redeployment on the control port.

  • *. Pass-through parameters other than frame.

The scores port transparently replicates unrecognized fields; the frame field is not propagated.
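
Because the operator may not preserve row order for list input, downstream logic typically needs an identification field to match scores back to samples. The following is a minimal sketch of that matching in plain Python; the function align_scores and the rowId and prediction field names are hypothetical.

```python
from typing import Any, Dict, List, Optional

Row = Dict[str, Any]


def align_scores(
    input_rows: List[Row], score_rows: List[Row], id_field: str
) -> List[Optional[Row]]:
    """Return one score row per input row, in input order.

    Positions hold None where the pipeline returned no row for that id,
    since a pipeline may also change the number of rows.
    """
    by_id = {row[id_field]: row for row in score_rows}
    return [by_id.get(row[id_field]) for row in input_rows]


inputs = [{"rowId": 1, "x": 0.2}, {"rowId": 2, "x": 0.9}, {"rowId": 3, "x": 0.5}]
# Scores arrive reordered, and the row with rowId 3 was filtered out.
scores = [{"rowId": 2, "prediction": 1.0}, {"rowId": 1, "prediction": 0.0}]
aligned = align_scores(inputs, scores, "rowId")
```

For this to work, the identification field has to be part of the Result Data Schema so that it survives into scores.score.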

Control Input Port

The control port enables runtime redeployment of models. Models are deployed with all-or-nothing semantics: if all the provided models load successfully, they fully replace the current set; otherwise, the operator reports failure and the current set is not replaced.
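
The all-or-nothing behavior can be sketched as a stage-then-swap pattern. This is an illustration in plain Python, not the operator's implementation: the ModelRegistry class and its loader callable are hypothetical, and the sketch assumes that loading continues after a failure so that every model gets its own status. The field names modelName, modelUrl, status, message, and models follow the control and status port schemas on this page.

```python
from typing import Any, Callable, Dict, List

ModelRequest = Dict[str, Any]


class ModelRegistry:
    """Holds the deployed models and swaps them all-or-nothing (illustrative only)."""

    def __init__(self, loader: Callable[[ModelRequest], Any]) -> None:
        self._loader = loader              # hypothetical: turns a request into a model
        self.models: Dict[str, Any] = {}   # currently deployed models by modelName

    def redeploy(self, requests: List[ModelRequest]) -> Dict[str, Any]:
        staged: Dict[str, Any] = {}
        results: List[Dict[str, Any]] = []
        for request in requests:
            try:
                staged[request["modelName"]] = self._loader(request)
                results.append({**request, "status": "success", "message": "loaded"})
            except Exception as exc:  # any load error fails the whole request
                results.append({**request, "status": "failure", "message": str(exc)})

        all_loaded = all(r["status"] == "success" for r in results)
        if all_loaded:
            self.models = staged  # full replacement of the current set
        return {
            "status": "success" if all_loaded else "failure",
            "message": "models replaced" if all_loaded else "current models kept",
            "models": results,
        }
```

Loading into a staging dictionary first means that scoring can continue against the old set until the new set is known to be complete.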

The schema for the control input port is:

  • models, list(tuple). List of records, one for each model to be deployed.

  • models.modelName, string. Logical name of the model.

  • models.modelUrl, string. URL defining the model.

  • models.modelData, blob. The binary data of the model. This field can be used to load a model directly from any source. This field is only used if models.modelUrl is null or empty. The blob data is expected to be a binary representation of a compressed zip file which holds the contents of the model folder.

  • models.*. Arbitrary parameters describing the model. They are later included in the scores output.

  • *. Arbitrary parameters provided during model redeployment on the control port.

The status port transparently replicates unrecognized fields; do not use the status or message fields on the control input port.
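
A blob for models.modelData could be produced along the following lines. This is a minimal sketch using only the Python standard library; the function zip_model_folder is hypothetical, and it assumes that archive entries are stored relative to the model folder root, which this page implies but does not spell out.

```python
import io
import zipfile
from pathlib import Path
from typing import Union


def zip_model_folder(model_dir: Union[str, Path]) -> bytes:
    """Return the contents of model_dir as the bytes of a compressed zip archive."""
    root = Path(model_dir)
    if not root.is_dir():
        raise NotADirectoryError(str(root))
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as archive:
        for path in sorted(root.rglob("*")):
            if path.is_file():
                # Assumption: entry names are relative to the model folder root.
                archive.write(path, path.relative_to(root).as_posix())
    return buffer.getvalue()
```

The returned bytes would be placed in models.modelData, with models.modelUrl left null or empty so that the blob is used.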

Status Output Port

The status port provides responses for runtime model deployment. The tuples are emitted only as responses to the control port tuples.

The schema for the status output port is:

  • status, string. Status of deployment. Either success or failure.

  • message, string. Descriptive status message.

  • models, list(tuple). List of records, one for each model to be deployed.

  • models.status, string. Status of the model loading. Either success or failure.

  • models.message, string. Descriptive model status message.

  • models.modelName, string. Logical name of the model.

  • models.modelData, blob. Binary data defining the model, if binary data was used to load the model.

  • models.modelUrl, string. URL defining the model.

  • models.*. Arbitrary parameters describing the model. They are later provided in the score.

  • *. Pass-through parameters other than models.

The status port transparently replicates unrecognized fields from the control port.