Python3 Operator
The Python3Operator lets you define a script that can use convenience functions provided by the api object. For example, you can register a callback that is invoked whenever new data arrives on the "datain" port by calling api.set_port_callback("datain", callback_name). The other conveniences provided by the api object are described in the remaining sections of this document.
An operator extending the Python3Operator can be created directly in the UI, in the repository tab.
Configuration Parameters
Parameter | Type | Description |
---|---|---|
script | string | Mandatory. The inline script to be executed. If the script starts with "file://", then the script file is executed. |
Input
None
Output
None
Basic Examples
Count all incoming messages on port named "input" and write the count to the port named "output".
```python
counter = 0

def on_input(data):
    global counter
    counter += 1
    api.send("output", counter)

api.set_port_callback("input", on_input)
```
api.set_port_callback can also be used to wait for two input ports to have received data before calling the handler:
```python
def on_input(data1, data2):
    api.send("output", data1 + data2)

api.set_port_callback(["input1", "input2"], on_input)
```
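The api object only exists while the script runs inside a graph. To try the counter example locally, one option is a small stand-in object that records what send receives. The FakeApi class below is hypothetical: it implements only set_port_callback and send, plus a deliver helper that simulates data arriving on a port, and it is not part of the operator's API.

```python
class FakeApi:
    """Hypothetical stand-in for the operator's api object (local testing only)."""

    def __init__(self):
        self.callbacks = {}  # input port name -> callback
        self.sent = []       # (port, value) pairs passed to send()

    def set_port_callback(self, port, callback):
        self.callbacks[port] = callback

    def send(self, port, value):
        self.sent.append((port, value))

    def deliver(self, port, data):
        # Simulate data arriving on an input port.
        self.callbacks[port](data)


api = FakeApi()

# The counter script from above, unchanged.
counter = 0

def on_input(data):
    global counter
    counter += 1
    api.send("output", counter)

api.set_port_callback("input", on_input)

for item in ["a", "b", "c"]:
    api.deliver("input", item)

print(api.sent)  # [('output', 1), ('output', 2), ('output', 3)]
```

Because the script itself is unchanged, the same text can be pasted into the operator once it behaves as expected locally.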
Generators
```python
counter = 0

def gen():
    global counter
    for i in range(0, 3):
        api.send("output", counter)
        counter += 1

api.add_generator(gen)
api.add_generator(gen)
```
This example produces the values 0, 1, 2, 3, 4, 5 on the port "output": gen is registered twice, and each run sends three values while incrementing the shared counter.
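The generator example can be replayed locally with a stand-in that records sent values. FakeApi is hypothetical, and its run_generators method assumes that each registered generator is called once, in registration order, which matches the output stated above.

```python
class FakeApi:
    """Hypothetical stand-in: runs registered generators in registration order."""

    def __init__(self):
        self.generators = []
        self.sent = []

    def add_generator(self, func):
        self.generators.append(func)

    def send(self, port, value):
        self.sent.append((port, value))

    def run_generators(self):
        # Assumption: each generator is called once, one after another.
        for gen in self.generators:
            gen()


api = FakeApi()

counter = 0

def gen():
    global counter
    for _ in range(3):
        api.send("output", counter)
        counter += 1

api.add_generator(gen)
api.add_generator(gen)
api.run_generators()

print([value for _, value in api.sent])  # [0, 1, 2, 3, 4, 5]
```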
Timers
Timers call the handler at given intervals. Timers are not preemptive, so the given interval is only a lower bound on the time between calls of the timer function. All timer handlers are stopped before the shutdown handlers are executed. A zero or negative interval means that the callback is called as fast as possible.
The time is passed as a string. For example, "-1.3h3m2us" means minus (1.3 hours, 3 minutes, and 2 microseconds), which is converted to -(1.3*3600 + 3*60 + 2*1e-6) = -4860.000002 seconds.
```python
counter = 0

def t1():
    global counter
    api.send("output", counter)
    counter += 1

api.add_timer("1s", t1)
```
This example produces the values 0, 1, 2, ... on the port "output" until the graph shuts down.
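The string-to-seconds conversion described above can be reproduced with a short parser, for example to check what a given interval string amounts to. parse_duration is a hypothetical helper, not an api function, and the set of accepted units (h, m, s, ms, us, ns) is an assumption based on the example; the operator's own parser may accept a different set.

```python
import re

# Assumed unit table; the exact set accepted by the operator may differ.
_UNIT_SECONDS = {
    "h": 3600.0,
    "m": 60.0,
    "s": 1.0,
    "ms": 1e-3,
    "us": 1e-6,
    "ns": 1e-9,
}

# One "<number><unit>" component. "ms" is listed before "m" so that "5ms"
# is matched as milliseconds instead of stopping at "m".
_COMPONENT = re.compile(r"(\d+(?:\.\d*)?|\.\d+)(h|ms|us|ns|m|s)")


def parse_duration(text: str) -> float:
    """Convert a duration string such as "-1.3h3m2us" into seconds."""
    original = text
    sign = 1.0
    if text.startswith(("+", "-")):
        sign = -1.0 if text[0] == "-" else 1.0
        text = text[1:]
    if text == "0":
        return 0.0
    if not text:
        raise ValueError(f"invalid duration: {original!r}")

    total = 0.0
    pos = 0
    while pos < len(text):
        match = _COMPONENT.match(text, pos)
        if match is None:
            raise ValueError(f"invalid duration: {original!r}")
        number, unit = match.groups()
        total += float(number) * _UNIT_SECONDS[unit]
        pos = match.end()
    return sign * total


print(parse_duration("-1.3h3m2us"))  # approximately -4860.000002
print(parse_duration("1s"))          # 1.0
```

The sign applies to the whole string, matching the "minus (1.3 hours, 3 minutes, and 2 microseconds)" reading above.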
Shutdown Handlers
```python
counter = 0

def on_input(data):
    global counter
    counter += 1

api.set_port_callback("input", on_input)

def shutdown1():
    print("shutdown1: %d" % counter)

def shutdown2():
    print("shutdown2: %d" % counter)

api.add_shutdown_handler(shutdown1)
api.add_shutdown_handler(shutdown2)
```
This example prints "shutdown1: 7" and "shutdown2: 7" if 7 values have been received on the "input" port.
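The same kind of local stand-in can replay the shutdown example. FakeApi is hypothetical, and the assumption that shutdown handlers run once each, in registration order, is taken from the expected output above rather than from a documented guarantee. The handlers append to a list instead of printing so the result is easy to inspect.

```python
class FakeApi:
    """Hypothetical stand-in covering only the calls used in the example."""

    def __init__(self):
        self.callbacks = {}
        self.shutdown_handlers = []

    def set_port_callback(self, port, callback):
        self.callbacks[port] = callback

    def add_shutdown_handler(self, handler):
        self.shutdown_handlers.append(handler)

    def deliver(self, port, data):
        # Simulate data arriving on an input port.
        self.callbacks[port](data)

    def shutdown(self):
        # Assumption: handlers run once each, in registration order.
        for handler in self.shutdown_handlers:
            handler()


api = FakeApi()
lines = []  # collects what the handlers report, for inspection

counter = 0

def on_input(data):
    global counter
    counter += 1

api.set_port_callback("input", on_input)

def shutdown1():
    lines.append("shutdown1: %d" % counter)

def shutdown2():
    lines.append("shutdown2: %d" % counter)

api.add_shutdown_handler(shutdown1)
api.add_shutdown_handler(shutdown2)

for value in range(7):
    api.deliver("input", value)
api.shutdown()

print(lines)  # ['shutdown1: 7', 'shutdown2: 7']
```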
Configuration API
The api object gives access to the configurations defined in the operator's editor view or added in the configurations panel of the graph's view. If you define a config named "foo", you can access it as api.config.foo. Configuration field names cannot contain spaces. The "script" and "codelanguage" config fields do not appear in api.config, because they are mandatory configuration parameters and are not meant to be used by the end user.
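A script reads config fields as attributes of api.config. To give an optional field a fallback value, Python's built-in getattr with a default can be used; this sketch assumes that reading an undefined field raises AttributeError, as it does for a regular Python object. The SimpleNamespace below only stands in for api.config outside the operator, and the field name timeout is hypothetical.

```python
from types import SimpleNamespace

# Stand-in for the api object; inside the operator, api.config is provided.
api = SimpleNamespace(config=SimpleNamespace(foo="bar"))

foo = api.config.foo                          # field defined in the configuration
timeout = getattr(api.config, "timeout", 30)  # undefined field: fall back to 30

print(foo, timeout)  # bar 30
```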
Logging
To log messages use api.logger.info("some text"), api.logger.debug(""), api.logger.warn(""), or api.logger.error("").
Error Handling
You can raise any exception inside the script; the exception message is logged and the operator is terminated. This does not apply to exceptions raised in a thread other than the script's main thread. In that case, you need to call api.propagate_exception(e), where e is the exception object, for the exception to be handled correctly.
Do not write to stderr because that could have unintended consequences. Also, do not use api.logger.fatal or api.logger.critical, otherwise the graph will be stopped. Raise an exception or use api.propagate_exception if you want to stop the graph and log the error.
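In a script that starts its own threads, one way to follow this rule is to wrap the thread's work in a try block and pass any exception to api.propagate_exception. The sketch below uses a hypothetical FakeApi that only records the exception so it can run outside a graph; inside the operator, the real api object is used and the worker function stays the same.

```python
import threading


class FakeApi:
    """Hypothetical stand-in that records propagated exceptions."""

    def __init__(self):
        self.propagated = []

    def propagate_exception(self, exc):
        self.propagated.append(exc)


api = FakeApi()


def worker(text):
    try:
        value = int(text)  # fails for non-numeric input
        print("parsed", value)
    except Exception as e:
        # Exceptions raised outside the script's main thread are not handled
        # correctly on their own, so hand this one over explicitly.
        api.propagate_exception(e)


thread = threading.Thread(target=worker, args=("not a number",))
thread.start()
thread.join()

print(type(api.propagated[0]).__name__)  # ValueError
```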
repo_root and subengine_root
You can access the repo_root and subengine_root paths using api.repo_root and api.subengine_root, respectively.
Port Names
You can get the names of the input and output ports of the current operator instance by calling the methods api.get_inport_names() and api.get_outport_names(), respectively.
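These methods make it possible to write scripts that do not hard-code port names. The sketch below forwards each value to every output port; FakeApi and the port names out1 and out2 are hypothetical stand-ins used only to run it locally.

```python
class FakeApi:
    """Hypothetical stand-in for an operator with a fixed set of output ports."""

    def __init__(self, outports):
        self._outports = list(outports)
        self.sent = []

    def get_outport_names(self):
        return list(self._outports)

    def send(self, port, value):
        self.sent.append((port, value))


api = FakeApi(["out1", "out2"])


def broadcast(data):
    # Forward the same data to every output port of this operator instance.
    for port in api.get_outport_names():
        api.send(port, data)


broadcast(42)
print(api.sent)  # [('out1', 42), ('out2', 42)]
```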
Message Type
You can access the Message type in the api object as api.Message. The message type can be constructed like so: api.Message(body, attributes), where body can be any object, and attributes should be a dictionary of str to object, or None. The body argument is mandatory while attributes is optional and defaults to None. The body and attributes of a message object msg can be accessed as msg.body and msg.attributes, respectively.
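For example, a callback that receives a message can build a new one from the body and attributes of the incoming message. The enrich function and the "processed" attribute are hypothetical, the body is assumed to be a str, and _Message only mimics the constructor and fields described above so the snippet runs outside the operator.

```python
from types import SimpleNamespace


class _Message:
    """Hypothetical stand-in mirroring api.Message(body, attributes=None)."""

    def __init__(self, body, attributes=None):
        self.body = body
        self.attributes = attributes


api = SimpleNamespace(Message=_Message)


def enrich(msg):
    # Copy the attributes (which may be None) before adding a key,
    # so the incoming message is left unchanged.
    attributes = dict(msg.attributes or {})
    attributes["processed"] = True
    return api.Message(msg.body.upper(), attributes)


out = enrich(api.Message("hello", {"id": 1}))
print(out.body, out.attributes)  # HELLO {'id': 1, 'processed': True}
```

In the operator, the result would typically be passed on with api.send("output", enrich(msg)) from a port callback.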
Correspondence between Data Pipeline Types and Python Types
Data Pipeline | Python3 |
---|---|
string | str |
blob | bytes |
int64 | int |
uint64 | int |
float64 | float |
byte | int |
message | api.Message |
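Several fixed-width pipeline types map to Python's int, which has no fixed size. A script that computes large integers may therefore want to check the range before sending a value to an int64, uint64, or byte port. The helpers below are a defensive sketch: how the operator treats an out-of-range value is not described in this document, and treating byte as an unsigned 8-bit value is an assumption.

```python
INT64_MIN, INT64_MAX = -(2**63), 2**63 - 1
UINT64_MAX = 2**64 - 1


def fits_int64(value: int) -> bool:
    """True if value fits in a signed 64-bit integer."""
    return INT64_MIN <= value <= INT64_MAX


def fits_uint64(value: int) -> bool:
    """True if value fits in an unsigned 64-bit integer."""
    return 0 <= value <= UINT64_MAX


def fits_byte(value: int) -> bool:
    # Assumption: byte is unsigned, i.e. 0..255.
    return 0 <= value <= 255


print(fits_int64(2**63 - 1), fits_int64(2**63))  # True False
```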
FAQ
Where should I place initialization code for my operator?
```python
# Hypothetical script for Python3Operator.
# In the operator's initialization we might want to set up a connection
# with a database, for example (`setup_connection` is a hypothetical function):
db = setup_connection(api.config.host, api.config.port)

def my_callback_func(data):
    db.write(data)  # hypothetical call

api.set_port_callback("input", my_callback_func)
```
Alternatively, one can also place initialization code in a generator callback that is registered with api.add_generator(func).
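A generator-based variant of the FAQ example might look like the sketch below. It keeps the hypothetical database idea, replacing the connection with a plain list so it runs locally, and uses a hypothetical FakeApi whose run method assumes that generators are executed before any data reaches the port callbacks.

```python
class FakeApi:
    """Hypothetical stand-in: runs generators first, then delivers data."""

    def __init__(self):
        self.generators = []
        self.callbacks = {}

    def add_generator(self, func):
        self.generators.append(func)

    def set_port_callback(self, port, callback):
        self.callbacks[port] = callback

    def run(self, port, items):
        # Assumption: generators run before any input data is delivered.
        for gen in self.generators:
            gen()
        for item in items:
            self.callbacks[port](item)


api = FakeApi()

db = None  # set by init()

def init():
    global db
    db = []  # hypothetical stand-in for setup_connection(...)

def on_input(data):
    db.append(data)  # hypothetical stand-in for db.write(data)

api.add_generator(init)
api.set_port_callback("input", on_input)

api.run("input", ["a", "b"])
print(db)  # ['a', 'b']
```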