Modeling Guide

Goengine

This Go operator increments a counter and outputs the counter value together with the input.

Configuration Parameters

Parameter    | Type   | Description
-------------|--------|------------
codelanguage | string | The programming language used in the code. Default: "go"
script       | string | Mandatory. The inline script to be executed. If the script starts with "file://", then the script file is executed. Default: ""

Input

Input | Type   | Description
------|--------|------------
input | string | Any string that should be output with the counter.

Output

Output | Type   | Description
-------|--------|------------
output | string | The string appended to the counter value.

Creating a GoExecutor Extension

  1. Create an extension of the Go Executor operator and add the desired input and output ports.
  2. Add your Go code inline in the "script" parameter of the extension configuration, or put the code in a separate file and specify the file name with the prefix "file://". The script must belong to package main.
  3. Two function names are reserved for initialization and cleanup. The initialization function has the signature func Setup(); it can contain setup code and is called only once, when the operator starts. The shutdown function has the signature func Cleanup(); it is called only once, before the operator shuts down.
  4. All inputs have the type interface{}, so the script author must apply the correct type assertions. To work with the message type, convert the value received through the input port to map[string]interface{}; its values can then be accessed under the keys "Attributes", "Encoding", and "Body", as in the following example:
    package main
    
    // Implement a function named after the input port; data arriving on that port calls this function.
    func In(val interface{}){
    	message := val.(map[string]interface{})
    	body := message["Body"]
    	attr := message["Attributes"].(map[string]interface{})
    	encoding := message["Encoding"]
    
    	...
    }
  5. To have a function called when data arrives on an input port, give the function the same name as the port; the data from that port is passed as its parameter. To output data, declare a function variable with the name of the output port and call it with the data as its parameter, as in the following example:
    package main
    var Out func(interface{})
    // Implement a function named after the input port; data arriving on that port calls this function.
    func In(val interface{}){
    	strValue := val.(string)
    	strValue += "test"
    
    	Out(strValue)
    	...
    }
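
The steps above can be combined into one script. The following is a minimal sketch, not the operator's actual implementation: the counter variable, the format helper, and the "counter: input" output layout are assumptions made for illustration, and main only stands in for the operator runtime so that the sketch can run outside a graph. It uses the comma-ok form of the type assertion so that an unexpected input type does not cause a panic.

```go
package main

import "fmt"

// Out is named after the output port. In a graph the runtime is expected
// to assign it; main below installs a stub for local runs.
var Out func(interface{})

// counter is hypothetical script state, not part of the operator API.
var counter int

// Setup is called once when the operator starts.
func Setup() { counter = 0 }

// Cleanup is called once before the operator shuts down.
func Cleanup() { counter = 0 }

// format builds the output string. The layout is an assumption.
func format(n int, s string) string {
	return fmt.Sprintf("%d: %s", n, s)
}

// In is named after the input port and is called for each incoming value.
func In(val interface{}) {
	// The comma-ok form avoids a panic when the value is not a string.
	s, ok := val.(string)
	if !ok {
		return
	}
	counter++
	Out(format(counter, s))
}

func main() {
	// Stand-in for the operator runtime.
	Out = func(v interface{}) { fmt.Println(v) }
	Setup()
	In("a")
	In(42) // not a string, ignored
	In("b")
	Cleanup()
}
```

Run locally, this sketch prints "1: a" and "2: b"; the integer input is skipped without stopping the script.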

Logging

To log messages during Go operator execution, declare a function variable named Log that takes a string parameter. A formatted version, Logf, is also available, with the parameters string, ...interface{}. All logs are added as Debug logs in the Data Pipeline's tracer. Example:
package main
var Log func(string)
var Logf func(string, ...interface{})

func Setup() {
	Log("starting go operator")
}

func In(val interface{}){
	strValue := val.(string)
	strValue += "test"
	Logf("input:%s", strValue)
	...
}

Error logging is also available. It writes errors to the operator's tracer and is declared in the same way as Log and Logf, but with the names Error and Errorf. Note that logging an error does not stop the graph; it only writes the error to the tracer.

Configuration

Configuration calls read the configuration provided in the graph description. The function parameter is the title of the property to read.

For example:
package main

var GetString func(string) string
var GetInt func(string) int
var GetBool func(string) bool
var GetObject func(string) map[string]interface{}
var AsObject func() map[string]interface{}

func In(val interface{}){
	strValue := GetString("str")
	intValue := GetInt("int")
	boolValue := GetBool("bool")
	objValue := GetObject("obj")
	entireConfig := AsObject()
	...
}
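
To illustrate how configuration values might drive a script, the sketch below reads two properties on every input. The property titles "prefix" and "step", the counter variable, and the output layout are hypothetical. In main, GetString and GetInt are backed by a plain map purely so that the sketch runs outside a graph; in the operator, the runtime is expected to provide them.

```go
package main

import "fmt"

// Configuration accessors, declared as in the example above.
var GetString func(string) string
var GetInt func(string) int

// Out is named after the output port.
var Out func(interface{})

// counter is hypothetical script state.
var counter int

// In is named after the input port.
func In(val interface{}) {
	s, ok := val.(string)
	if !ok {
		return
	}
	// "step" and "prefix" are hypothetical property titles.
	counter += GetInt("step")
	Out(fmt.Sprintf("%s%d:%s", GetString("prefix"), counter, s))
}

func main() {
	// Stand-in configuration for a local run.
	cfg := map[string]interface{}{"prefix": "n=", "step": 2}
	GetString = func(title string) string {
		v, _ := cfg[title].(string)
		return v
	}
	GetInt = func(title string) int {
		v, _ := cfg[title].(int)
		return v
	}
	Out = func(v interface{}) { fmt.Println(v) }

	In("a") // prints n=2:a
	In("b") // prints n=4:b
}
```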

Message Response

SendResponse sends either a message response or an error as the response to a message request. It can be called from the Go script.

For example:
package main

var SendResponse func(map[string]interface{}, map[string]interface{}, error)

func Input(val interface{}){
	message := val.(map[string]interface{})
	body := message["Body"]
	attr := message["Attributes"].(map[string]interface{})
	encoding := message["Encoding"]

	SendResponse(message, message, nil)
	...
}
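
The following sketch shows one way a script might answer a request with either a response message or an error. It rests on assumptions that the source does not confirm: that the first argument of SendResponse is the request message, the second is the response message, and the third is the error, and that a nil response is acceptable when an error is passed. The stub in main replaces the runtime for a local run.

```go
package main

import (
	"errors"
	"fmt"
)

// Declared as in the example above; the argument order
// (request, response, error) is an assumption.
var SendResponse func(map[string]interface{}, map[string]interface{}, error)

// Input is named after the input port.
func Input(val interface{}) {
	request, ok := val.(map[string]interface{})
	if !ok {
		return // not a message, so there is no request to answer
	}
	body, ok := request["Body"].(string)
	if !ok {
		// Assumed usage: report the failure instead of a response.
		SendResponse(request, nil, errors.New("body is not a string"))
		return
	}
	response := map[string]interface{}{
		"Attributes": request["Attributes"],
		"Encoding":   request["Encoding"],
		"Body":       body + "test",
	}
	SendResponse(request, response, nil)
}

func main() {
	// Stand-in for the operator runtime.
	SendResponse = func(req, resp map[string]interface{}, err error) {
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		fmt.Println("response body:", resp["Body"])
	}
	Input(map[string]interface{}{"Body": "abc"})
	Input(map[string]interface{}{"Body": 5})
}
```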

Tick Timer

To have a function called every x milliseconds, declare the global variable Timer in the script and set it to an integer value (the time in ms). The function named Tick() is then called repeatedly, each time that interval elapses.

For example:
package main
var Timer int64 = 500

func Tick() {
	//function to be called every 500 ms
	//changes here
}
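
As a sketch of what a tick function might do, the script below emits a running tick count. The ticks variable and the output text are hypothetical, and it is an assumption that an output port function such as Out may be called from Tick. The loop in main calls Tick directly in place of the runtime, which would call it every Timer milliseconds.

```go
package main

import "fmt"

// Timer sets the tick interval in milliseconds.
var Timer int64 = 500

// Out is named after the output port; calling it from Tick is an assumption.
var Out func(interface{})

// ticks is hypothetical script state.
var ticks int

// Tick is expected to be called by the runtime every Timer milliseconds.
func Tick() {
	ticks++
	Out(fmt.Sprintf("tick %d", ticks))
}

func main() {
	// Stand-in for the runtime: call Tick directly instead of waiting.
	Out = func(v interface{}) { fmt.Println(v) }
	for i := 0; i < 3; i++ {
		Tick()
	}
}
```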