Defining the Output Schema

To keep the operator's output consistent, we define the output schema for the data frame it produces.

The schema must be set in both the runtime class and the GUI node class, and the two definitions must match. The GUI node needs the output schema to format any output visualization correctly and to tell subsequent operators what this operator's dataset schema looks like.

Prerequisites

You must have built the Operator dialog box.

Procedure

  • Add the following code to the GUI node class:
    override def defineOutputSchemaColumns(inputSchema: TabularSchema,
                                           parameters: OperatorParameters): Seq[ColumnDef] = {
      val columnsToKeep = parameters.getTabularDatasetSelectedColumns(OperatorConstants.parameterID)._2
      inputSchema.getDefinedColumns.filter(colDef => columnsToKeep.contains(colDef.columnName))
    }

    The first line retrieves the names of the columns selected in the TabularDatasetColumnCheckbox parameter defined in the operatorDialog (the second element, ._2, of the returned pair). The second line filters the input schema's columns by those names and returns the matching columns, in their original order, as the output schema.

    Your code should now look like this:

    class MyColumnFilterGUINode extends SparkDataFrameGUINode[MyColumnFilterJob] {
      override def onPlacement(operatorDialog: OperatorDialog,
                               operatorDataSourceManager: OperatorDataSourceManager,
                               operatorSchemaManager: OperatorSchemaManager): Unit = {

        operatorDialog.addTabularDatasetColumnCheckboxes(
          OperatorConstants.parameterID, // the ID of the parameter
          "Columns to keep",             // the label of the parameter (user-visible)
          ColumnFilter.All,              // users can select any of the columns;
                                         // this can be changed to allow only
                                         // numeric or categorical columns
          "main"                         // the selectionGroupId, which is used
                                         // for validating groups of parameters
        )
        super.onPlacement(operatorDialog, operatorDataSourceManager, operatorSchemaManager)
      }

      override def defineOutputSchemaColumns(inputSchema: TabularSchema,
                                             parameters: OperatorParameters): Seq[ColumnDef] = {
        // Names of the columns selected in the checkbox parameter
        val columnsToKeep = parameters.getTabularDatasetSelectedColumns(OperatorConstants.parameterID)._2
        // Keep only the input columns whose names were selected
        inputSchema.getDefinedColumns.filter(colDef => columnsToKeep.contains(colDef.columnName))
      }
    }
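
To see what the filter in defineOutputSchemaColumns does without running the SDK, the following is a minimal standalone sketch. The ColumnDef case class and the SchemaFilterSketch object are hypothetical stand-ins written for this illustration, not the SDK's own classes; only the filter expression itself is taken from the code above. It assumes the selected column names arrive as an array of strings.

```scala
// Hypothetical stand-in for the SDK's ColumnDef; only columnName matters here.
final case class ColumnDef(columnName: String, columnType: String)

object SchemaFilterSketch {
  // Same filter expression as in defineOutputSchemaColumns:
  // keep the input columns whose names appear in columnsToKeep.
  def outputColumns(inputColumns: Seq[ColumnDef],
                    columnsToKeep: Array[String]): Seq[ColumnDef] =
    inputColumns.filter(colDef => columnsToKeep.contains(colDef.columnName))

  def main(args: Array[String]): Unit = {
    val input = Seq(
      ColumnDef("id", "Long"),
      ColumnDef("name", "String"),
      ColumnDef("score", "Double")
    )
    // The selection order deliberately differs from the input order.
    val selected = Array("score", "id")

    val result = outputColumns(input, selected)
    println(result.map(_.columnName).mkString(", ")) // prints "id, score"
  }
}
```

Because the filter runs over the input columns, the output schema follows the input schema's column order rather than the order in which the columns were selected. If the runtime class produces its columns in a different order, the two schemas would no longer match.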