Golang Operator 2
The Go operator allows you to execute Go code within a graph. To execute the code, the pipeline engine must run with the same $GOPATH that it was built with.
Configuration Parameters
Parameter | Type | Description |
---|---|---|
codelanguage | string | The programming language used in the code. Default: "go" |
script | string | Mandatory. The inline script to be executed. If the script starts with "file://", the referenced script file is executed instead. Default: "" |
Input
None
Output
None
Creating a GoExecutor Extension
- Create an extension of the Go Executor operator and add the desired input and output ports.
- Add your Go code as an inline "script" parameter in the extension configuration or put the code into a separate file and use the prefix "file://" to specify the file name. The script must belong to the package main.
- There are two reserved functions, one for initialization and one for cleanup. The initialization function has the signature func Setup() and is called only once, when the operator starts. The cleanup function has the signature func Cleanup() and is called only once, before the operator shuts down.
- All inputs arrive as interface{} values, so the script author must apply the correct type assertions. To work with the message type, convert the value received through the input port to map[string]interface{} and access its contents under the keys "Attributes", "Encoding" and "Body", as in the following example:
    package main

    // In is named after the input port, so data arriving on that port calls this function.
    func In(val interface{}) {
        message := val.(map[string]interface{})
        body := message["Body"]
        attr := message["Attributes"].(map[string]interface{})
        encoding := message["Encoding"]
        // ...
    }
- To handle data arriving on an input port, implement a function named after that port. The function must be exported, i.e. start with a capital letter, but the port name itself does not need to be capitalized to match (e.g. the function In(val interface{}) matches either port In or in). The function always receives a single value of type interface{} and may optionally return an error, so both signatures are supported: <Portname>(interface{}) and <Portname>(interface{}) error. For example:
    package main

    import "errors"

    // In is named after the input port "in", so data arriving on that port calls this function.
    func In(val interface{}) error {
        strValue := val.(string)
        return errors.New(strValue)
    }
- To output data, define a variable named after the output port with type func(interface{}), and call it with the output data as its only argument. The variable must be exported, i.e. start with a capital letter, but the port name itself does not need to be capitalized to match (e.g. the variable var Out func(interface{}) matches either port Out or out). For example:
    package main

    // Out is bound to the output port "out".
    var Out func(interface{})

    // In is named after the input port, so data arriving on that port calls this function.
    func In(val interface{}) {
        strValue := val.(string)
        Out(strValue)
    }
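The steps above can be combined into one script. The following is a minimal sketch, not a definitive implementation: it assumes ports named "in" and "out", assumes the message body is a string, and uses a hypothetical counter named processed purely for illustration. It uses the comma-ok form of the type assertion so that an unexpected input produces an error instead of a panic.

```go
package main

import (
	"errors"
	"fmt"
)

// Out is assumed to be bound by the engine to the output port "out".
var Out func(interface{})

// processed is a hypothetical counter used only for illustration.
var processed int

// Setup is called once when the operator starts.
func Setup() {
	processed = 0
}

// Cleanup is called once before the operator shuts down.
func Cleanup() {
	processed = 0
}

// In handles data arriving on the input port "in".
// The comma-ok assertions turn an unexpected type into an error
// instead of a runtime panic.
func In(val interface{}) error {
	message, ok := val.(map[string]interface{})
	if !ok {
		return fmt.Errorf("expected a message, got %T", val)
	}
	// Assumption: the body carries a string in this example.
	body, ok := message["Body"].(string)
	if !ok {
		return errors.New("message body is not a string")
	}
	processed++
	Out(fmt.Sprintf("%d: %s", processed, body))
	return nil
}
```

To try this outside the engine, assign a stub function to Out and call Setup, In and Cleanup directly from a test.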
Logging
    package main

    var Log func(string)
    var Logf func(string, ...interface{})

    func Setup() {
        Log("starting go operator")
    }

    func In(val interface{}) {
        strValue := val.(string)
        strValue += "test"
        Logf("input:%s", strValue)
        // ...
    }
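Logging is also a convenient way to report inputs of an unexpected type without stopping the script. The following is a rough sketch under stated assumptions: the port names "in" and "out" are hypothetical, and Log, Logf and Out are assumed to be bound by the engine as in the declarations shown in this document.

```go
package main

// Log and Logf are assumed to be bound by the engine.
var Log func(string)
var Logf func(string, ...interface{})

// Out is assumed to be bound to a hypothetical output port "out".
var Out func(interface{})

// In forwards string inputs and logs everything else instead of panicking.
func In(val interface{}) {
	text, ok := val.(string)
	if !ok {
		// %T prints the dynamic type of the rejected value.
		Logf("dropping input of unexpected type %T", val)
		return
	}
	Log("forwarding string input")
	Out(text)
}
```

For a local check, assign stub functions to Log, Logf and Out before calling In.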
Configuration
Configuration calls let the script read the configuration provided in the graph description. Pass the property title as the function argument.
    package main

    var GetString func(string) string
    var GetInt func(string) int
    var GetBool func(string) bool
    var GetObject func(string) map[string]interface{}
    var AsObject func() map[string]interface{}

    func In(val interface{}) {
        strValue := GetString("str")
        intValue := GetInt("int")
        boolValue := GetBool("bool")
        objValue := GetObject("obj")
        entireConfig := AsObject()
        // ...
    }
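As an illustration of using configuration values inside an input handler, here is a minimal sketch. The property titles "prefix" and "repeat" and the port names are hypothetical, and the fallback to one repetition is a design choice of this example, not documented engine behavior.

```go
package main

import "strings"

// GetString and GetInt are assumed to be bound by the engine.
var GetString func(string) string
var GetInt func(string) int

// Out is assumed to be bound to a hypothetical output port "out".
var Out func(interface{})

// In prefixes the incoming text and repeats it as configured.
func In(val interface{}) {
	text, ok := val.(string)
	if !ok {
		return // ignore non-string inputs in this sketch
	}
	prefix := GetString("prefix") // hypothetical property title
	count := GetInt("repeat")     // hypothetical property title
	if count < 1 {
		count = 1 // fall back to a single repetition
	}
	Out(prefix + strings.Repeat(text, count))
}
```

Reading the values inside the handler mirrors the example above; whether the getters may already be called in Setup is not stated in this document.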
Message Response
SendResponse sends either a message response or an error as the response to a message request. It can be called from the Go script.
    package main

    var SendResponse func(map[string]interface{}, map[string]interface{}, error)

    func Input(val interface{}) {
        message := val.(map[string]interface{})
        body := message["Body"]
        attr := message["Attributes"].(map[string]interface{})
        encoding := message["Encoding"]
        SendResponse(message, message, nil)
        // ...
    }
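The error case can be sketched as follows. This example rests on assumptions that this document does not confirm: that the first argument of SendResponse is the original request and the second is the response, and that a nil response may be passed together with an error. The input port name "input" and the upper-casing of the body are purely illustrative.

```go
package main

import (
	"errors"
	"strings"
)

// SendResponse is assumed to be bound by the engine.
// Assumed argument order: request, response, error.
var SendResponse func(map[string]interface{}, map[string]interface{}, error)

// Input answers each request with an upper-cased body, or with an error
// when the body is not a string.
func Input(val interface{}) {
	request, ok := val.(map[string]interface{})
	if !ok {
		return // not a message, so there is no request to answer
	}
	body, ok := request["Body"].(string)
	if !ok {
		SendResponse(request, nil, errors.New("body must be a string"))
		return
	}
	response := map[string]interface{}{
		"Attributes": request["Attributes"],
		"Encoding":   request["Encoding"],
		"Body":       strings.ToUpper(body),
	}
	SendResponse(request, response, nil)
}
```

Building a separate response map keeps the request untouched, which makes the handler easier to test with a stub SendResponse.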
Tick Timer
To have a function called repeatedly at a fixed interval x, set the global variable Timer to x, where x is either an int64 value (interpreted as milliseconds) or a time.Duration (used as is, without conversion).
The function that is called must be named Tick(). The signature Tick() error is also supported if an error must be returned.
    package main

    // Interval in milliseconds; alternatively: var Timer = 500 * time.Millisecond
    var Timer int64 = 500

    // Tick is called every 500 ms.
    func Tick() {
        // changes here
    }
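The error-returning variant can be sketched in the same way. In this minimal example the output port "out", the counter ticks and the limit maxTicks are hypothetical. How the engine reacts to an error returned from Tick is not described in this document, so the sketch only shows how to return one.

```go
package main

import "errors"

// Timer requests a tick every 1000 ms (int64 values are milliseconds).
var Timer int64 = 1000

// Out is assumed to be bound to a hypothetical output port "out".
var Out func(interface{})

// maxTicks is a hypothetical limit used only for illustration.
const maxTicks = 3

// ticks counts how many times Tick has emitted a value.
var ticks int

// Tick emits the running tick count and returns an error once the
// limit has been reached.
func Tick() error {
	if ticks >= maxTicks {
		return errors.New("tick limit reached")
	}
	ticks++
	Out(ticks)
	return nil
}
```

Because Tick is an ordinary function, it can be exercised locally by assigning a stub to Out and calling Tick in a loop.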