Modeling Guide for SAP Data Hub

Python3 Operator

With the Python3Operator, you can define a script that uses convenience functions provided by the API object. For example, you can set a callback that is called when new data arrives on the "datain" port by writing api.set_port_callback("datain", callback_name). The other conveniences provided by the API are described in the remaining sections of this document.

You can create an operator that extends the Python3Operator directly in the UI, in the repository tab.

Configuration Parameters

Parameter    Type      Description
script       string    Mandatory. The inline script to be executed. If the script starts with "file://", the referenced script file is executed instead.

Input

None

Output

None

Basic Examples

Count all incoming messages on the port named "input" and write the count to the port named "output".

counter = 0

def on_input(data):
    global counter
    counter += 1
    api.send("output", counter)

api.set_port_callback("input", on_input)

Notice that the port "output" of an operator containing the above script should be of type int64, since the variable counter is of Python type int. See the section Correspondence between Data Pipeline Types and Python Types for more information on how to choose your port types.

api.set_port_callback can also be used to wait until two input ports have both received data before calling the handler:

def on_input(data1, data2):
    api.send("output", data1 + data2)

api.set_port_callback(["input1", "input2"], on_input)
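The waiting behaviour can be pictured with a small simulation. FakeApi and its deliver method are hypothetical, not part of the Data Hub API, and the sketch assumes that each input port queues messages in arrival order, so that the n-th message on "input1" is paired with the n-th message on "input2".

```python
from collections import deque


class FakeApi:
    """Hypothetical stand-in for the runtime's api object (not the real implementation)."""

    def __init__(self):
        self.groups = []   # list of (ports tuple, callback)
        self.queues = {}   # port name -> deque of pending messages
        self.sent = []     # (port, data) pairs passed to send()

    def send(self, port, data):
        self.sent.append((port, data))

    def set_port_callback(self, ports, callback):
        ports = (ports,) if isinstance(ports, str) else tuple(ports)
        for p in ports:
            self.queues.setdefault(p, deque())
        self.groups.append((ports, callback))

    def deliver(self, port, data):
        """Simulate a message arriving on an input port (assumed FIFO per port)."""
        self.queues[port].append(data)
        for ports, callback in self.groups:
            # Fire only while every port of the group has a pending message.
            while all(self.queues[p] for p in ports):
                callback(*(self.queues[p].popleft() for p in ports))


api = FakeApi()

def on_input(data1, data2):
    api.send("output", data1 + data2)

api.set_port_callback(["input1", "input2"], on_input)

api.deliver("input1", 1)      # nothing yet: input2 is still empty
api.deliver("input1", 10)
api.deliver("input2", 100)    # pairs 1 with 100
api.deliver("input2", 1000)   # pairs 10 with 1000
print(api.sent)               # [('output', 101), ('output', 1010)]
```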

api.send(port, data)

Send `data` to the output port named `port`.

Args:
    port (str): Name of the output port.
    data (...): Object to be sent.

Port Callbacks

api.set_port_callback(ports, callback)

This method associates the input `ports` with the `callback`. The `callback` is called only when messages
are available on all ports in `ports`. If this method is called multiple times for the same group of ports,
the old `callback` is replaced by the new one. Different port groups cannot overlap, and the same callback
function cannot be reused in different port groups.

Args:
    ports (str|list[str]): input ports to be associated with the callback. `ports` can be a list of strings
                           with the name of each port to be associated or a string if you want to associate the
                           callback just to a single port.
    callback (func[...]): a callback function with the same number of arguments as elements in `ports`.
                          The arguments are passed to `callback` in the same order as their corresponding
                          ports in the `ports` list.

TIP: If you want to reuse the same callback function in multiple port groups, you can use a function that returns your function of interest:
def get_my_callback():
    def my_callback(data):
        # some code here
        pass
    return my_callback

api.set_port_callback("inport1", get_my_callback())
api.set_port_callback("inport2", get_my_callback())

In this way, a new function object (with a different id) is created on each call, but the callback behaviour stays the same.
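Comparing the returned objects directly shows the difference. This plain-Python sketch needs no api object; the doubling body of my_callback is only a placeholder.

```python
def get_my_callback():
    # Each call defines and returns a brand-new inner function object.
    def my_callback(data):
        return data * 2  # placeholder body, identical for every copy
    return my_callback


cb1 = get_my_callback()
cb2 = get_my_callback()

print(cb1 is cb2)          # False: two distinct function objects
print(id(cb1) != id(cb2))  # True while both objects are alive
print(cb1(21), cb2(21))    # 42 42: identical behaviour
```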

api.remove_port_callback(callback)

Deregister the `callback` function. If it is not registered, the method returns quietly.

Args:
    callback: Callback function to be removed.
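The registration rules above can be summarised in a small model. CallbackRegistry is hypothetical and only does bookkeeping; it assumes that a port group is identified by its exact port list, and its error messages are invented for illustration.

```python
class CallbackRegistry:
    """Hypothetical model of the documented registration rules (not the runtime's code)."""

    def __init__(self):
        self.groups = {}  # tuple of port names -> callback

    def set_port_callback(self, ports, callback):
        group = (ports,) if isinstance(ports, str) else tuple(ports)
        for other, cb in self.groups.items():
            if other == group:
                continue  # same group: its callback is replaced below
            if set(other) & set(group):
                raise ValueError("port groups cannot overlap")
            if cb is callback:
                raise ValueError("a callback cannot be reused in another port group")
        self.groups[group] = callback

    def remove_port_callback(self, callback):
        for group, cb in list(self.groups.items()):
            if cb is callback:
                del self.groups[group]
        # Unknown callbacks are ignored quietly, as documented.


def on_a(data):
    pass

def on_b(data):
    pass

reg = CallbackRegistry()
reg.set_port_callback("in1", on_a)
reg.set_port_callback("in1", on_b)               # same group: on_b replaces on_a
try:
    reg.set_port_callback(["in1", "in2"], on_a)  # overlaps the ("in1",) group
except ValueError as err:
    overlap_error = str(err)
print(overlap_error)                             # port groups cannot overlap
reg.remove_port_callback(on_a)                   # not registered: ignored
reg.remove_port_callback(on_b)
print(reg.groups)                                # {}
```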

Generators

Generators are functions that are executed before the event processing loop starts, in the order in which they were added.

counter = 0

def gen():
    global counter
    for _ in range(3):
        api.send("output", counter)
        counter += 1

api.add_generator(gen)
api.add_generator(gen)

This example produces the values 0, 1, 2, 3, 4, 5 on the output port "output": gen is registered twice, and both runs increment the same global counter.
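The ordering can be reproduced outside the runtime with a stand-in. FakeApi and its run_generators method are hypothetical: they only record sent values and call the registered generators in order, standing in for the phase before the event processing loop.

```python
class FakeApi:
    """Hypothetical stand-in: records generators and runs them in registration order."""

    def __init__(self):
        self.generators = []
        self.sent = []

    def add_generator(self, func):
        self.generators.append(func)

    def send(self, port, data):
        self.sent.append((port, data))

    def run_generators(self):
        # Simulates the phase before the event processing loop starts.
        for func in self.generators:
            func()


api = FakeApi()
counter = 0

def gen():
    global counter
    for _ in range(3):
        api.send("output", counter)
        counter += 1

api.add_generator(gen)
api.add_generator(gen)
api.run_generators()
print([data for _, data in api.sent])  # [0, 1, 2, 3, 4, 5]
```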

Timers

api.add_timer(period, callback)

Multiple distinct periodic callbacks can be added. If a callback that was already added is added again, its old period
is replaced by the new `period`. Timers are not preemptive, so `period` is only a lower bound on the interval
between two calls of the timer function.
A zero `period` means that the callback is called as fast as possible.
If you want two callbacks with identical behaviour to run simultaneously, you need to either create two
different functions with identical bodies or create a factory function that defines an inner function and
returns it. Each call of the factory function returns a new function (with a different id) with identical
behaviour. See the TIP in the set_port_callback section for an example of a factory function.

Args:
    period (str): Period between calls of `callback`, passed as a string.
                  For example, "-1.3h3m2us" means minus (1.3 hours, 3 minutes and 2 microseconds),
                  which is converted to -(1.3*3600 + 3*60 + 2*1e-6) = -4860.000002 seconds.
                  Available suffixes are h, m, s, ms, us, and ns.
                  They represent hours, minutes, seconds, milliseconds, microseconds, and nanoseconds, respectively.
                  If the input uses multiple units, they must appear in the order listed above.
                  That is, "2h3s" is allowed, but "3s2h" is not.
                  Only the strings "0", "+0", and "-0" may omit the unit suffix
                  (it is optional in these cases). A sign is optional and is allowed only at the
                  beginning of the string.
    callback (func): Callback function to be called every `period`.

Example:
counter = 0

def t1():
    global counter
    api.send("output", counter)
    counter += 1

api.add_timer("1s", t1)

This example produces the values 0, 1, 2, ... on the port "output", with at least one second between consecutive values, until the graph shuts down.
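The period string format described for api.add_timer can be checked with a small parser. The sketch below is not the runtime's code: parse_period and its regular expression are hypothetical and simply encode the documented rules (an optional leading sign, decimal numbers, the suffixes h, m, s, ms, us, ns in that order, and a unit-free form only for "0", "+0", and "-0"). It additionally assumes that each unit appears at most once.

```python
import re

# Hypothetical parser for the documented period strings; not the runtime's implementation.
_NUM = r"(\d+(?:\.\d*)?|\.\d+)"
# Units in their required order; "m(?!s)" keeps minutes from consuming "ms".
_SUFFIXES = ["h", "m(?!s)", "s", "ms", "us", "ns"]
_FACTORS = [3600.0, 60.0, 1.0, 1e-3, 1e-6, 1e-9]  # seconds per unit
_PATTERN = re.compile(
    r"([+-]?)" + "".join("(?:" + _NUM + sfx + ")?" for sfx in _SUFFIXES)
)


def parse_period(text):
    """Convert a period string such as "-1.3h3m2us" to seconds."""
    if text in ("0", "+0", "-0"):
        return 0.0  # the only inputs allowed without a unit suffix
    match = _PATTERN.fullmatch(text)
    numbers = match.groups()[1:] if match else ()
    if not any(numbers):
        raise ValueError("invalid period: %r" % text)
    seconds = sum(float(num) * factor
                  for num, factor in zip(numbers, _FACTORS)
                  if num is not None)
    return -seconds if match.group(1) == "-" else seconds


print(round(parse_period("-1.3h3m2us"), 6))  # -4860.000002
print(parse_period("2h3s"))                  # 7203.0
for bad in ("3s2h", "1.5"):
    try:
        parse_period(bad)
    except ValueError as err:
        print(err)  # invalid period: '3s2h' (and likewise for '1.5')
```

Rejecting "3s2h" falls out of the regular expression itself: the optional groups can only match in the listed order, so anything out of order leaves unconsumed text and fullmatch fails.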

api.remove_timer(callback)

Deregister a timer callback.

Args:
    callback (func): Callback function to be removed.

api.update_timer(period, callback)

Update the period of an existing `callback`.
No error is raised if the callback does not exist.

Args:
    callback (func): Callback function whose period will be updated. If `callback` is not registered,
                     nothing is changed.
    period (str): New period, in the same string format as for api.add_timer, that overwrites the old value.
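The bookkeeping rules of api.add_timer, api.update_timer, and api.remove_timer can be modelled with a dictionary keyed by function object. TimerRegistry below is a hypothetical sketch that only stores period strings and schedules nothing; the quiet behaviour of remove_timer for unknown callbacks is an assumption, since this document does not specify it. The usage also shows why a factory function is needed for two timers with identical behaviour.

```python
class TimerRegistry:
    """Hypothetical model of timer bookkeeping; periods are kept as raw strings."""

    def __init__(self):
        self.timers = {}  # callback -> period string

    def add_timer(self, period, callback):
        # Re-adding a known callback just replaces its period.
        self.timers[callback] = period

    def update_timer(self, period, callback):
        # Callbacks that were never registered are silently ignored.
        if callback in self.timers:
            self.timers[callback] = period

    def remove_timer(self, callback):
        # Assumed to be quiet for unknown callbacks (not specified in the docs).
        self.timers.pop(callback, None)


def make_tick():
    def tick():
        pass
    return tick

t1, t2 = make_tick(), make_tick()      # distinct functions, identical behaviour
reg = TimerRegistry()
reg.add_timer("1s", t1)
reg.add_timer("1s", t2)                # a second, independent timer
reg.add_timer("500ms", t1)             # replaces t1's period instead of adding a timer
reg.update_timer("2s", lambda: None)   # unknown callback: nothing changes
print(len(reg.timers), reg.timers[t1]) # 2 500ms
reg.remove_timer(t2)
print(list(reg.timers.values()))       # ['500ms']
```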

Shutdown Handlers

Shutdown handlers are functions that are executed after the event processing loop finishes, in the order in which they were added.

counter = 0

def on_input(data):
    global counter
    counter += 1

api.set_port_callback("input", on_input)

def shutdown1():
    print ("shutdown1: %d" % counter)

def shutdown2():
    print ("shutdown2: %d" % counter)

api.add_shutdown_handler(shutdown1)
api.add_shutdown_handler(shutdown2)

This example prints "shutdown1: 7" and "shutdown2: 7" if 7 values have been received on the "input" port.
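To see the ordering without a running graph, the example can be replayed against a stand-in. FakeApi and its simulate method are hypothetical; they assume that all port callbacks finish before the shutdown handlers run in registration order. The handlers append to a list instead of printing, so the result is easy to check.

```python
class FakeApi:
    """Hypothetical stand-in that runs shutdown handlers in registration order."""

    def __init__(self):
        self.callbacks = {}
        self.shutdown_handlers = []

    def set_port_callback(self, port, callback):
        self.callbacks[port] = callback

    def add_shutdown_handler(self, func):
        self.shutdown_handlers.append(func)

    def simulate(self, port, values):
        # Feed messages, then emulate the end of the event processing loop.
        for value in values:
            self.callbacks[port](value)
        for handler in self.shutdown_handlers:
            handler()


api = FakeApi()
counter = 0
log = []

def on_input(data):
    global counter
    counter += 1

api.set_port_callback("input", on_input)
api.add_shutdown_handler(lambda: log.append("shutdown1: %d" % counter))
api.add_shutdown_handler(lambda: log.append("shutdown2: %d" % counter))
api.simulate("input", range(7))
print(log)  # ['shutdown1: 7', 'shutdown2: 7']
```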

Configuration API

The API object lets you read configuration values defined in the operator's editor view or added in the configurations panel of the graph's view. If you defined a configuration named "foo", you can access it as api.config.foo. Configuration field names cannot contain spaces. The "script" and "codelanguage" fields do not appear in api.config, because they are mandatory configuration parameters that the end user should not use.
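Outside the runtime, api.config can be imitated with types.SimpleNamespace to try out script logic. In the sketch below the api object and the "foo" and "timeout" fields are made up for illustration; using getattr with a default for an optional field assumes that api.config behaves like an ordinary attribute container.

```python
from types import SimpleNamespace

# Hypothetical stand-in: the real api.config is filled in by the runtime.
api = SimpleNamespace(config=SimpleNamespace(foo="bar"))

print(api.config.foo)                    # bar
# Fall back to a default for a field that may not be configured
# (assumes api.config behaves like a plain attribute container).
timeout = getattr(api.config, "timeout", 30)
print(timeout)                           # 30
```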

Logging

To log messages, use api.logger.info("some text"), api.logger.debug(""), api.logger.warn(""), or api.logger.error("").

Error Handling

You can raise any exception inside the script: the exception message is logged and the operator is terminated. This does not apply to exceptions raised in a thread other than the script's main thread. In that case, you need to call api.propagate_exception(e), where "e" is the exception object, to handle it correctly.
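The sketch below shows the worker-thread pattern. Only the call api.propagate_exception(e) comes from this document; FakeApi is a hypothetical stand-in that records the exception instead of stopping a graph, and the RuntimeError is a simulated failure.

```python
import threading


class FakeApi:
    """Hypothetical stand-in: records exceptions handed to propagate_exception."""

    def __init__(self):
        self.propagated = []

    def propagate_exception(self, e):
        self.propagated.append(e)


api = FakeApi()

def background_work():
    try:
        raise RuntimeError("lost connection")  # simulated failure
    except Exception as e:
        # An exception raised here would not reach the script's main thread,
        # so hand it to the runtime explicitly.
        api.propagate_exception(e)

worker = threading.Thread(target=background_work)
worker.start()
worker.join()
print(repr(api.propagated[0]))  # RuntimeError('lost connection')
```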

Do not write to stderr because that could have unintended consequences. Also, do not use api.logger.fatal or api.logger.critical, otherwise the graph will be stopped. Raise an exception or use api.propagate_exception if you want to stop the graph and log the error.

repo_root and subengine_root

You can access the repo_root and subengine_root paths using api.repo_root and api.subengine_root, respectively.

Port Names

You can get the name of the input and output ports for the current operator instance by calling the methods api.get_inport_names() and api.get_outport_names(), respectively.
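One use of api.get_outport_names() is an operator that forwards each message to every output port without hard-coding their names. FakeApi is a hypothetical stand-in with fixed port names, and the sketch assumes the method returns an iterable of port-name strings.

```python
class FakeApi:
    """Hypothetical stand-in with fixed port names."""

    def __init__(self, inports, outports):
        self._inports, self._outports = list(inports), list(outports)
        self.sent = []

    def get_inport_names(self):
        return list(self._inports)

    def get_outport_names(self):
        return list(self._outports)

    def send(self, port, data):
        self.sent.append((port, data))


api = FakeApi(["input"], ["out1", "out2"])

def on_input(data):
    # Fan the message out to every output port this instance has.
    for port in api.get_outport_names():
        api.send(port, data)

on_input("hello")  # the runtime would call this via api.set_port_callback
print(api.sent)    # [('out1', 'hello'), ('out2', 'hello')]
```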

Message Type

You can access the Message type in the api object as api.Message. The message type can be constructed like so: api.Message(body, attributes), where body can be any object, and attributes should be a dictionary of str to object, or None. The body argument is mandatory while attributes is optional and defaults to None. The body and attributes of a message object msg can be accessed as msg.body and msg.attributes, respectively.
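The documented interface of api.Message can be mirrored by a tiny class for local experiments. This Message class is a hypothetical stand-in, not the runtime's type, and the copy-before-modify handling of attributes is one possible pattern rather than a requirement.

```python
class Message:
    """Hypothetical stand-in mirroring the documented api.Message interface."""

    def __init__(self, body, attributes=None):
        self.body = body
        self.attributes = attributes


msg = Message("payload", {"source": "sensor-1"})
print(msg.body, msg.attributes["source"])    # payload sensor-1

# Build a new attributes dict so the incoming message stays untouched
# (assumes attributes is a dict or None).
reply = Message(msg.body.upper(), dict(msg.attributes or {}, processed=True))
print(reply.attributes)                      # {'source': 'sensor-1', 'processed': True}
print(Message(42).attributes)                # None
```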

Correspondence between Data Pipeline Types and Python Types

The Data Pipeline types are the types allowed for operator ports. For example, if an input port has type blob, the Python object that your port callback receives as an argument is of type str. Conversely, if an output port of your operator has type string, you should send a Python object of type unicode to it.

Data Pipeline    Python2
string           unicode
blob             str
int64            int, long
uint64           int, long
float64          float
byte             int
message          api.Message

FAQ

Where should I place initialization code for my operator?

The script is executed only once. The callback functions defined in the script can run many times, but the statements at the script's outermost scope run only once. This means you can simply place initialization code in the body of the script, as in the following example:
# Hypothetical script for a Python3Operator

# During the operator's initialization we might want to set up a connection
# to a database, for example (`setup_connection` is a hypothetical function):
db = setup_connection(api.config.host, api.config.port)

def my_callback_func(data):
    global db
    db.write(data)  # hypothetical call

api.set_port_callback("input", my_callback_func)

Alternatively, one can also place initialization code in a generator callback that is registered with api.add_generator(func).
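The generator-based alternative can be sketched with a stand-in. FakeApi, its run method, and the init function are hypothetical; the sketch assumes, as described in the Generators section, that generators run once before the event processing loop delivers any messages.

```python
class FakeApi:
    """Hypothetical stand-in: runs generators once, then feeds port messages."""

    def __init__(self):
        self.generators, self.callbacks = [], {}

    def add_generator(self, func):
        self.generators.append(func)

    def set_port_callback(self, port, callback):
        self.callbacks[port] = callback

    def run(self, port, values):
        for gen in self.generators:  # before the event processing loop
            gen()
        for value in values:         # the event processing loop
            self.callbacks[port](value)


api = FakeApi()
state = {}

def init():
    # Hypothetical initialization; a real script might open a connection here.
    state["buffer"] = []

def on_input(data):
    state["buffer"].append(data)

api.add_generator(init)
api.set_port_callback("input", on_input)
api.run("input", ["a", "b"])
print(state["buffer"])  # ['a', 'b']
```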