Modeling Guide

Golang Operator 2

The Go operator allows you to execute Go code within a graph. To execute the code, the pipeline engine must be running with the same $GOPATH with which it was built.

Configuration Parameters

Parameter       Type      Description
codelanguage    string    The programming language used in the code. Default: "go"
script          string    Mandatory. The inline script to be executed. If the script starts with "file://", then the script file is executed. Default: ""

Input

None

Output

None

Creating a GoExecutor Extension

  1. Create an extension of the Go Executor operator and add the desired input and output ports.
  2. Add your Go code as an inline "script" parameter in the extension configuration or put the code into a separate file and use the prefix "file://" to specify the file name. The script must belong to the package main.
  3. Two function names are reserved for initialization and cleanup. The initialization function has the signature func Setup() and is called only once, when the operator starts. The cleanup function has the signature func Cleanup() and is called only once, before the operator shuts down.
  4. All inputs arrive as interface{} values, so the script author must apply the correct type assertions. To work with the message type, convert the value received through the input port to map[string]interface{} and access its contents through the keys "Attributes", "Encoding" and "Body", as in the following example:
    package main
    
    // Implement a function named after the input port; it is called for every value arriving on that port.
    func In(val interface{}){
    	message := val.(map[string]interface{})
    	body := message["Body"]
    	attr := message["Attributes"].(map[string]interface{})
    	encoding := message["Encoding"]
    
    	// ...
    }
  5. To handle data arriving on an input port, implement a function named after that port. The function must be exported, i.e. start with a capital letter, but the port name itself does not need to be capitalized to match (e.g. the function In(val interface{}) matches either port In or in). The function always receives a single value of type interface{} and may optionally return an error, so both signatures are supported: <Portname>(interface{}) and <Portname>(interface{}) error. For example:
    package main
    
    import "errors"
    
    // Implement a function named after the input port; it is called for every value arriving on that port.
    // input port "in"
    func In(val interface{}) error {
    	strValue := val.(string)
    	return errors.New(strValue)
    }
  6. To output data, declare a variable named after the output port with the type func(interface{}), then call it with the output data as its only argument. The variable must be exported, i.e. start with a capital letter, but the port name itself does not need to be capitalized to match (e.g. the variable var Out func(interface{}) matches either port Out or out). For example:
    package main
    
    // output port "out"
    var Out func(interface{})
    
    // Implement a function named after the input port; it is called for every value arriving on that port.
    func In(val interface{}){
    	strValue := val.(string)
    	Out(strValue)
    }
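
Steps 3 to 6 can be combined into a single script. The following is a minimal sketch, assuming an extension with an input port named in and an output port named out. The processed counter and the error text are illustrative and not part of the operator API. The comma-ok form of the type assertion is used so that an unexpected input type produces an error instead of a panic.

```go
package main

import (
	"errors"
	"strings"
)

// Out is expected to be bound by the engine to the output port "out" (assumed port name).
var Out func(interface{})

// processed counts the handled inputs; illustrative state only.
var processed int

// Setup is called once when the operator starts.
func Setup() {
	processed = 0
}

// Cleanup is called once before the operator shuts down.
func Cleanup() {
	processed = 0
}

// In is called for every value arriving on the input port "in" (assumed port name).
func In(val interface{}) error {
	// The comma-ok form avoids a panic when the input is not a string.
	s, ok := val.(string)
	if !ok {
		return errors.New("in: expected a string input")
	}
	processed++
	Out(strings.ToUpper(s))
	return nil
}
```

The script defines no main function because the functions are invoked by the operator at run time, as described in the steps above.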

Logging

To log messages during Go operator execution, declare a variable named Log of type func(string). A formatted version, Logf, is also available; it takes a format string followed by ...interface{} arguments. All logs are added as Debug logs to the Data Pipeline's tracer. For example:
package main
var Log func(string)
var Logf func(string, ...interface{})

func Setup() {
	Log("starting go operator")
}

func In(val interface{}){
	strValue := val.(string)
	strValue += "test"
	Logf("input:%s", strValue)
	// ...
}
Error logging is also available. It writes errors to the operator's tracer and is declared in the same way as Log and Logf, but with the names Error and Errorf. Note that logging an error does not stop the graph; it only writes the error to the tracer.
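
A minimal sketch of error logging, assuming that Error and Errorf take the same parameter types as Log and Logf, as the paragraph above states, and assuming an input port named in. The messages are illustrative.

```go
package main

// Error and Errorf are declared like Log and Logf; the engine is expected to bind them.
var Error func(string)
var Errorf func(string, ...interface{})

// In is called for every value arriving on the input port "in" (assumed port name).
func In(val interface{}) {
	strValue, ok := val.(string)
	if !ok {
		// Written to the operator's tracer; the graph keeps running.
		Errorf("unexpected input type: %T", val)
		return
	}
	if strValue == "" {
		Error("received an empty string")
	}
}
```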

Configuration

Configuration calls let the script read the configuration provided in the graph description. The function argument is the property title.

For example:
package main

var GetString func(string) string
var GetInt func(string) int
var GetBool func(string) bool
var GetObject func(string) map[string]interface{}
var AsObject func() map[string]interface{} // assumed to take no argument, matching the call below; returns the entire configuration

func In(val interface{}){
	strValue := GetString("str")
	intValue := GetInt("int")
	boolValue := GetBool("bool")
	objValue := GetObject("obj")
	entireConfig := AsObject()
	// ...
}
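
Configuration values that do not change at run time can be read once and reused. The following is a minimal sketch, assuming that the configuration functions are already bound when Setup runs; if they are not, the same calls can be made inside the input function instead. The property titles prefix and repeat and the port names in and out are hypothetical.

```go
package main

import "strings"

var GetString func(string) string
var GetInt func(string) int

// Out is expected to be bound to the output port "out" (assumed port name).
var Out func(interface{})

// Cached configuration values; "prefix" and "repeat" are hypothetical property titles.
var (
	prefix string
	repeat int
)

// Setup reads the configuration once when the operator starts.
func Setup() {
	prefix = GetString("prefix")
	repeat = GetInt("repeat")
	if repeat < 1 {
		repeat = 1 // fall back to a single repetition for missing or invalid values
	}
}

// In prepends the configured prefix to the repeated input string.
func In(val interface{}) {
	s, ok := val.(string)
	if !ok {
		return
	}
	Out(prefix + strings.Repeat(s, repeat))
}
```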

Message Response

SendResponse sends either a response message or an error in reply to a message request. It can be called from the Go script.

For example:
package main

var SendResponse func(map[string]interface{}, map[string]interface{}, error)

func Input(val interface{}){
	message := val.(map[string]interface{})
	body := message["Body"]
	attr := message["Attributes"].(map[string]interface{})
	encoding := message["Encoding"]

	SendResponse(message, message, nil)
	// ...
}
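
The following sketch shows both a successful and a failing response. It assumes that the first argument of SendResponse is the original request, the second is the response message, and the third is the error; the source only gives the parameter types, so this ordering is an assumption, as is passing a nil response together with an error. The input port name input is taken from the example above.

```go
package main

import "errors"

var SendResponse func(map[string]interface{}, map[string]interface{}, error)

// Input is called for every value arriving on the input port "input".
func Input(val interface{}) {
	request, ok := val.(map[string]interface{})
	if !ok {
		return // not a message, so there is nothing to respond to
	}
	if request["Body"] == nil {
		// Assumed usage: report the failure instead of a response message.
		SendResponse(request, nil, errors.New("request has no body"))
		return
	}
	// Echo the request content back as the response.
	response := map[string]interface{}{
		"Attributes": request["Attributes"],
		"Encoding":   request["Encoding"],
		"Body":       request["Body"],
	}
	SendResponse(request, response, nil)
}
```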

Tick Timer

To create a tick function that is called repeatedly at an interval x, set the global variable Timer to x, where x is either an int64 value (interpreted as milliseconds) or a time.Duration (used as-is, without conversion).

The tick function must be named Tick(). The signature Tick() error is also supported if an error must be returned.

For example:
package main
var Timer int64 = 500 // or 500 * time.Millisecond

func Tick() {
	// function to be called every 500 ms
	// changes here
}
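
A minimal sketch of the two variants not shown above: Timer declared as a time.Duration and Tick returning an error. The output port out, the ticks counter and the maxTicks limit are illustrative assumptions. How the operator reacts to a returned error is not described here, so the sketch only shows how to return one.

```go
package main

import (
	"errors"
	"time"
)

// Timer as a time.Duration; the value is used as the tick interval as-is.
var Timer = 2 * time.Second

// Out is expected to be bound to the output port "out" (assumed port name).
var Out func(interface{})

// ticks counts the invocations; maxTicks is an illustrative limit.
var ticks int

const maxTicks = 3

// Tick is called once per interval and emits the current tick count.
func Tick() error {
	if ticks >= maxTicks {
		return errors.New("tick limit reached")
	}
	ticks++
	Out(ticks)
	return nil
}
```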