Golang Operator 2
The Go operator allows you to execute Go code within a graph. To execute, the pipeline engine must be running on the same $GOPATH that it was built.
Configuration Parameters
Parameter |
Type |
Description |
---|---|---|
codelanguage |
string |
The programming language used in the code.
Default: "go" |
script |
string |
Mandatory. The inline script to be executed. If the script starts with "file://", then the script file is executed. Default: "" |
Input
None
Output
None
Creating a GoExecutor Extension
- Create an extension of Go Executor operator and add the desired input and output ports.
- Add your Go code as an inline "script" parameter in the extension configuration or put the code into a separate file and use the prefix "file://" to specify the file name. The script must belong to the package main.
- There are two reserved functions for initialization and cleanup (runs during shutdown of operator). The signature for the initialization function is func Setup(), which it is called only once, when the operator starts. The shutdown function should have the following signature: func Cleanup(); it is called only once before the operator shuts down.
- All inputs are considered interface{} type, so for the one writing the
script it is important to deal with correct type assertions. In order to use
the message type, the value received through the input port should be
converted to map[string]interface{} type, and the values
accessed as keys "Attributes", "Encoding"
and "Body" as in the following example:
package main // here implement the function with the input port name, so data coming from this input will call this function func In(val interface{}){ message := val.(map[string]interface{}) body := message["Body"] attr := message["Attributes"].(map[string]interface{}) encoding := message["Encoding"] // ... }
Dealing with message type may need more type assertions as Body, Attributes and Encoding would be typed as interface{}. In this case, a type assertion message["Body"].(string) would be needed to deal with a string body. Attributes need type assertion to map[string]interface{}. Encoding should be treated as a string.
- In order to call a function based on an input incoming data, a function named after the
port name should be implemented. While the input function needs to be
exported, i.e. start with a capital letter, the port name does not need to
do so in order to match the function (e.g.: the function In(val
interface{}) would match either port In or
in). The function signatures always receive a single
value as interface{}, and may or may not return an error,
thus supporting both signatures: <Portname>(interface{})
and <Portname>(interface{}) error. As in the following
example:
package main import "errors" // here implement the function with the input port name, so data coming from this input will call this function // input port "in" func In(val interface{}) error { strValue := val.(string) return errors.New(strValue) }
- To output data, a variable with the output port name and of type
func(interface{}) should be defined. It should then be
called with the output data as its only parameter. While the output function
needs to be exported, i.e. start with a capital letter, the port name does
not need to do so in order to match the function (e.g.: the variable
var Out(interface{}) would match either port
Out or out). As in the following
example:
package main // output port "out" var Out func(interface{}) // here implement the function with the input port name, so data coming from this input will call this function func In(val interface{}){ strValue := val.(string) Out(strValue) }
Logging
package main var Log func(string) var Logf func(string, ...interface{}) func Setup() { Log("starting go operator") } func In(val interface{}){ strValue := val.(string) strValue += "test" Logf("input:%s", strValue) // ... }
Configuration
Configuration calls provide the ability to read the configuration provided via a graph description. The function parameter should be the property title.
package main var GetString func(string) string var GetInt func(string) int var GetBool func(string) bool var GetObject func(string) map[string]interface{} var AsObject func(string) map[string]interface{} func In(val interface{}){ strValue := GetString("str") intValue := GetInt("int") boolValue := GetBool("bool") objValue := GetObject("obj") entireConfig := AsObject() // ... }
Message Response
SendResponse sends either message response or error as the response to message request and can be called from golang script.
package main var SendResponse func(map[string]interface{}, map[string]interface{}, error) func Input(val interface{}){ message := val.(map[string]interface{}) body := message["Body"] attr := message["Attributes"].(map[string]interface{}) encoding := message["Encoding"] SendResponse(message, message, nil) // ... }
Tick Timer
The tick function that is called periodically at an interval. A global variable Timer should contain this interval as an int64 value (time in milliseconds) or a time.Duration (literal time - no conversions). Value of Timer is set globally and is static, i.e cannot be changed during graph execution.
The respective called tick function must be named Tick(). The signature Tick() error is also supported, if an error must be returned. It runs concurrently with ports handlers, so it is not interrupted..
package main var Timer int64 = 500 // or 500 * time.Millisecond, // var Timer time.Duration = 500 * time.Millisecond func Tick() { // function to be called every 500 ms // changes here }