R Client
The R Client operator runs user-defined R code in Rserve. You specify the code to run by writing an R script and registering callbacks through the predefined api object.
For example, to have a callback function myFunc called whenever new data arrives on the port "input1", and to send its result on the port "output2", write api$setPortCallback(c("input1"), c("output2"), "myFunc"). The api object provides the following methods:
- api$setPortCallback(inports, outports, functionName)
- api$addTimer(period, outports, functionName)
- api$setSwitchCallback(inports, outports, functionName)
- api$addShutdownHandler(functionName)
Configuration Parameters
Parameter | Type | Description |
---|---|---|
Script | string | Mandatory. Inline script to be executed. If the entered text starts with "file://", it is interpreted as the name of the script file to be executed. |
Switch Start Off | boolean | If a switch callback exists and this field is true, the other callbacks are disabled from the start of the graph. Default: true |
Switch Affect Timer Callbacks | boolean | If a switch callback exists and this field is true, the timer callbacks are affected by the state of the switch (the port callbacks are always affected). Default: true |
Debug Mode | boolean | Set to true to get better error messages; performance may be affected while it is enabled. Default: false |
Configuration Type | string | Selects which connection configuration method is used. If "Manual" is selected, the credentials are taken from these four configuration fields: Host, Port, User, and Password. The only other option is "Configuration Manager", which gets the credentials from the Connection Management service. Default: "Manual" |
Host | string | Address of a running Rserve. If left empty, an Rserve is launched automatically and a port is selected automatically. This field is only visible when Configuration Type="Manual". Default: "" |
Port | int | Port of the Rserve. If Host is left empty, this field is ignored. This field is only visible when Configuration Type="Manual". Default: 0 |
User | string | Username for authenticating with Rserve, if required. If Host is left empty, the supplied username is ignored. This field is only visible when Configuration Type="Manual". Default: "" |
Password | string | Password for authenticating with Rserve, if required. If Host is left empty, the supplied password is ignored. This field is only visible when Configuration Type="Manual". Default: "" |
Connection ID | string | The ID of the connection information to retrieve from the Connection Management service. The connection information needs to be of type RSERVE. If the Host field in your connection information is left empty, a new Rserve is launched automatically and the rest of the credentials are ignored. Default: "" |
Input
None
Output
None
Allowed Types and (Data Pipeline)/R Type Equivalences
Allowed Types in In/Out Ports
- string
- blob ([]byte)
- float64
- int64
- uint64
- arrays of the types above (e.g.: []string, [][]float64, etc.)
- byte
- message
- any
(Data Hub Modeler)/R Type Equivalences
Data Hub Modeler | R |
---|---|
string | character |
blob | raw |
float64 | double (64 bits) |
int64 | integer (signed 32 bits) |
uint64 | integer (signed 32 bits) |
slice or array | vector |
message* | list (Body, Attributes, Encoding) |
message** | data frame |
* A message whose Encoding field is anything other than 'table'. Example of message creation in R: m <- list(Body=11, Attributes=list(field1="foo"), Encoding="bar").
** A message whose Encoding field is 'table' and whose body is a slice of columns.
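The two message representations can be illustrated in plain R. This is a minimal sketch: the column names id and label and the helper is_plain_message are hypothetical and are not part of the operator's API.

```r
# Non-table message: a plain list with Body, Attributes, and Encoding.
m <- list(Body = 11, Attributes = list(field1 = "foo"), Encoding = "bar")

# Table message: on the R side this is an ordinary data frame.
# The column names are hypothetical.
tbl <- data.frame(id = c(1, 2, 3), label = c("a", "b", "c"),
                  stringsAsFactors = FALSE)

# Hypothetical helper: TRUE if x is a list (not a data frame) that carries
# the three fields of a non-table message.
is_plain_message <- function(x) {
  is.list(x) && !is.data.frame(x) &&
    all(c("Body", "Attributes", "Encoding") %in% names(x))
}

print(is_plain_message(m))    # TRUE
print(is_plain_message(tbl))  # FALSE
```

A callback that may receive either kind of message could use a check like this to decide how to read its argument.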
API Object Methods
api$setPortCallback(inports, outports, functionName):
This method registers a callback named functionName that is called when all ports in inports have data waiting to be read. The outports argument specifies the names of zero or more outports to which the values returned by functionName are sent. The function is expected to return its values as an R list, so its last line should look something like list(output1=value1, output2=value2), where output1 and output2 are outport names that were also given in the outports argument of api$setPortCallback. If no outport was specified in api$setPortCallback, the last line of the callback can be anything, because its return value is ignored.
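The discussion that follows refers to a script with three callbacks. A minimal sketch consistent with that description is given here. The bodies of on_input12mult and on_input3exp, the initial value of acc, and the outport name "output2" are assumptions. The exists("api") guard is added only so that the callbacks can also be sourced in a plain R session, where the predefined api object is not available.

```r
acc <- 0  # global accumulator updated by on_input12add (initial value assumed)

# Called with the data from "input1" and "input2". No outports are
# registered for it, so its return value is ignored.
on_input12add <- function(data1, data2) {
  acc <<- acc + data1 + data2
}

# Same port group. The returned list is keyed by the outport name.
# The computation itself is an assumption.
on_input12mult <- function(data1, data2) {
  list(output1 = data1 * data2)
}

# Separate port group with a single inport. The body and the outport
# "output2" are assumptions.
on_input3exp <- function(data) {
  list(output2 = exp(data))
}

# The api object exists only inside the operator.
if (exists("api")) {
  api$setPortCallback(c("input1", "input2"), c(), "on_input12add")
  api$setPortCallback(c("input1", "input2"), c("output1"), "on_input12mult")
  api$setPortCallback(c("input3"), c("output2"), "on_input3exp")
}
```

Outside the operator the callbacks can be called directly, for example on_input12add(1, 2) followed by on_input12mult(2, 3), which is a convenient way to check their return values before deploying the script.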
The script above has three callbacks: "on_input12add", "on_input12mult", and "on_input3exp". Both "on_input12add" and "on_input12mult" wait for data to be available in the port group ("input1", "input2"). When the data becomes available, the data from "input1" is passed as the first argument of each callback and the data from "input2" as the second. When more than one callback is associated with a port group (as in this case), the callbacks are executed sequentially, in the order in which they were added in the script.
The callback "on_input3exp" is associated with a port group that is disjoint from the previous one. In this case, the port group consists of just one input port, "input3". The callback must be the name of a function variable that is in scope. Inport names must match the names of existing inports, and the number of ports in a port group must equal the number of arguments of the callback associated with that group. All port groups must be disjoint from each other. For example, you cannot add a callback associated with ports ("in1", "in2") and another associated with port ("in1") or with ports ("in2", "in3"). You can, however, add more than one callback to the same port group. The union of all port groups does not need to cover all input ports of the operator, so some input ports can be left unused.
The function on_input12add adds data1 and data2 to acc (a global variable) and assigns the result back to acc. Because the line api$setPortCallback(c("input1", "input2"), c(), "on_input12add") specifies no output ports, the return value of on_input12add is ignored. The only purpose of this callback is to update the global variable acc.
The return value of on_input12mult, on the other hand, is not ignored. The value stored under the key output1 in the returned list is sent to the output port of the same name. This is requested by listing "output1" in the outports argument of setPortCallback. Whenever outports are listed, the last line of the callback must return an R list with one key for each outport name.
api$addTimer(period, outports, functionName):
This method registers a callback named functionName that is called repeatedly, once every period. The period is passed as a string. For example, "1.3h3m2us" means 1.3 hours, 3 minutes, and 2 microseconds.
The available suffixes are h, m, s, ms, us, and ns. They represent hours, minutes, seconds, milliseconds, microseconds, and nanoseconds, respectively. Only the strings "0", "+0", and "-0" may omit the unit suffix. A sign is optional and is allowed only at the beginning of the string.
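To make the period format concrete, the sketch below converts a period string to seconds according to the rules just described. The helper parse_period is hypothetical and is not part of the api object; the operator's own parser may accept or reject some strings differently (for example, numbers written with a trailing dot).

```r
# Hypothetical helper: convert a period string such as "1.3h3m2us" to seconds.
parse_period <- function(period) {
  unit_seconds <- c(h = 3600, m = 60, s = 1, ms = 1e-3, us = 1e-6, ns = 1e-9)

  # Optional sign, allowed only at the start of the string.
  sign <- 1
  if (grepl("^[+-]", period)) {
    if (startsWith(period, "-")) sign <- -1
    period <- substring(period, 2)
  }

  # A bare zero is the only value allowed without a unit suffix.
  if (period == "0") return(0)

  # One token is a number followed by a unit. Two-letter units are listed
  # first so that "ms" is not read as "m" followed by a stray "s".
  token <- "([0-9]*\\.?[0-9]+)(ms|us|ns|h|m|s)"
  if (!grepl(paste0("^(", token, ")+$"), period, perl = TRUE)) {
    stop("invalid period string")
  }

  tokens <- regmatches(period, gregexpr(token, period, perl = TRUE))[[1]]
  values <- as.numeric(sub("[a-z]+$", "", tokens))
  units <- sub("^[0-9.]+", "", tokens)
  sign * sum(values * unit_seconds[units])
}
```

With this helper, parse_period("1.5m") returns 90, and parse_period("5") stops with an error because the unit suffix is missing.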
The outports argument specifies the names of zero or more outports to which the values returned by functionName are sent. The function is expected to return its values as an R list, so its last line should look something like list(output1=value1, output2=value2), where output1 and output2 are outport names that were also given in the outports argument of api$addTimer. If no outport was specified in api$addTimer, the last line of the callback can be anything, because its return value is ignored.
This example produces the values 1, 2, 3, ... once per second on the outport "output" until the graph shuts down.
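A script with this behavior could be sketched as follows. The names counter and tick are hypothetical, and the sketch assumes that a timer callback is called without arguments. The exists("api") guard only allows the script to be sourced outside the operator as well.

```r
counter <- 0

# Timer callback: assumed to take no arguments. It returns a list keyed by
# the outport name registered below.
tick <- function() {
  counter <<- counter + 1
  list(output = counter)
}

# The api object exists only inside the operator.
if (exists("api")) {
  api$addTimer("1s", c("output"), "tick")
}
```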
api$setSwitchCallback(inports, outports, functionName):
This method adds a callback that controls the switch variable. When the switch is 'on', all other callbacks are enabled. When the switch is 'off', all other callbacks are disabled and no data is consumed from their ports.

    foo <- function(dataX, dataY) {
        out1 <- dataX + dataY
        out2 <- dataX * dataY
        if (dataX %% dataY == 0) {
            position <- FALSE
        } else {
            position <- TRUE
        }
        list(switchPosition=position, outputA=out1, outputB=out2)
    }

The list returned on the last line of a switch callback must therefore always contain the key switchPosition. The boolean returned under that key is not sent on any outport unless you add a port named 'switchPosition' of type byte to the operator and also list this port in the outports parameter of api$setSwitchCallback. If you do, the boolean is converted to a byte and sent on the 'switchPosition' port, in addition to being used internally to set the value of the switch.
Two of the operator's configuration parameters relate to the switch callback: switchStartOff and switchAffectTimerCallbacks (Switch Start Off and Switch Affect Timer Callbacks in the parameter table). If switchStartOff is true, the switch starts in the off position when the graph is started. The switchAffectTimerCallbacks parameter indicates whether the switch should also affect (block or unblock) the timer callbacks. The port callbacks are always affected, whereas the shutdown and switch callbacks are never affected by the switch position. That is, the shutdown and switch callbacks are always enabled (unblocked).
Unlike api$setPortCallback, api$addTimer, and api$addShutdownHandler, api$setSwitchCallback can register only one callback. If you call the method more than once, the earlier callback is replaced by the new one.
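Registering a switch callback follows the same pattern as the other methods. The sketch below is a smaller variant of the callback shown earlier: the inport name "control" and the function name gate are hypothetical, and the operator is assumed to have an outport named 'switchPosition' of type byte so that the position is also forwarded.

```r
# Switch callback with one inport. The switch is turned on for positive
# values and off otherwise.
gate <- function(control) {
  list(switchPosition = control > 0)
}

# The api object exists only inside the operator.
if (exists("api")) {
  # Listing "switchPosition" as an outport forwards the position as a byte
  # in addition to setting the switch internally.
  api$setSwitchCallback(c("control"), c("switchPosition"), "gate")
}
```

If the position does not need to be forwarded, the outports argument can be left as c() and the returned list still has to contain switchPosition.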
api$addShutdownHandler(functionName):
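This method registers a callback named functionName that is called when the graph shuts down. More than one shutdown handler can be added.
This example prints "shutdown1: 7" and "shutdown2: 7" if 7 values have been provided on the "input" port before the graph stops.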
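A script with this behavior might be sketched as follows. The names count, on_input, shutdown1, and shutdown2 are hypothetical, and the sketch assumes that shutdown handlers are called without arguments. The exists("api") guard only allows the script to be sourced outside the operator as well.

```r
count <- 0  # number of values received on "input"

# Port callback without outports: it only counts incoming values.
on_input <- function(data) {
  count <<- count + 1
}

# Shutdown handlers, assumed to take no arguments.
shutdown1 <- function() {
  print(paste("shutdown1:", count))
}
shutdown2 <- function() {
  print(paste("shutdown2:", count))
}

# The api object exists only inside the operator.
if (exists("api")) {
  api$setPortCallback(c("input"), c(), "on_input")
  api$addShutdownHandler("shutdown1")
  api$addShutdownHandler("shutdown2")
}
```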
Go/R Type Equivalences and Conversions
This section is useful if you are connecting the R Client to a Go script operator.
Message Type
The Go message type can be represented in R either as a generic list object or as a data frame. A non-table message in Go is represented in R as a list with the following fields: Body, Attributes, Encoding. Attributes should be a JSON-like object. Encoding should be a string. A table message in Go has to follow a certain pattern. Its encoding field must contain the string "table", and its body must be a []interface{} (slice of empty interfaces) in which each element stores one column of the table. Each column should be a slice of a numeric type, string, or boolean. All columns should have the same number of elements.
Go to R Conversions Examples:
- "abcd" -> "abcd"
- []string{"al", "alkj"} -> c("al", "alkj")
- [][]string{{"al", "alkj"}, {"al2", "alkj2"}} -> list(c("al", "alkj"), c("al2", "alkj2"))
- int64(1) -> 1L
- 1.0 -> 1.0
- [][]float64{{1, 2}, {3,4}} -> list(c(1, 2), c(3, 4))
- []byte{1, 2, 3} -> as.raw(c(1,2,3))
- map[string]interface{}{"Body": 3, "Attributes": nil, "Encoding": "foo"} -> list(Body=3, Attributes=list(), Encoding="foo")
- nil -> NA
- The Go message below:

      map[string]interface{}{
          "Body": []interface{}{
              []string{"Mazda RX4", "Mazda RX4 Wag"},
              []float64{21, 21},
              []float64{6, 6},
          },
          "Attributes": map[string]interface{}{
              "rlang": map[string]interface{}{"firstColumnIsRowNames": true},
              "fieldNames": []string{"rowNames", "mpg", "cyl"},
          },
          "Encoding": "table",
      }

  is converted to data.frame(row.names = c("Mazda RX4", "Mazda RX4 Wag"), mpg=c(21, 21), cyl=c(6, 6))
R to Go Conversions
This section is divided into two subsections. The first shows how conversions are done when the target Go type is known. The second covers the case in which the target Go type is not known (when the output port is of type any or when the data is inside the body of a message).
Target Go Type is Known
In this case, the type of the R Client's output port determines the Go type to which the R object is converted.
- outport type uint64:
  - 1L -> uint64(1)
  - 2.7 -> uint64(2)
- outport type int64:
  - -1L -> int64(-1)
  - -2.7 -> int64(-2)
- outport type float64:
  - 1L -> float64(1.0)
  - 2.7 -> float64(2.7)
- outport type []uint64:
  - 1.3 -> []uint64{1}
  - c(1.3) -> []uint64{1}
  - c(1.3, 2.7) -> []uint64{1, 2}
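In the integer examples above, the fractional part is dropped (2.7 becomes 2 and -2.7 becomes -2). The sketch below reproduces that behavior in plain R with trunc so that the effect can be previewed before a value is sent. The helper name preview_integer_port is hypothetical, and the sketch is an emulation based only on the listed examples, not the operator's conversion code. Cases that the examples do not cover, such as a negative value sent to a uint64 port, are not addressed.

```r
# Hypothetical helper: preview the value an integer-typed outport would
# carry, assuming truncation toward zero as in the examples above.
preview_integer_port <- function(x) {
  trunc(as.numeric(x))
}

print(preview_integer_port(2.7))          # 2
print(preview_integer_port(-2.7))         # -2
print(preview_integer_port(c(1.3, 2.7)))  # 1 2
```

If rounding to the nearest integer is wanted instead, the callback can apply round() to the value before returning it.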
Target Go Type is Unknown
The target Go type might not be known when the port type remains any even after the port types are propagated, or when the data is inside the body of a message. In those cases, all numeric values and numeric vectors in R are converted to float64 and slices of float64 in Go, respectively. Singleton vectors in R (c(1)) are converted to scalars in Go (1.0). Singleton lists (list(1)), on the other hand, are converted to a singleton slice in Go ([]float64{1.0}).
- 1L -> float64(1)
- 2.7 -> float64(2.7)
- c(1.3) -> float64(1.3)
- list(1) -> []float64{1}
- c(1.3, 2.7) -> []float64{1.3, 2.7}
- list(1.3, 3L) -> []float64{1.3, 3.0}
- list(Body=3, Attributes=list()) -> map[string]interface{}{"Body": 3.0, "Attributes": nil}
- list(c(1,2,3), c(4,5,6)) -> [][]float64{{1, 2, 3}, {4, 5, 6}}
- list(c(1,2,3), c("alkj")) -> []interface{}{[]float64{1, 2, 3}, "alkj"}
- c("one","two","three") -> []string{"one", "two", "three"}
- list("one","two","three") -> []string{"one", "two", "three"}
- NULL -> nil
- NA -> nil
- data.frame(row.names = c("Mazda RX4", "Mazda RX4 Wag"), mpg=c(21, 21), cyl=c(6, 6)) ->

      map[string]interface{}{
          "Body": []interface{}{
              []string{"Mazda RX4", "Mazda RX4 Wag"},
              []float64{21, 21},
              []float64{6, 6},
          },
          "Attributes": map[string]interface{}{
              "rlang": map[string]interface{}{"firstColumnIsRowNames": true},
              "fieldNames": []string{"rowNames", "mpg", "cyl"},
          },
          "Encoding": "table",
      }

- data.frame(col1=c(TRUE, FALSE), col2=c(1,2)) ->

      map[string]interface{}{
          "Body": []interface{}{
              []bool{true, false},
              []float64{1, 2},
          },
          "Attributes": map[string]interface{}{
              "rlang": map[string]interface{}{"firstColumnIsRowNames": false},
              "fieldNames": []string{"col1", "col2"},
          },
          "Encoding": "table",
      }

- list(1, "lakj", c(2, 3, 4), c("alkj", "qoiu")) -> []interface{}{1.0, "lakj", []float64{2, 3, 4}, []string{"alkj", "qoiu"}}
- list(list(1L, "abc", 2.0), list(), list("def", 3)) -> [][]interface{}{{1.0, "abc", 2.0}, {}, {"def", 3.0}}
- list(c(1), c(1, 2), c(3, 4)) -> []interface{}{1.0, []float64{1, 2}, []float64{3, 4}}
Note that in the last example above, c(1) is converted to 1.0 and not to []float64{1.0}. This is because singleton vectors in R are converted to scalars in Go.
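When a downstream operator expects a slice even for a single element, the singleton rule can be worked around on the R side by wrapping the value in a list, since list(1) is converted to a one-element slice. The helper keep_as_slice below is hypothetical and only sketches this idea.

```r
# Hypothetical helper: wrap a length-one atomic vector in a list so that,
# under the rules above, it is sent as a one-element slice rather than a
# scalar. Longer vectors and lists are returned unchanged.
keep_as_slice <- function(x) {
  if (is.atomic(x) && length(x) == 1) as.list(x) else x
}

str(keep_as_slice(c(1)))     # a list holding the single number 1
str(keep_as_slice(c(1, 2)))  # still a numeric vector of length 2
```

A callback would apply it to the value before placing it in the returned list, for example list(output1 = keep_as_slice(values)), where output1 and values are placeholder names.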
Docker Tags
The default R Client operator comes with three tags: "rserve", "rjsonlite", and "rmsgpack". They are needed to select the right Docker image for the container in which the operator's group runs. The tags are only relevant when you leave the Host configuration field empty, which causes a new Rserve process to be launched inside the operator's group container. If you connect to an existing Rserve by specifying the Host configuration, you do not need those tags. You can remove them by creating a new operator that extends the R Client operator.
Removing the tags is not mandatory when connecting to an external Rserve, but it is recommended because it lowers the requirements for the Docker image selected for the operator's group. If your external Rserve does not already have the jsonlite and msgpack packages installed, they are installed the first time you run the R Client operator connected to that Host. The graph may take a while to start because of the time needed to install those packages in your external Rserve.
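To avoid the installation delay on the first start, you can check beforehand, in an R session on the machine that hosts the external Rserve, whether the two packages are already present. The snippet below is a sketch that takes the package names jsonlite and msgpack from the text above; the variable names are hypothetical, and the installation step is left commented out because it depends on how package repositories are configured on that host.

```r
required <- c("jsonlite", "msgpack")

# requireNamespace returns FALSE (without raising an error) for packages
# that are not installed.
installed <- vapply(required, requireNamespace, logical(1),
                    quietly = TRUE, USE.NAMES = FALSE)
not_installed <- required[!installed]

if (length(not_installed) > 0) {
  message("Not installed: ", paste(not_installed, collapse = ", "))
  # install.packages(not_installed)  # needs a configured package repository
} else {
  message("All required packages are installed.")
}
```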