Modeling Guide for SAP Data Hub

R Client

The R Client operator runs user-defined R code in an Rserve server. You specify the code to run by writing an R script and registering callbacks through the predefined api object.

For example, writing api$setPortCallback(c("input1"), c("output2"), "myFunc") registers a callback function myFunc that is called whenever new data arrives on port "input1" and whose result is sent on port "output2".

The api object offers the following methods:
  • api$setPortCallback(inports, outports, functionName)

  • api$addTimer(period, outports, functionName)

  • api$setSwitchCallback(inports, outports, functionName)

  • api$addShutdownHandler(functionName)

The user's script runs while the R Client operator is being initialized. Code that should run after initialization must be placed in callbacks, which the script registers with the api object methods setPortCallback, addTimer, setSwitchCallback, and addShutdownHandler.
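A minimal script along these lines illustrates the split between initialization code and callbacks. It reuses the myFunc, "input1", and "output2" names from the example above; the counter and the message text are illustrative only. The exists("api") guard is not something the operator requires; it is included only so that the function can also be loaded and tried in a plain R session, where the api object does not exist.

```r
# Top-level code: runs once, while the R Client operator initializes.
counter <- 0

# Port callback: called whenever "input1" has data. The key of the
# returned list names the outport ("output2") that receives the value.
myFunc <- function(data) {
  counter <<- counter + 1
  list(output2 = paste0("message #", counter, ": ", data))
}

# Registration uses the api object that the operator provides at runtime.
if (exists("api")) {
  api$setPortCallback(c("input1"), c("output2"), "myFunc")
}
```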

Configuration Parameters

Script (string)
Mandatory. Inline script to be executed. If the entered text starts with "file://", it is interpreted as the name of the script file to be executed.

Switch Start Off (boolean)
If a switch callback exists and this field is true, the other callbacks are disabled when the graph starts.
Default: true

Switch Affect Timer Callbacks (boolean)
If a switch callback exists and this field is true, the timer callbacks are also affected by the state of the switch. Port callbacks are always affected.
Default: true

Debug Mode (boolean)
Set to true to get more detailed error messages. Performance may be affected when this is enabled.
Default: false

Configuration Type (string)
Selects the method used to obtain the connection configuration. If "Manual" is selected, the credentials are taken from four configuration fields: Host, Port, User, and Password. The only other option is "Configuration Manager", which retrieves the credentials from the Connection Management service.
Default: "Manual"

Host (string)
Address of a running Rserve. If left empty, an Rserve is launched automatically and a port is selected automatically.
Only visible when Configuration Type="Manual".
Default: ""

Port (int)
Port of the Rserve. Ignored if Host is left empty.
Only visible when Configuration Type="Manual".
Default: 0

User (string)
Username for authenticating with Rserve, if required. Ignored if Host is left empty.
Only visible when Configuration Type="Manual".
Default: ""

Password (string)
Password for authenticating with Rserve, if required. Ignored if Host is left empty.
Only visible when Configuration Type="Manual".
Default: ""

Connection ID (string)
The ID of the connection to retrieve from the Connection Management service. The connection must be of type RSERVE. If the Host field of the connection is left empty, a new Rserve is launched automatically and the remaining credentials are ignored.
Default: ""

Input

None

Output

None

Allowed Types and (Data Pipeline)/R Type Equivalences

Allowed Types in In/Out Ports

  • string

  • blob ([]byte)

  • float64

  • int64

  • uint64

  • arrays of the types above (for example, []string or [][]float64)

  • byte

  • message

  • any

(Data Hub Modeler)/R Type Equivalences

Data Hub Modeler type   R type
string                  character
blob                    raw
float64                 double (64 bits)
int64                   integer (signed 32 bits)
uint64                  integer (signed 32 bits)
slice or array          vector
message*                list (Body, Attributes, Encoding)
message**               data frame

* A message whose encoding is not "table". Example of creating such a message in R: m <- list(Body=11, Attributes=list(field1="foo"), Encoding="bar").

** A message whose encoding is "table" and whose body is a slice of columns.
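Both message representations can be returned from the same callback. The sketch below assumes an operator with a hypothetical inport "input" and two hypothetical message outports, "text_out" and "table_out"; the list becomes a message with the given encoding, and the data frame becomes a message with encoding "table". The exists("api") guard only lets the function be tried in a plain R session.

```r
# Hypothetical callback with two message outports.
on_input <- function(data) {
  # Encoding is not "table": sent as a message with Body, Attributes, Encoding.
  text_msg <- list(Body = data, Attributes = list(field1 = "foo"), Encoding = "bar")
  # A data frame is sent as a message whose encoding is "table".
  table_msg <- data.frame(col1 = c(TRUE, FALSE), col2 = c(1, 2))
  list(text_out = text_msg, table_out = table_msg)
}

if (exists("api")) {
  api$setPortCallback(c("input"), c("text_out", "table_out"), "on_input")
}
```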

API Object Methods

api$setPortCallback(inports, outports, functionName):

This method registers a callback named functionName that is called when all ports in inports have data waiting to be read. The outports argument specifies the names of zero or more outports to which the values returned by functionName are sent. The callback must return these values as an R list, so its last line should look like list(output1=value1, output2=value2), where output1 and output2 are the outport names that were also passed in the outports argument of api$setPortCallback. If no outports are specified in api$setPortCallback, the return value is ignored, so the last line of the callback can be anything.

If you specify outports for the callback but want to skip sending to them in some calls, return NULL from the function. This is useful, for example, when you accumulate data in a global variable over several calls and want to send the accumulated result only in a later call: the earlier calls return NULL, and the later call returns list(output=accumulated). Nothing is sent to the outport until that later call.
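A sketch of the accumulation pattern just described, assuming an operator with a hypothetical inport "input" and outport "output": the callback collects values and sends their sum only on every third call, returning NULL otherwise so that nothing is sent. The exists("api") guard only lets the function be tried outside the operator.

```r
# Global accumulator, created once during initialization.
acc <- c()

on_input <- function(data) {
  acc <<- c(acc, data)
  # Fewer than three values so far: send nothing.
  if (length(acc) < 3) {
    return(NULL)
  }
  # Third value: send the sum on "output" and reset the accumulator.
  total <- sum(acc)
  acc <<- c()
  list(output = total)
}

if (exists("api")) {
  api$setPortCallback(c("input"), c("output"), "on_input")
}
```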

Consider a script that registers three callbacks: "on_input12add", "on_input12mult", and "on_input3pow". Both "on_input12add" and "on_input12mult" wait for data to be available in the port group ("input1", "input2"). When data becomes available, the data from "input1" is passed as the first argument of each callback and the data from "input2" as the second. When more than one callback is associated with a port group, as in this case, the callbacks are executed sequentially in the order in which they were added in the script.

The callback "on_input3pow" is associated with a port group that is disjoint from the previous one; this group consists of the single input port "input3". The callback must be the name of a function that is in scope. Inport names must match the names of existing inports, and the number of ports in a port group must equal the number of arguments of the callbacks associated with that group. All port groups must be disjoint from each other. For example, you cannot add a callback associated with ports ("in1", "in2") and another associated with port ("in1") or with ports ("in2", "in3"). However, you can add more than one callback to the same port group. The union of all port groups does not need to cover all input ports of the operator, so some input ports can be left unused.

The function on_input12add adds data1 and data2 to the global variable acc and assigns the result back to acc. Because the call api$setPortCallback(c("input1", "input2"), c(), "on_input12add") specifies no output ports, the return value of on_input12add is ignored; the only purpose of this callback is to update acc.

The return value of on_input12mult, on the other hand, is not ignored. The element of the returned list whose key is output1 is sent to the outport of the same name, as signaled by passing "output1" in the outports argument of setPortCallback. Whenever outports are specified, the last line of the callback must return an R list with a key for each outport name.
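The following is a sketch of a script matching this description. The function bodies are reconstructed from the text; the squaring done by on_input3pow and its outport "output2" are assumptions, since the text does not state them. As in the other sketches, the exists("api") guard only allows the functions to be tried in a plain R session.

```r
acc <- 0

# No outports: the return value is ignored; the callback only updates acc.
on_input12add <- function(data1, data2) {
  acc <<- acc + data1 + data2
  NULL
}

# Outport "output1": the list key must match the outport name.
on_input12mult <- function(data1, data2) {
  list(output1 = data1 * data2)
}

# Disjoint port group ("input3"); "output2" is an assumed outport name.
on_input3pow <- function(data3) {
  list(output2 = data3^2)
}

if (exists("api")) {
  # Both callbacks share the port group ("input1", "input2") and run in this order.
  api$setPortCallback(c("input1", "input2"), c(), "on_input12add")
  api$setPortCallback(c("input1", "input2"), c("output1"), "on_input12mult")
  api$setPortCallback(c("input3"), c("output2"), "on_input3pow")
}
```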

api$addTimer(period, outports, functionName):

This method registers a callback named functionName that is called repeatedly, once every period. The period is passed as a string; for example, "1.3h3m2us" means 1.3 hours, 3 minutes, and 2 microseconds.

The available suffixes are h, m, s, ms, us, and ns, representing hours, minutes, seconds, milliseconds, microseconds, and nanoseconds, respectively. Only the strings "0", "+0", and "-0" may be written without a unit suffix. A sign is optional and is allowed only at the beginning of the string.
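To check what a period string means, the rules above can be expressed as a small R helper that converts a period to seconds. This is an illustrative re-implementation written for this guide, not part of the api object (the name period_to_seconds is hypothetical); the operator parses the period string itself. For instance, "1.3h3m2us" is 1.3 × 3600 + 3 × 60 + 2 × 10^-6 = 4860.000002 seconds.

```r
# Illustrative only: converts a period string to seconds using the rules
# described above. The operator does its own parsing.
period_to_seconds <- function(period) {
  units <- c(h = 3600, m = 60, s = 1, ms = 1e-3, us = 1e-6, ns = 1e-9)
  # An optional sign is allowed only at the beginning.
  sign <- 1
  if (grepl("^[+-]", period)) {
    if (substr(period, 1, 1) == "-") sign <- -1
    period <- substr(period, 2, nchar(period))
  }
  # "0" (after the sign) is the only value allowed without a suffix.
  if (period == "0") return(0)
  # A component is a decimal number followed by one of the six suffixes.
  pattern <- "([0-9]*\\.?[0-9]+)(ms|us|ns|h|m|s)"
  if (!grepl(paste0("^(", pattern, ")+$"), period)) {
    stop("invalid period: ", period)
  }
  parts <- regmatches(period, gregexpr(pattern, period))[[1]]
  values <- as.numeric(sub(pattern, "\\1", parts))
  suffixes <- sub(pattern, "\\2", parts)
  sign * sum(values * units[suffixes])
}
```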

The outports argument specifies the names of zero or more outports to which the values returned by functionName are sent. The callback must return these values as an R list, so its last line should look like list(output1=value1, output2=value2), where output1 and output2 are the outport names that were also passed in the outports argument of api$addTimer. If no outports are specified in api$addTimer, the return value is ignored, so the last line of the callback can be anything.

As with port callbacks, a timer callback that has outports can return NULL to skip sending to them in a given call. This is useful, for example, when the callback accumulates data in a global variable over several calls and sends the accumulated result, as list(output=accumulated), only in a later call.

For example, a timer callback with a period of one second and the outport "output" can produce the values 1, 2, 3, ... on "output", one per second, until the graph shuts down.
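A sketch of such a timer script, assuming that a timer callback takes no arguments (it has no inports) and that the operator has an outport named "output". The exists("api") guard only lets the function be tried outside the operator.

```r
counter <- 0L

# Called once per period; sends the next integer on "output".
on_timer <- function() {
  counter <<- counter + 1L
  list(output = counter)
}

if (exists("api")) {
  api$addTimer("1s", c("output"), "on_timer")
}
```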

api$setSwitchCallback(inports, outports, functionName):

This method adds a callback that controls the switch. When the switch is 'on', the other callbacks are enabled. When the switch is 'off', they are disabled and no data is consumed from their ports. Which callbacks are affected by the switch is described below.

The callback functionName added with setSwitchCallback must always return a boolean indicating the position the switch should take. For example, suppose that you added a switch callback with api$setSwitchCallback(inports=c("inputX", "inputY"), outports=c("outputA", "outputB"), "foo"). The function foo could then look like this:
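foo <- function(dataX, dataY) {
    # Values for the outports outputA and outputB.
    out1 <- dataX + dataY
    out2 <- dataX * dataY
    # Switch off when dataX is a multiple of dataY, on otherwise.
    if (dataX %% dataY == 0) {
        position <- FALSE
    } else {
        position <- TRUE
    }
    # switchPosition sets the switch; the other keys are sent to the outports.
    list(switchPosition=position, outputA=out1, outputB=out2)
}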

The list returned by a switch callback must therefore always contain the key switchPosition. The boolean is not sent on any outport unless the operator has a port named 'switchPosition' of type byte and that port is also listed in the outports parameter of api$setSwitchCallback. In that case, the boolean is converted to a byte and sent on the 'switchPosition' port, in addition to being used internally to set the switch.
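A sketch of a switch driven by a control port, assuming a hypothetical string inport "control" that receives "on" or "off", and a byte outport named 'switchPosition' so that the switch position is also forwarded. The exists("api") guard only lets the function be tried outside the operator.

```r
# The switchPosition key sets the switch; because "switchPosition" is
# also listed as an outport, the value is forwarded on that port too.
on_control <- function(command) {
  list(switchPosition = identical(command, "on"))
}

if (exists("api")) {
  api$setSwitchCallback(c("control"), c("switchPosition"), "on_control")
}
```

Any value other than "on" turns the switch off in this sketch; a stricter version could ignore unknown commands by returning the current position instead.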

Two operator configuration parameters relate to the switch callback: switchStartOff and switchAffectTimerCallbacks. If switchStartOff is true, the switch starts in the off position when the graph starts. switchAffectTimerCallbacks indicates whether the switch also affects (blocks and unblocks) the timer callbacks. Port callbacks are always affected, whereas shutdown and switch callbacks are never affected by the switch position; they are always enabled.

Unlike api$setPortCallback, api$addTimer, and api$addShutdownHandler, api$setSwitchCallback can register only one callback. If you call it more than once, the new callback replaces the previous one.

api$addShutdownHandler(functionName):

This method registers the callback functionName to be called when the graph shuts down. If more than one callback is added, they are called sequentially in the order in which they were added in the script.

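For example, a script that counts the values received on port "input" and registers two shutdown handlers, each printing that count, prints "shutdown1: 7" and "shutdown2: 7" if 7 values were received on "input" before the graph stops.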
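A sketch of that script, assuming an inport named "input" and that shutdown handlers take no arguments. The handlers write their line with cat(); where that output ends up is determined by the operator environment. The exists("api") guard only lets the functions be tried outside the operator.

```r
count <- 0L

# Port callback without outports: only counts the received values.
on_input <- function(data) {
  count <<- count + 1L
  NULL
}

# Shutdown handlers run in registration order when the graph stops.
shutdown1 <- function() cat(sprintf("shutdown1: %d\n", count))
shutdown2 <- function() cat(sprintf("shutdown2: %d\n", count))

if (exists("api")) {
  api$setPortCallback(c("input"), c(), "on_input")
  api$addShutdownHandler("shutdown1")
  api$addShutdownHandler("shutdown2")
}
```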

Go/R Type Equivalences and Conversions

This section is useful if you connect the R Client operator to a Go operator.

Message Type

The Go message type can be represented in R either as a generic list or as a data frame. A non-table message in Go is represented in R as a list with the fields Body, Attributes, and Encoding. Attributes should be a JSON-like object, and Encoding should be a string. A table message in Go must follow a specific pattern: its encoding field must contain the string "table", and its body must be a []interface{} (slice of empty interfaces) in which each element stores one column of the table. Each column must be a slice of a numeric type, string, or boolean, and all columns must have the same number of elements.

The attributes object must contain a field named "fieldNames" with the name of each column of the data frame. Optionally, it can also contain the field attributes["rlang"]["firstColumnIsRowNames"], a boolean indicating whether the first column of the table should be used as the row names in R. If the encoding field of the message is not "table", the equivalent R type is a generic list with the fields Body, Attributes, and Encoding.

Go to R Conversion Examples:

  • "abcd" -> "abcd"

  • []string{"al", "alkj"} -> c("al", "alkj")

  • [][]string{{"al", "alkj"}, {"al2", "alkj2"}} -> list(c("al", "alkj"), c("al2", "alkj2"))

  • int64(1) -> 1L

  • 1.0 -> 1.0

  • [][]float64{{1, 2}, {3,4}} -> list(c(1, 2), c(3, 4))

  • []byte{1, 2, 3} -> as.raw(c(1,2,3))

  • map[string]interface{}{"Body": 3, "Attributes": nil, "Encoding": "foo"} -> list(Body=3, Attributes=list(), Encoding="foo")

  • nil -> NA

  • The Go message below:
    map[string]interface{}{
        "Body": []interface{}{
            []string{"Mazda RX4", "Mazda RX4 Wag"},
            []float64{21, 21},
            []float64{6, 6},
        },
        "Attributes": map[string]interface{}{
            "rlang":      map[string]interface{}{"firstColumnIsRowNames": true},
            "fieldNames": []string{"rowNames", "mpg", "cyl"},
        },
        "Encoding": "table",
    }
    is converted to data.frame(row.names = c("Mazda RX4", "Mazda RX4 Wag"), mpg=c(21, 21), cyl=c(6, 6))
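On the R side, a callback receiving that table message simply gets a data frame whose columns are named after fieldNames. A sketch, assuming hypothetical ports "input" (message) and "output" (float64):

```r
# Receives the converted data frame; "Mazda RX4" and "Mazda RX4 Wag" are
# its row names because firstColumnIsRowNames is true.
on_cars <- function(cars) {
  list(output = mean(cars$mpg))
}

if (exists("api")) {
  api$setPortCallback(c("input"), c("output"), "on_cars")
}
```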

R to Go Conversions

This section is divided into two subsections. The first shows how conversions are done when the target Go type is known. The second covers the case when the target Go type is not known (when the output port is of type any or when the data is inside the body of a message).

Target Go Type is Known

In this case, the type of the R Client output port determines the Go type that the R object is converted to.

Examples R -> Go (Known Target Type):
  • outport type uint64:

    • 1L -> uint64(1)

    • 2.7 -> uint64(2)

  • outport type int64:

    • -1L -> int64(-1)

    • -2.7 -> int64(-2)

  • outport type float64:

    • 1L -> float64(1.0)

    • 2.7 -> float64(2.7)

  • outport type []uint64:

    • 1.3 -> []uint64{1}

    • c(1.3) -> []uint64{1}

    • c(1.3, 2.7) -> []uint64{1, 2}
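The examples above show the fractional part being dropped (2.7 becomes 2, -2.7 becomes -2). If nearest-integer values are wanted instead, round in R before returning. A sketch, assuming a hypothetical inport "input" and an outport "output" of type int64:

```r
# Round explicitly so the int64 outport receives 3 for 2.7, not 2.
on_input <- function(x) {
  list(output = as.integer(round(x)))
}

if (exists("api")) {
  api$setPortCallback(c("input"), c("output"), "on_input")
}
```

Note that R's round() rounds halves to the even digit, so round(2.5) is 2.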

Target Go Type is Unknown

The target Go type might not be known when the port type remains any even after port types are propagated, or when the data is inside the body of a message. In those cases, numeric values and numeric vectors in R are converted to float64 and slices of float64 in Go, respectively. Singleton vectors in R (c(1)) are converted to scalars in Go (1.0), whereas singleton lists (list(1)) are converted to singleton slices in Go ([]float64{1.0}).

Examples R -> Go (Unknown Target Type):
  • 1L -> float64(1)

  • 2.7 -> float64(2.7)

  • c(1.3) -> float64(1.3)

  • list(1) -> []float64{1}

  • c(1.3, 2.7) -> []float64{1.3, 2.7}

  • list(1.3, 3L) -> []float64{1.3, 3.0}

  • list(Body=3, Attributes=list()) -> map[string]interface{}{"Body": 3.0, "Attributes": nil}

  • list(c(1,2,3), c(4,5,6)) -> [][]float64{{1, 2, 3}, {4, 5, 6}}

  • list(c(1,2,3), c("alkj")) -> []interface{}{[]float64{1, 2, 3}, "alkj"}

  • c("one","two","three") -> []string{"one", "two", "three"}

  • list("one","two","three") -> []string{"one", "two", "three"}

  • NULL -> nil

  • NA -> nil

  • data.frame(row.names = c("Mazda RX4", "Mazda RX4 Wag"), mpg=c(21, 21), cyl=c(6, 6)) ->
    map[string]interface{}{
        "Body": []interface{}{
            []string{"Mazda RX4", "Mazda RX4 Wag"},
            []float64{21, 21},
            []float64{6, 6},
        },
        "Attributes": map[string]interface{}{
            "rlang":      map[string]interface{}{"firstColumnIsRowNames": true},
            "fieldNames": []string{"rowNames", "mpg", "cyl"},
        },
        "Encoding": "table",
    }
  • data.frame(col1=c(TRUE, FALSE), col2=c(1,2)) ->
    map[string]interface{}{
        "Body": []interface{}{
            []bool{true, false},
            []float64{1, 2},
        },
        "Attributes": map[string]interface{}{
            "rlang":      map[string]interface{}{"firstColumnIsRowNames": false},
            "fieldNames": []string{"col1", "col2"},
        },
        "Encoding": "table",
    }
  • list(1, "lakj", c(2, 3, 4), c("alkj", "qoiu")) -> []interface{}{1.0, "lakj", []float64{2, 3, 4}, []string{"alkj", "qoiu"}}

  • list(list(1L, "abc", 2.0), list(), list("def", 3)) -> [][]interface{}{{1.0, "abc", 2.0}, {}, {"def", 3.0}}

  • list(c(1), c(1, 2), c(3, 4)) -> []interface{}{1.0, []float64{1, 2}, []float64{3, 4}}

Note that in the last example c(1) is converted to 1.0 and not to []float64{1.0}, because singleton vectors in R are converted to scalars in Go.
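When a Go consumer always expects a slice, the singleton rule can be avoided by returning a list instead of a vector: according to the examples above, list(1) and list(1.3, 3L) both arrive as []float64. A sketch, assuming a hypothetical outport "output" of type any:

```r
# as.list() keeps the value a Go slice even when `values` has length 1,
# where a plain vector would arrive as a scalar.
on_input <- function(values) {
  list(output = as.list(values))
}

if (exists("api")) {
  api$setPortCallback(c("input"), c("output"), "on_input")
}
```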

Docker Tags

The default R Client operator comes with three tags: "rserve", "rjsonlite", and "rmsgpack". These tags are needed to select the right Docker image for the container in which the operator's group runs. They are relevant only when the Host configuration field is left empty, which causes a new Rserve process to be launched inside the operator group's container. If you connect to an existing Rserve by specifying Host, you don't need these tags. You can remove them by creating a new operator that extends the R Client operator.

Removing the tags is not mandatory when connecting to an external Rserve, but it is recommended because it lowers the requirements on the Docker image selected for the operator's group. If your external Rserve does not already have the jsonlite and msgpack packages installed, they are installed the first time you run the R Client operator connected to that host, so the graph may take a while to start while the packages are installed.

Limitations

Adding new callbacks while other callbacks are executing has no effect; that is, callbacks cannot be added after the operator has been initialized.