Modeling Guide for SAP Data Hub

Advanced Usage

This section explains how to create operators outside the UI of Vflow.

This section assumes that you are developing your operators on a Unix-like system. To run tests, you also need the pysix_subengine package on your local machine. To download the pysix_subengine package, follow these steps:
  1. Log in to SAP Data Hub System Management as cluster administrator.

  2. Click the File Management button.

  3. Click the Union View button.

  4. Navigate to vflow/subdevkits/ in the file explorer.

  5. Right-click the pysix_subengine folder and select Export File.

If you only want to create Python 2.7 operators, structure your solution (a package that can be imported by System Management) as follows:

  • my_solution/
    • vrep/vflow/
      • subengines/com/sap/python27/operators/
        • myOperator1/
          • operator.py
          • operator.json
        • com/
          • mydomain/
            • myOperator2/
              • operator.py
              • operator.json
    • vsolution.json

The vsolution.json file should look like the one below:

{
    "name": "vsolution_vflow_my_solution",
    "description": "My Solution",
    "license": "my license"
} 
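The layout and the vsolution.json file shown above can also be created with a short script. The following is a minimal sketch and not part of the pysix_subengine package: the function name scaffold_solution and its parameters are hypothetical, and the "vsolution_vflow_" name prefix is an assumption that simply mirrors the sample file above.

```python
import json
import os


def scaffold_solution(base_dir, solution_name, description, license_text):
    """Create <base_dir>/<solution_name>/ with a vsolution.json file and an
    empty Python 2.7 operators directory. Hypothetical helper."""
    solution_dir = os.path.join(base_dir, solution_name)
    operators_dir = os.path.join(
        solution_dir, "vrep", "vflow",
        "subengines", "com", "sap", "python27", "operators")
    if not os.path.isdir(operators_dir):
        os.makedirs(operators_dir)

    manifest = {
        # Assumption: the name follows the pattern of the sample file.
        "name": "vsolution_vflow_" + solution_name,
        "description": description,
        "license": license_text,
    }
    with open(os.path.join(solution_dir, "vsolution.json"), "w") as handle:
        json.dump(manifest, handle, indent=4)
    return operators_dir
```

Calling scaffold_solution(".", "my_solution", "My Solution", "my license") would produce the sample structure without any operator folders; those are added per operator.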

In this section, we always refer to the Python27 subengine, but the explanations apply equally to the Python36 subengine. You just need to replace 27 with 36 in the directory paths shown in this document.

To create an operator, create a folder (or a series of nested folders) inside the <ROOT>/subengines/com/sap/python27/operators/ directory, where <ROOT> is my_solution/vrep/vflow in the sample structure above. For example, to create an operator with the ID com.mydomain.util.appendString, create the folders for the path <ROOT>/subengines/com/sap/python27/operators/com/mydomain/util/appendString and place two files inside the last directory: operator.py and operator.json. You also need to create an empty __init__.py file inside every subfolder from operators onwards. The operator.json file can be generated automatically by running the script gen_operator_jsons.py, which is located inside the pysix_subengine package. The bash script below shows how you can use gen_operator_jsons.py in your solution:

SCRIPT_PATH=<PYSIX_SUBENGINE_PATH>/scripts
SUBENGINE_ROOT=<ROOT>/subengines/com/sap/python27
START_DIR=operators
python $SCRIPT_PATH/gen_operator_jsons.py --subengine-root $SUBENGINE_ROOT --start-dir $START_DIR "$@"

Run the script above after you have implemented the operator.py file, which is described next.
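Before moving on, note that the folder and __init__.py requirements described above are easy to get wrong by hand. The following is a minimal sketch of a helper that turns an operator ID into its folder path and adds the empty __init__.py files. The function name create_operator_folder is hypothetical, and the sketch assumes that the operators folder itself also needs an __init__.py file.

```python
import os


def create_operator_folder(subengine_root, operator_id):
    """Create operators/<operator ID as path>/ under subengine_root and add
    an empty __init__.py to every folder from 'operators' down.
    Hypothetical helper, not part of pysix_subengine."""
    current = os.path.join(subengine_root, "operators")
    folders = [current]
    # Each dot-separated part of the operator ID becomes one nested folder.
    for part in operator_id.split("."):
        current = os.path.join(current, part)
        folders.append(current)

    for folder in folders:
        if not os.path.isdir(folder):
            os.makedirs(folder)
        init_file = os.path.join(folder, "__init__.py")
        if not os.path.exists(init_file):
            open(init_file, "w").close()  # create an empty file
    return current
```

For the ID com.mydomain.util.appendString, the function returns the path ending in operators/com/mydomain/util/appendString, which is where operator.py and operator.json belong.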

Below is an example of the code inside the operator.py file:

from pysix_subengine.base_operator import PortInfo, OperatorInfo
from pysix_subengine.base_operator import BaseOperator

# The operator class name (which inherits BaseOperator) should have the same name as
# the operator's folder, except that its first letter should be uppercase.
class AppendString(BaseOperator):
    def __init__(self, inst_id, op_id):
        super(AppendString, self).__init__(inst_id=inst_id, op_id=op_id)

        # Default configuration fields. They will be shown in the UI.
        self.config.stringToAppend = ""
        self.config.method = "NORMAL"

        # Adds a callback '_data_in' that is called every time the
        # operator receives data in the inport 'inString'.
        self._set_port_callback('inString', self._data_in)
        self.__transform_data = None

    # This method is mandatory.
    # The operator.json will be generated based mostly on the OperatorInfo returned by this method.
    def _get_operator_info(self):
        inports = [PortInfo("inString", required=True, type_="string")]
        outports = [PortInfo("outString", required=True, type_="string")]
        return OperatorInfo("Append String",
                            inports=inports,
                            outports=outports,
                            icon="puzzle-piece",
                            tags={"numpy27": "1.16.1"},
                            dollar_type="http://sap.com/vflow/com.mydomain.appendString.schema.json#")

    def _data_in(self, data):
        self._send_message('outString', self.__transform_data(data + self.config.stringToAppend))

    # Configs set in the UI are already available when this method is called.
    # _init is called before any operator main loop has started.
    def _init(self):
        if self.config.method == "NORMAL":
            self.__transform_data = lambda x: x
        elif self.config.method == "UPPERCASE":
            self.__transform_data = lambda x: x.upper()
        else:
            raise ValueError("Unknown config set in configuration: '%s'." % self.config.method)

    # Called before the operator's main loop execution.
    # Other operators may already have started execution.
    def _prestart(self):
        pass

    # Called when the graph is being terminated.
    def shutdown(self):
        pass

The operator.py script defines both the operator attributes and the operator behavior.

Let us examine the script step-by-step. First look at the class definition:

class AppendString(BaseOperator):

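As you can see, the AppendString class extends the BaseOperator class provided by the pysix_subengine package. The name of the class must match the name of the operator's folder, except that its first letter must be uppercase: the folder appendString requires the class AppendString.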
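The naming rule can be checked with a one-line conversion. This is a small sketch with a hypothetical function name, operator_class_name; it only illustrates the rule stated above and is not how the subengine itself resolves classes.

```python
def operator_class_name(folder_name):
    """Return folder_name with only its first letter uppercased."""
    if not folder_name:
        raise ValueError("folder_name must not be empty")
    return folder_name[0].upper() + folder_name[1:]


print(operator_class_name("appendString"))  # AppendString
```

Note that str.capitalize() is not suitable here, because it also lowercases the remaining characters and would turn appendString into Appendstring.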

Now let's examine all methods involved:

def __init__(self, inst_id, op_id):
    super(AppendString, self).__init__(inst_id=inst_id, op_id=op_id)

    # Default configuration fields. They will be shown in the UI.
    self.config.stringToAppend = ""
    self.config.method = "NORMAL"

    # Adds a callback '_data_in' that is called every time the
    # operator receives data in the inport 'inString'.
    self._set_port_callback('inString', self._data_in)
    self.__transform_data = None

This method is the class constructor. The first thing it does is call super(AppendString, self).__init__(inst_id=inst_id, op_id=op_id).

In the next two lines, two configuration fields called stringToAppend and method are created and assigned the default values "" and "NORMAL", respectively. All configuration fields you create appear in the UI and can be configured by the user. You can always access these values in the script, as we will do soon.

Next, we set a callback called _data_in that will be called every time the operator receives data in the inport inString. We do that by using the method _set_port_callback defined by the BaseOperator class. Finally, the constructor initializes __transform_data to None; the actual function is assigned later, in the _init method.
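To make the callback idea concrete, the sketch below shows one way a port-to-callback registry could work. It is a stand-in written for illustration only and is not the pysix_subengine implementation: the class PortCallbackRegistry and its deliver method are hypothetical, and only the method name _set_port_callback is taken from the example above.

```python
class PortCallbackRegistry(object):
    """Stand-in for illustration only; not the real BaseOperator."""

    def __init__(self):
        self._callbacks = {}

    def _set_port_callback(self, port_name, callback):
        # Remember which function handles data arriving at this inport.
        self._callbacks[port_name] = callback

    def deliver(self, port_name, data):
        # Hypothetical: what a runtime might do when data reaches a port.
        return self._callbacks[port_name](data)


received = []
registry = PortCallbackRegistry()
registry._set_port_callback("inString", received.append)
registry.deliver("inString", "hello")
print(received)  # ['hello']
```

The point of the sketch is that the port name passed to _set_port_callback must match the name of an inport declared for the operator; otherwise no data would ever reach the callback.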

def _get_operator_info(self):
    inports = [PortInfo("inString", required=True, type_="string")]
    outports = [PortInfo("outString", required=True, type_="string")]
    return OperatorInfo("Append String",
                        inports=inports,
                        outports=outports,
                        icon="puzzle-piece",
                        tags={"numpy27": "1.16.1"},
                        dollar_type="http://sap.com/vflow/com.mydomain.appendString.schema.json#")

The _get_operator_info method must be implemented by every operator that is created in this way. It is used to generate the operator.json file automatically, so you specify the operator attributes here by returning an OperatorInfo object. The OperatorInfo class has the following attributes:

class OperatorInfo(object):
    """
    Attributes:
        description (str): Human readable name of the operator.
        icon (str): Icon name in font-awesome.
        iconsrc (str): Path to an icon image. Alternative to using an icon from font-awesome.
        inports (list[PortInfo]): List of input ports.
        outports (list[PortInfo]): List of output ports.
        tags (dict[str,str]): Tags for dependencies. dict[library_name, lib_version].
        component (str): This field will be set automatically.
        config (dict[string,any]): This field will be set automatically.
        dollar_type (str): Url to $type schema.json for this operator.
    """

You can specify all of these attributes in the _get_operator_info method. In the example, we specified an input port called inString that receives a string, an output port called outString that sends a string, the built-in icon puzzle-piece as the operator's icon, and Append String as the operator's description. Note that the inString port name was used in the constructor to attach the callback to this port.

def _init(self):
    if self.config.method == "NORMAL":
        self.__transform_data = lambda x: x
    elif self.config.method == "UPPERCASE":
        self.__transform_data = lambda x: x.upper()
    else:
        raise ValueError("Unknown config set in configuration: '%s'." % self.config.method)

This method overrides the one defined in the BaseOperator superclass. It is called after the user-defined configurations have been set, and it is called for all operators in the graph before any operator has been started. Here, we decide which __transform_data function to use based on the self.config.method parameter set by the user in the UI.
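As the number of supported methods grows, the if/elif chain can be replaced by a lookup table. The sketch below is an alternative formulation of the same selection logic and is not taken from the pysix_subengine package; the names TRANSFORMS and select_transform are hypothetical.

```python
# Maps each supported value of the 'method' configuration field to a function.
TRANSFORMS = {
    "NORMAL": lambda text: text,
    "UPPERCASE": lambda text: text.upper(),
}


def select_transform(method):
    """Return the transform for `method`; raise ValueError for unknown values,
    as the _init method in the example does."""
    try:
        return TRANSFORMS[method]
    except KeyError:
        raise ValueError("Unknown config set in configuration: '%s'." % method)
```

Inside _init, the body would then reduce to a single assignment such as self.__transform_data = select_transform(self.config.method).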

def _data_in(self, data):
    self._send_message('outString', self.__transform_data(data + self.config.stringToAppend))

This function is the callback we registered to handle data arriving at the inString input port. Here, we append our stringToAppend configuration field to the input data, apply the function self.__transform_data to the result, and use the BaseOperator's _send_message method to send it to our output port outString.
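The order of operations matters: the string is appended first and the transform is applied to the combined value. The following standalone sketch mirrors the expression used in _data_in; the function name append_and_transform is hypothetical and exists only to make the expression runnable outside an operator.

```python
def append_and_transform(data, string_to_append, transform):
    """Mirror the expression in _data_in: append first, then transform."""
    return transform(data + string_to_append)


# With the UPPERCASE method, the appended text is uppercased as well.
print(append_and_transform("abc", "def", lambda text: text.upper()))  # ABCDEF
# With the NORMAL method, the combined string is passed through unchanged.
print(append_and_transform("abc", "def", lambda text: text))  # abcdef
```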

def _prestart(self):
    pass

This method overrides the one defined in the BaseOperator superclass. It should contain code to be executed before the operator's main loop is started.

def shutdown(self):
    pass

This method overrides the one defined in the BaseOperator superclass. It should contain code to be executed after the operator's main loop is finished.
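The order in which these hooks run can be summarized with a small simulation. This is a simplified sketch based only on the descriptions above: RecordingOperator and run_graph are hypothetical names, the main loops are omitted, and the real subengine may start operators concurrently rather than one after another.

```python
class RecordingOperator(object):
    """Stand-in operator that records which hooks were called, in order."""

    def __init__(self, name, log):
        self.name = name
        self.log = log

    def _init(self):
        self.log.append((self.name, "_init"))

    def _prestart(self):
        self.log.append((self.name, "_prestart"))

    def shutdown(self):
        self.log.append((self.name, "shutdown"))


def run_graph(operators):
    """Hypothetical driver that calls the hooks in the documented order."""
    for op in operators:
        op._init()       # all operators are initialized before any starts
    for op in operators:
        op._prestart()   # runs just before the operator's main loop (omitted)
    for op in operators:
        op.shutdown()    # runs when the graph is being terminated


log = []
run_graph([RecordingOperator("a", log), RecordingOperator("b", log)])
for entry in log:
    print(entry)
```

The simulation makes one guarantee visible: no operator reaches _prestart until every operator in the graph has completed _init.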

A list of all methods is available at List of BaseOperator Methods.