Modeling Guide

Python2Operator

In the Python2Operator, you define a script that can use the convenience functions provided by the api object. For example, to register a callback that is invoked whenever new data arrives on the "datain" port, call api.set_port_callback("datain", callback_name). The remaining sections of this document describe the other conveniences the api object provides.

Configuration Parameters

Parameter   Type     Description
script      string   Mandatory. The inline script to be executed. If the script starts with "file://", then the script file is executed.

Input

None

Output

None

Basic Examples

Count all incoming messages on port named "input" and write the count to the port named "output".

counter = 0

def on_input(data):
    global counter
    counter += 1
    api.send("output", counter)

api.set_port_callback("input", on_input)

api.set_port_callback can also be used to wait for two input ports to have received data before calling the handler:

def on_input(data1, data2):
    api.send("output", data1 + data2)

api.set_port_callback(["input1", "input2"], on_input)

Generators

Generators are functions which are executed before the event processing loop starts. They are executed in the order they are added.

counter = 0

def gen():
    global counter
    for i in xrange(0, 3):
        api.send("output", counter)
        counter += 1

api.add_generator(gen)
api.add_generator(gen)

This example produces values 0,1,2,3,4,5 on the output port "output".

Timers

Timers call the handler at given intervals. Timers are not preemptive. Thus, the given interval provides only the lower bound of the interval at which the timer function is called. All timer handlers are stopped prior to the execution of shutdown handlers. A zero or negative time implies that the callback will be called as fast as possible.

The time is passed as a string. For example, "-1.3h3m2us" means minus (1.3 hours, 3 minutes, and 2 microseconds), which is converted to -(1.3*3600 + 3*60 + 2*1e-6) = -4860.000002 seconds.

Available suffixes are: h, m, s, ms, us, ns. They represent hours, minutes, seconds, milliseconds, microseconds, and nanoseconds, respectively. If the input uses multiple units, they must appear in the order listed above. That is, "2h3s" is allowed, but "3s2h" is not. Only the strings "0", "+0", and "-0" may omit the unit suffix. A sign is optional and is allowed only at the beginning of the string.

counter = 0

def t1():
    global counter
    api.send("output", counter)
    counter += 1

api.add_timer("1s", t1)

This example produces the values 0, 1, 2, ... on port "output" until graph shutdown.
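The time-string rules described above can be illustrated with a small parser. This is only a sketch of the documented format, not the operator's actual implementation; parse_duration, UNITS, NUMBER, and PATTERN are hypothetical names introduced here for illustration.

```python
import re

# Units in the order they must appear, with their length in seconds.
UNITS = [("h", 3600.0), ("m", 60.0), ("s", 1.0),
         ("ms", 1e-3), ("us", 1e-6), ("ns", 1e-9)]

# A non-negative decimal number such as "3", "1.3" or ".5".
NUMBER = r"(\d+(?:\.\d*)?|\.\d+)"

# Optional sign, then each unit at most once, in the order given by UNITS.
PATTERN = re.compile(
    r"([+-]?)" + "".join("(?:%s%s)?" % (NUMBER, unit) for unit, _ in UNITS) + r"\Z"
)


def parse_duration(text):
    """Convert a string such as "-1.3h3m2us" to seconds (hypothetical helper)."""
    # Only a bare zero may omit the unit suffix.
    if text in ("0", "+0", "-0"):
        return 0.0
    match = PATTERN.match(text)
    values = match.groups()[1:] if match else ()
    # Reject strings that do not fit the pattern or contain no unit at all.
    if match is None or all(value is None for value in values):
        raise ValueError("invalid duration: %r" % text)
    sign = -1.0 if match.group(1) == "-" else 1.0
    total = sum(float(value) * factor
                for value, (_, factor) in zip(values, UNITS)
                if value is not None)
    return sign * total
```

With this sketch, parse_duration("2h3s") returns 7203.0, while parse_duration("3s2h") raises ValueError because the units are out of order.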

Shutdown Handlers

Shutdown handlers are functions which are executed after the event processing loop has finished, in the order they are added.

counter = 0

def on_input(data):
    global counter
    counter += 1

api.set_port_callback("input", on_input)

def shutdown1():
    print "shutdown1: %d" % counter

def shutdown2():
    print "shutdown2: %d" % counter

api.add_shutdown_handler(shutdown1)
api.add_shutdown_handler(shutdown2)

This example prints "shutdown1: 7" and "shutdown2: 7" if 7 values have been provided on the "input" port.

Configuration API

The api object lets you read configurations defined in the operator's editor view or added in the configurations panel in the graph's view. If you defined a config named "foo", you can access it by calling api.config.foo. Configuration field names cannot contain spaces. The "script" and "codelanguage" config fields won't appear in api.config as they are mandatory configuration parameters and should not be used by the end user.

Logging

To log messages use api.logger.info("some text"), api.logger.debug(""), api.logger.warn(""), or api.logger.error("").

Error Handling

You can raise any exception inside the script; the exception message is logged and the operator is terminated. The exception to this rule is an exception raised in a thread other than the script's main thread. In that case, you need to call api.propagate_exception(e), where "e" is the exception object, so that it is handled correctly.

Do not write to stderr because that could have unintended consequences. Also, do not use api.logger.fatal or api.logger.critical, otherwise the graph will be stopped. Raise an exception or use api.propagate_exception if you want to stop the graph and log the error.
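The threading case described above can be sketched as follows. The _StubApi class is only a stand-in so the snippet runs outside the operator; inside a real script the runtime provides api, and the stub (an assumption made for illustration) should be dropped. The worker function and its failure are hypothetical.

```python
import threading


class _StubApi(object):
    """Stand-in for the runtime-provided api object (illustration only)."""

    def __init__(self):
        self.propagated = []

    def propagate_exception(self, exc):
        self.propagated.append(exc)


api = _StubApi()  # inside the operator, `api` already exists; remove the stub


def worker():
    try:
        raise RuntimeError("background task failed")  # hypothetical failure
    except Exception as e:
        # Exceptions raised outside the main thread are not handled
        # automatically, so hand them to the operator explicitly.
        api.propagate_exception(e)


thread = threading.Thread(target=worker)
thread.start()
thread.join()
```

Catching the exception at the top of the thread's target function keeps the handling in one place, regardless of where in the background work the failure occurs.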

repo_root and subengine_root

You can access the repo_root and subengine_root paths using api.repo_root and api.subengine_root, respectively.

Port Names

You can get the name of the input and output ports for the current operator instance by calling the methods api.get_inport_names() and api.get_outport_names(), respectively.
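One possible use of these methods is to register a handler for every input port without hard-coding the port names. The sketch below assumes that both methods return a list of strings, which the text above does not state explicitly. The _StubApi class, the port names, and make_handler are hypothetical and exist only so the snippet runs outside the operator.

```python
class _StubApi(object):
    """Stand-in for the runtime-provided api object (illustration only)."""

    def __init__(self):
        self.callbacks = {}
        self.sent = []

    def get_inport_names(self):
        return ["input1", "input2"]  # hypothetical port names

    def get_outport_names(self):
        return ["output"]  # hypothetical port name

    def set_port_callback(self, port, func):
        self.callbacks[port] = func

    def send(self, port, data):
        self.sent.append((port, data))


api = _StubApi()  # inside the operator, `api` already exists; remove the stub


def make_handler(port_name):
    # Bind port_name per handler so each callback knows which port fired.
    def handler(data):
        for out in api.get_outport_names():
            api.send(out, "%s: %s" % (port_name, data))
    return handler


for name in api.get_inport_names():
    api.set_port_callback(name, make_handler(name))

# Simulate the runtime delivering one value on each input port.
api.callbacks["input1"]("a")
api.callbacks["input2"]("b")
```

Creating the handler through make_handler, rather than defining it directly in the loop, avoids every callback seeing only the last port name.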

Message Type

You can access the Message type in the api object as api.Message. The message type can be constructed like so: api.Message(body, attributes), where body can be any object, and attributes should be a dictionary of str to object, or None. The body argument is mandatory while attributes is optional and defaults to None. The body and attributes of a message object msg can be accessed as msg.body and msg.attributes, respectively.
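As an illustration of the constructor and accessors described above, the following sketch receives a message, adds an attribute, and forwards it. It assumes ports of type message. The stub classes only mimic the documented api.Message(body, attributes) shape so the snippet runs outside the operator, and the attribute name "processed" is hypothetical.

```python
class _StubMessage(object):
    """Stand-in mirroring api.Message(body, attributes) as described above."""

    def __init__(self, body, attributes=None):
        self.body = body
        self.attributes = attributes


class _StubApi(object):
    """Stand-in for the runtime-provided api object (illustration only)."""

    Message = _StubMessage

    def __init__(self):
        self.sent = []

    def send(self, port, data):
        self.sent.append((port, data))


api = _StubApi()  # inside the operator, `api` already exists; remove the stubs


def on_input(msg):
    # attributes may be None, so fall back to an empty dictionary,
    # and copy it so the incoming message is left untouched.
    attributes = dict(msg.attributes or {})
    attributes["processed"] = True  # hypothetical attribute name
    api.send("output", api.Message(msg.body, attributes))


# In the operator you would register: api.set_port_callback("input", on_input)
on_input(api.Message("hello", {"source": "demo"}))
```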

Correspondence between Data Pipeline Types and Python Types

The Data Pipeline types are the types one will specify in the port type fields in the operator.json file.

Data Pipeline   Python2
string          unicode
blob            str
int64           int, long
uint64          int, long
float64         float
byte            int
message         api.Message

FAQ

Where should I place initialization code for my operator?

The script is executed just once. The callback functions defined in the script can be executed multiple times, but the statements in the script's outermost scope are executed only once. This means you can simply place initialization code in the body of the script, as in the following example:

# Hypothetical script for Python2Operator

# In the operator's initialization we might want to set up a connection
# with a database, for example (`setup_connection` is a hypothetical function):
db = setup_connection(api.config.host, api.config.port)

def my_callback_func(data):
    global db
    db.write(data)  # hypothetical call

api.set_port_callback("input", my_callback_func)

Alternatively, one can also place initialization code in a generator callback that is registered with api.add_generator(func).
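The generator-based alternative might look like the sketch below. The _StubApi class and the final lines that simulate the runtime are assumptions added so the snippet runs outside the operator; the list stored in state stands in for a real resource such as a database connection.

```python
class _StubApi(object):
    """Stand-in for the runtime-provided api object (illustration only)."""

    def __init__(self):
        self.generators = []
        self.callbacks = {}

    def add_generator(self, func):
        self.generators.append(func)

    def set_port_callback(self, port, func):
        self.callbacks[port] = func


api = _StubApi()  # inside the operator, `api` already exists; remove the stub

state = {"db": None}


def init():
    # Runs once, before the event processing loop starts.
    state["db"] = []  # placeholder for a real resource, e.g. a connection


def on_input(data):
    state["db"].append(data)  # hypothetical write


api.add_generator(init)
api.set_port_callback("input", on_input)

# Simulate the runtime: run the generators first, then deliver one value.
for generator in api.generators:
    generator()
api.callbacks["input"]("row-1")
```

Keeping the shared resource in a dictionary avoids the global statement that would otherwise be needed to rebind a module-level name inside init.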