Python3 Operator
With the Python3Operator, the user defines a script that can use the convenience functions provided by the api object. For example, you can set a callback that is called whenever new data arrives on the "datain" port by writing api.set_port_callback("datain", callback_name). Other conveniences provided by the api object are described in the remaining sections of this document.
An operator extending the Python3Operator can be created directly in the UI, in the repository tab.
Configuration Parameters
Parameter | Type | Description
---|---|---
script | string | Mandatory. The inline script to be executed. If the script starts with "file://", then the script file is executed.
Input
None
Output
None
Basic Examples
Count all incoming messages on port named "input" and write the count to the port named "output".

    counter = 0

    def on_input(data):
        global counter
        counter += 1
        api.send("output", counter)

    api.set_port_callback("input", on_input)

Notice that the port "output" of an operator containing the above script should be of type int64, since the variable counter is of Python type int. See the section "Correspondence between Data Pipeline Types and Python Types" for more information on how to choose your port types.
api.set_port_callback can also be used to wait for two input ports to have received data before calling the handler:

    def on_input(data1, data2):
        api.send("output", data1 + data2)

    api.set_port_callback(["input1", "input2"], on_input)

api.send(port, data)
Send `data` to the output port named `port`.
Args:
- `port` (str): Name of the output port.
- `data` (...): Object to be sent.
Port Callbacks
api.set_port_callback(ports, callback)
This method associates the input `ports` with the `callback`. The `callback` is called only when messages are available on all ports in `ports`. If this method is called multiple times for the same group of ports, the old `callback` is replaced by the new one. Different port groups cannot overlap, and the same callback function cannot be reused in different port groups.
Args:
- `ports` (str|list[str]): Input ports to be associated with the callback. Either a list of port names, or a single string to associate the callback with just one port.
- `callback` (func[...]): A callback function with the same number of arguments as there are elements in `ports`. The arguments are passed to `callback` in the same order as their corresponding ports in the `ports` list.
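Tip: to attach callbacks with identical behaviour to different port groups, use a factory function that returns a new inner function on each call:

    def get_my_callback():
        def my_callback(data):
            # some code here
            pass
        return my_callback

    api.set_port_callback("inport1", get_my_callback())
    api.set_port_callback("inport2", get_my_callback())
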
In this way, a new function (with different id) will be generated each time, but the callback behaviour will be the same.
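The rule that a callback fires only once every port in its group has a message can be illustrated with a small simulation. This is a minimal sketch, not the operator's implementation: `FakeApi` and its `deliver` method are hypothetical stand-ins so the snippet runs outside a graph, and pairing messages in first-in, first-out order is an assumption.

```python
from collections import deque


class FakeApi:
    """Hypothetical stand-in for the operator's `api` object (illustration only)."""

    def __init__(self):
        self._groups = {}   # tuple of port names -> callback
        self._queues = {}   # port name -> pending messages
        self.sent = []      # (port, data) pairs passed to send()

    def set_port_callback(self, ports, callback):
        group = (ports,) if isinstance(ports, str) else tuple(ports)
        self._groups[group] = callback   # same group again: callback is replaced
        for port in group:
            self._queues.setdefault(port, deque())

    def send(self, port, data):
        self.sent.append((port, data))

    def deliver(self, port, data):
        """Simulate a message arriving on an input port (hypothetical helper)."""
        self._queues[port].append(data)
        for group, callback in self._groups.items():
            # Call the handler only when every port in the group has data.
            if port in group and all(self._queues[p] for p in group):
                callback(*(self._queues[p].popleft() for p in group))


api = FakeApi()


def on_input(data1, data2):
    api.send("output", data1 + data2)


api.set_port_callback(["input1", "input2"], on_input)

api.deliver("input1", 1)    # no call yet: "input2" is still empty
api.deliver("input1", 10)
api.deliver("input2", 2)    # on_input(1, 2) is called
api.deliver("input2", 20)   # on_input(10, 20) is called
print(api.sent)             # [('output', 3), ('output', 30)]
```

Inside a real graph the runtime delivers the messages, so only the `on_input` definition and the `api.set_port_callback` call would appear in the operator script.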
api.remove_port_callback(callback)
Deregister the `callback` function. If it doesn't exist, the method exits quietly.
Args:
- `callback`: Callback function to be removed.
Generators

    counter = 0

    def gen():
        global counter
        for i in range(0, 3):
            api.send("output", counter)
            counter += 1

    api.add_generator(gen)
    api.add_generator(gen)

This example produces values 0,1,2,3,4,5 on the output port "output".
Timers
api.add_timer(period, callback)
Multiple distinct periodic callbacks can be added. If an already added callback is added again, the old period is replaced by the new `period`. Timers are not preemptive, so the given period is only a lower bound on the interval at which the timer function is called. A zero `period` means that the callback is called as fast as possible.
If you want two callbacks with identical behaviour to run simultaneously, you need to either create two different functions with identical bodies or create a factory function that defines an inner function and returns it. Each time the factory function is called, a new function (with a different id) is returned, but with identical behaviour. See the TIP in the set_port_callback section for an example of a factory function.
Args:
- `period` (str): Period between calls of `callback`, passed as a string. For example, "-1.3h3m2us" means minus (1.3 hours, 3 minutes and 2 microseconds), which is converted to -(1.3*3600 + 3*60 + 2*1e-6) = -4860.000002 seconds. Available suffixes are h, m, s, ms, us, ns. They represent hours, minutes, seconds, milliseconds, microseconds, and nanoseconds, respectively. If the input uses multiple units, they must appear in that order: "2h3s" is allowed, but "3s2h" is not. Only the strings "0", "+0", and "-0" may omit the unit suffix. A sign is optional and is allowed only at the beginning of the string.
- `callback` (func): Callback function to be called every `period`.

    counter = 0

    def t1():
        global counter
        api.send("output", counter)
        counter += 1

    api.add_timer("1s", t1)

This example produces the values 0, 1, 2, ... on port "output" until graph shutdown.
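To make the period format concrete, here is a minimal sketch of a parser that follows the rules described for `period`. It is an independent reading of those rules, not the operator's own parser; `parse_period`, `UNITS`, `NUMBER`, and `PERIOD_RE` are hypothetical names introduced for illustration.

```python
import re

# Seconds per unit, in the order the units must appear in the string.
UNITS = [("h", 3600.0), ("m", 60.0), ("s", 1.0),
         ("ms", 1e-3), ("us", 1e-6), ("ns", 1e-9)]

NUMBER = r"(\d+(?:\.\d*)?|\.\d+)"
# Optional sign, then each "<number><unit>" part at most once, in order.
PERIOD_RE = re.compile(
    r"([+-])?" + "".join("(?:" + NUMBER + unit + ")?" for unit, _ in UNITS)
)


def parse_period(text):
    """Convert a period string such as "1s" or "2h3s" to seconds."""
    if text in ("0", "+0", "-0"):      # the only forms allowed without a unit
        return 0.0
    match = PERIOD_RE.fullmatch(text)
    values = match.groups()[1:] if match else ()
    if not any(values):                # no match, or no "<number><unit>" part
        raise ValueError("invalid period: %r" % text)
    total = sum(float(value) * factor
                for value, (_, factor) in zip(values, UNITS)
                if value is not None)
    return -total if match.group(1) == "-" else total


print(parse_period("2h3s"))        # 7203.0
print(parse_period("-1.3h3m2us"))  # approximately -4860.000002
```

A string with units out of order, such as "3s2h", raises `ValueError` in this sketch. How the operator itself reacts to an invalid or negative period is not stated above.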
api.remove_timer(callback)
Deregister a timer callback.
Args:
- `callback` (func): Callback function to be removed.
api.update_timer(period, callback)
Update the period of an existing `callback`. No error is thrown if the callback does not exist.
Args:
- `period` (str): String representing the new period that overwrites the old value. It uses the same format as in api.add_timer.
- `callback` (func): Callback function for which the period will be updated. If `callback` is not registered, nothing is changed.
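The three timer calls can be combined, for example to slow a timer down and later stop it. The sketch below is only an illustration under stated assumptions: `FakeApi` is a hypothetical stand-in that records timers instead of scheduling them, the `while` loop simulates the runtime firing the timer, and it is assumed (not stated above) that a timer callback may update or remove itself.

```python
class FakeApi:
    """Hypothetical stand-in for `api`; it records timers instead of running them."""

    def __init__(self):
        self.timers = {}   # callback -> period string
        self.sent = []     # (port, data) pairs passed to send()

    def add_timer(self, period, callback):
        self.timers[callback] = period        # re-adding replaces the period

    def update_timer(self, period, callback):
        if callback in self.timers:           # unknown callback: nothing changes
            self.timers[callback] = period

    def remove_timer(self, callback):
        self.timers.pop(callback, None)

    def send(self, port, data):
        self.sent.append((port, data))


api = FakeApi()
counter = 0


def tick():
    global counter
    api.send("output", counter)
    counter += 1
    if counter == 3:
        api.update_timer("5s", tick)   # slow down after three values
    elif counter == 5:
        api.remove_timer(tick)         # stop after five values


api.add_timer("1s", tick)

# Simulate the runtime firing the timer while it is still registered.
while tick in api.timers:
    tick()

print([data for _, data in api.sent])   # [0, 1, 2, 3, 4]
```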
Shutdown Handlers

    counter = 0

    def on_input(data):
        global counter
        counter += 1

    api.set_port_callback("input", on_input)

    def shutdown1():
        print("shutdown1: %d" % counter)

    def shutdown2():
        print("shutdown2: %d" % counter)

    api.add_shutdown_handler(shutdown1)
    api.add_shutdown_handler(shutdown2)

This example prints "shutdown1: 7" and "shutdown2: 7" if 7 values have been provided on the "input" port.
Configuration API
The api object lets you read configurations defined in the operator's editor view or added in the configuration panel of the graph view. If you defined a config named "foo", you can access it by calling api.config.foo. Configuration field names cannot contain spaces. The "script" and "codelanguage" config fields do not appear in api.config, as they are mandatory configuration parameters and should not be used by the end user.
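As a small illustration of attribute-style access, the sketch below reads two configuration fields. The `api` object here is a hypothetical stand-in built from `SimpleNamespace` so the snippet runs on its own, and the field names `host`, `port`, and `batch_size` are made up. Using `getattr` with a default for a missing field is an assumption about how api.config behaves, not documented behaviour.

```python
from types import SimpleNamespace

# Hypothetical stand-in for `api`; in a graph the runtime supplies the real object.
api = SimpleNamespace(config=SimpleNamespace(host="db.example.com", port=5432))

# Fields defined on the operator are read as attributes.
host = api.config.host
port = api.config.port

# Assumption: a field that was never defined can be given a default via getattr.
batch_size = getattr(api.config, "batch_size", 100)

print(host, port, batch_size)   # db.example.com 5432 100
```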
Logging
To log messages use api.logger.info("some text"), api.logger.debug(""), api.logger.warn(""), or api.logger.error("").
Error Handling
You can raise any exception inside the script and the exception message will be logged and the operator will be terminated, except if the exception is thrown inside a thread different than the script's main thread. In this case, you need to call api.propagate_exception(e), where "e" is the exception object, to handle it correctly.
Do not write to stderr because that could have unintended consequences. Also, do not use api.logger.fatal or api.logger.critical, otherwise the graph will be stopped. Raise an exception or use api.propagate_exception if you want to stop the graph and log the error.
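The case of an exception raised outside the script's main thread can be sketched as follows. `FakeApi` is a hypothetical stand-in that only records what is passed to `propagate_exception`, and `risky_work` is a made-up function that always fails. In a real operator script only the `worker` pattern would be needed.

```python
import threading


class FakeApi:
    """Hypothetical stand-in for `api`; it only records propagated exceptions."""

    def __init__(self):
        self.propagated = []

    def propagate_exception(self, exc):
        self.propagated.append(exc)


api = FakeApi()


def risky_work():
    raise RuntimeError("connection lost")   # hypothetical failure


def worker():
    try:
        risky_work()
    except Exception as e:
        # Raised in a secondary thread, so it is handed over explicitly
        # instead of being left to escape from the thread.
        api.propagate_exception(e)


thread = threading.Thread(target=worker)
thread.start()
thread.join()

print(api.propagated)   # [RuntimeError('connection lost')]
```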
repo_root and subengine_root
You can access the repo_root and subengine_root path using api.repo_root and api.subengine_root, respectively.
Port Names
You can get the name of the input and output ports for the current operator instance by calling the methods api.get_inport_names() and api.get_outport_names(), respectively.
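One possible use of the port name methods is to register a handler for every input port without hard-coding the names. The sketch below assumes that both methods return a list of strings, which the text above does not specify. `FakeApi`, the port names "in1", "in2", "out", and `make_forwarder` are hypothetical. A factory function is used because the same callback cannot be reused in different port groups.

```python
class FakeApi:
    """Hypothetical stand-in for `api` with fixed port names (illustration only)."""

    def __init__(self, inports, outports):
        self._inports = list(inports)
        self._outports = list(outports)
        self.callbacks = {}   # port name -> callback
        self.sent = []        # (port, data) pairs passed to send()

    def get_inport_names(self):
        return list(self._inports)

    def get_outport_names(self):
        return list(self._outports)

    def set_port_callback(self, ports, callback):
        self.callbacks[ports] = callback

    def send(self, port, data):
        self.sent.append((port, data))


api = FakeApi(["in1", "in2"], ["out"])


def make_forwarder(outport):
    # Each call returns a new function object with identical behaviour.
    def forward(data):
        api.send(outport, data)
    return forward


outport = api.get_outport_names()[0]
for inport in api.get_inport_names():
    api.set_port_callback(inport, make_forwarder(outport))

# Simulate one message per input port.
api.callbacks["in1"]("a")
api.callbacks["in2"]("b")
print(api.sent)   # [('out', 'a'), ('out', 'b')]
```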
Message Type
You can access the Message type in the api object as api.Message. The message type can be constructed like so: api.Message(body, attributes), where body can be any object, and attributes should be a dictionary of str to object, or None. The body argument is mandatory while attributes is optional and defaults to None. The body and attributes of a message object msg can be accessed as msg.body and msg.attributes, respectively.
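A callback that receives a message, adds an attribute, and sends a new message could look like the sketch below. The `Message` class and `FakeApi` are hypothetical stand-ins that mirror the constructor and fields described above so that the snippet runs outside a graph. The attribute name "length" is made up for illustration.

```python
class Message:
    """Hypothetical stand-in mirroring api.Message(body, attributes)."""

    def __init__(self, body, attributes=None):
        self.body = body
        self.attributes = attributes


class FakeApi:
    """Hypothetical stand-in for `api`; it records what is sent."""

    Message = Message

    def __init__(self):
        self.sent = []   # (port, data) pairs passed to send()

    def send(self, port, data):
        self.sent.append((port, data))


api = FakeApi()


def on_input(msg):
    attributes = dict(msg.attributes or {})   # attributes may be None
    attributes["length"] = len(msg.body)      # hypothetical attribute
    api.send("output", api.Message(msg.body.upper(), attributes))


on_input(api.Message("hello", {"source": "demo"}))
on_input(api.Message("hi"))                   # attributes default to None

first = api.sent[0][1]
print(first.body, first.attributes)   # HELLO {'source': 'demo', 'length': 5}
```

Copying the attributes into a new dictionary keeps the incoming message unchanged.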
Correspondence between Data Pipeline Types and Python Types
Data Pipeline | Python3
---|---
string | str
blob | bytes
int64 | int
uint64 | int
float64 | float
byte | int
message | api.Message
FAQ
Where should I place initialization code for my operator?
Initialization code can be placed directly in the body of the script, outside any callback:

    # Hypothetical script for Python3Operator
    # During the operator's initialization we might want to set up a connection
    # to a database, for example (`setup_connection` is a hypothetical function):
    db = setup_connection(api.config.host, api.config.port)

    def my_callback_func(data):
        global db
        db.write(data)  # hypothetical call

    api.set_port_callback("input", my_callback_func)

Alternatively, one can also place initialization code in a generator callback that is registered with api.add_generator(func).