Creating a Runtime Class

Now that you have created a signature and a GUI node, it is time to create the runtime. This is where the bulk of the operator's work happens; here you define the Spark job that performs the data transformation.

While you could extend the base class OperatorRuntime and define the output steps manually, we will use SparkDataFrameRuntime instead. It lets us focus on writing the Spark job that performs the data transformation, while predefined methods on the back end package the results and return them to the Team Studio application.

We will call this class MyColumnFilterRuntime and extend SparkDataFrameRuntime, passing our Spark job, MyColumnFilterJob, as a type parameter. To use the default implementation, which launches a Spark job according to your default Spark settings, you do not need to add any code to the body of the MyColumnFilterRuntime class.

Prerequisites

You must have defined the output schema.

Procedure

  1. Add the following code:
    class MyColumnFilterRuntime extends SparkDataFrameRuntime[MyColumnFilterJob] {
     
    }

    Finally, we need to implement the Spark job that we referenced in the previous class.

  2. Create a class called MyColumnFilterJob and extend SparkDataFrameJob. Override the transform() method to start writing your Spark code, as follows:
    class MyColumnFilterJob extends SparkDataFrameJob {

      override def transform(parameters: OperatorParameters,
                             dataFrame: DataFrame,
                             sparkUtils: SparkRuntimeUtils,
                             listener: OperatorListener): DataFrame = {
        // your Spark transformation code goes here; for now, return the input unchanged
        dataFrame
      }
    }
    The parameters argument carries the values the user selected at design time; when the operator runs, that information is passed to the runtime for us. The dataFrame parameter is the input data, and the method must return a DataFrame when it completes.
  3. To perform the Column Filter operation, we must get the list of columns the user selected, and then return a data frame that includes only those columns.
    First, we access the user-selected columns by calling parameters.getTabularDatasetSelectedColumns(OperatorConstants.parameterID). OperatorConstants.parameterID is the value referenced in the Constants class; it points to the column selection you set up in the OperatorDialog step. The call returns a tuple whose second element is the collection of selected column names, which is why the code takes ._2. We then map dataFrame.col over those names to turn each one into a Column object and pass the result to select(). Add the following code (a standalone Spark sketch after this procedure shows the same selection pattern outside the operator framework):
    class MyColumnFilterJob extends SparkDataFrameJob {

      override def transform(parameters: OperatorParameters,
                             dataFrame: DataFrame,
                             sparkUtils: SparkRuntimeUtils,
                             listener: OperatorListener): DataFrame = {
        // get the names of the columns the user selected (second element of the returned tuple)
        val columnNamesToKeep = parameters.getTabularDatasetSelectedColumns(OperatorConstants.parameterID)._2
        // turn each name into the corresponding Column object from the input data frame
        val columnsToKeep = columnNamesToKeep.map(dataFrame.col)
        // return a data frame containing only the selected columns
        dataFrame.select(columnsToKeep: _*)
      }
    }
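
If you want to sanity-check this selection pattern outside the operator framework, the same few lines can be run directly against Spark. The following sketch is independent of the Team Studio SDK: it builds a small data frame in a local Spark session (using the Spark 2.x SparkSession API) and hard-codes the column names that getTabularDatasetSelectedColumns would normally supply.

    import org.apache.spark.sql.SparkSession

    object ColumnFilterCheck {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("column-filter-check")
          .getOrCreate()
        import spark.implicits._

        // sample input standing in for the operator's incoming data frame
        val dataFrame = Seq(
          ("alice", 34, "NY"),
          ("bob", 29, "CA")
        ).toDF("name", "age", "state")

        // hard-coded stand-in for the user's column selection
        val columnNamesToKeep = Seq("name", "state")

        // same pattern as the operator: map names to Column objects, then select
        val columnsToKeep = columnNamesToKeep.map(dataFrame.col)
        dataFrame.select(columnsToKeep: _*).show()  // prints only the name and state columns

        spark.stop()
      }
    }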

When Your Operator Runs

When the operator is executed, the Spark job returns a data frame containing only the selected columns. After the job finishes, the SparkDataFrameJob class saves the results as a file using the storage parameters defined in the onPlacement() method; if you did not specify any, the output is stored on HDFS by default. This information is then passed to the Team Studio plugin engine so that your results can be visualized.

For basic Spark transformations using SparkDataFrameGUINode, the result console output is automatically configured to show a preview of the output table.
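
If you later want the job to report progress or to fail fast when a selected column no longer exists in the input, transform() can be extended along the following lines. This is only a sketch of one possible refinement, not part of the minimal operator built above; in particular, it assumes that OperatorListener exposes a notifyMessage method for sending status text back to the UI, so check the listener trait in your version of the SDK before relying on it.

    class MyColumnFilterJob extends SparkDataFrameJob {

      override def transform(parameters: OperatorParameters,
                             dataFrame: DataFrame,
                             sparkUtils: SparkRuntimeUtils,
                             listener: OperatorListener): DataFrame = {
        val columnNamesToKeep =
          parameters.getTabularDatasetSelectedColumns(OperatorConstants.parameterID)._2

        // fail fast if a selected column is missing from the incoming schema
        val missing = columnNamesToKeep.filterNot(name => dataFrame.columns.contains(name))
        require(missing.isEmpty, s"Selected columns not found in input: ${missing.mkString(", ")}")

        // assumption: notifyMessage reports status text back to the Team Studio UI
        listener.notifyMessage(s"Keeping ${columnNamesToKeep.length} of ${dataFrame.columns.length} columns")

        dataFrame.select(columnNamesToKeep.map(dataFrame.col): _*)
      }
    }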