R Client
The RClient operator runs user-defined Microsoft R code in Rserve. You specify the code to run by writing an R script and registering callbacks through the predefined API object.
For example, to have a function myFunc called whenever new data arrives on port "input1" and send its result on port "output2", write api$setPortCallback(c("input1"), c("output2"), "myFunc").
The API object provides these methods:
- api$setPortCallback(inports, outports, functionName)
- api$addTimer(period, outports, functionName)
- api$setSwitchCallback(inports, outports, functionName)
- api$addShutdownHandler(functionName)
The user's script runs once, during the initialization of the RClient operator. Code that should run after initialization must be placed in callbacks, which the script registers with the API object methods setPortCallback, addTimer, setSwitchCallback, and addShutdownHandler.
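A minimal sketch of such a script, reusing the port names "input1" and "output2" from the example above. The top-level statements run once at initialization, while myFunc runs later, each time data arrives on "input1". The `threshold` variable and the filtering logic are hypothetical, and the `if (!exists("api"))` stub only lets the file be sourced in a plain R session for testing; inside the operator, the real `api` object is used.

```r
# Stand-in for the operator's api object so the script also runs in a plain
# R session; inside RClient the real api object already exists.
if (!exists("api")) {
  api <- list(setPortCallback = function(inports, outports, functionName) invisible(NULL))
}

# Top-level code: runs once, while the operator initializes.
threshold <- 10  # hypothetical setting used by the callback

# Callback: runs each time new data arrives on "input1". Because "output2"
# is listed as an outport, the list element named output2 is sent there.
myFunc <- function(data) {
  list(output2 = data[data > threshold])
}

api$setPortCallback(c("input1"), c("output2"), "myFunc")
```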
Configuration Parameters
Parameter | Type | Description |
---|---|---|
script | string | Mandatory. Inline script to be executed. If the text starts with "file://", it is interpreted as the name of the script file to execute. |
serverAddress | string | IP address of a running Rserve instance. If left empty, an Rserve instance is launched automatically on an automatically selected port. Default: "" |
serverPort | int | Port of the Rserve instance. Ignored if serverAddress is empty. Default: 0 |
username | string | Username for authenticating with Rserve, if required. Ignored if serverAddress is empty. Default: "" |
password | string | Password for authenticating with Rserve, if required. Ignored if serverAddress is empty. Default: "" |
switchStartOff | boolean | If a switch callback exists and this field is true, the switch starts in the off position, so the callbacks it controls are disabled when the graph starts. Default: true |
switchAffectTimerCallbacks | boolean | If a switch callback exists and this field is true, timer callbacks are also affected by the switch state (port callbacks are always affected). Default: true |
debugMode | boolean | Set to true to get more detailed error messages; enabling it may reduce performance. Default: false |
Input
None
Output
None
Allowed Types and (Data Pipeline)/R Type Equivalences
Allowed Types for In/Out Ports
- string and slice of strings
- blob ([]byte)
- float64 and slice of float64
- int64 and slice of int64
- uint64 and slice of uint64
- message
(Pipeline Engine)/R Type Equivalences
Pipeline Engine | R |
---|---|
string | character |
blob | raw |
float64 | double (64 bits) |
int64 | integer (signed 32 bits) |
uint64 | integer (signed 32 bits) |
slice or array | vector |
message* | list(Body, Attributes, Encoding) |
message** | data frame |
* Message whose encoding field is not 'table'.
** Message whose encoding field is 'table' and whose body is a slice of columns.
The RClient operator does not warn or raise an error if data is lost (for example, through truncation) during the conversions int64->int32, uint64->int32, or int32->uint64.
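Because R integers are signed 32-bit, values above .Machine$integer.max (2147483647) cannot be held as R integers, and negative values cannot be represented as uint64. Since the operator does not report such losses, one defensive option is to validate values in R before returning them on a uint64 outport. The helper below is a hypothetical sketch, not part of the RClient API.

```r
# Hypothetical helper: prepares values for a uint64 outport, failing loudly
# instead of letting the int32 -> uint64 conversion silently lose data.
as_uint64_checked <- function(x) {
  if (any(is.na(x))) stop("NA values cannot be sent as uint64")
  if (any(x < 0)) stop("negative values would wrap around in uint64")
  if (any(x > .Machine$integer.max)) {
    stop("values above ", .Machine$integer.max, " do not fit in an R integer")
  }
  if (any(x != round(x))) stop("non-integer values would be truncated")
  as.integer(x)
}
```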
The Pipeline Engine message type can be represented in R either as a generic list or as a data frame. A non-table message in Go is represented in R as a list with the fields Body, Attributes, and Encoding. Body can hold a value of any type allowed on RClient ports except message, Attributes should be a JSON-like object, and Encoding should be a string.
A table message in Go must follow a specific pattern. Its encoding field must contain the string "table", and its body must be a []interface{} (a slice of empty interfaces) in which each element stores one column of the table. Each column must be a slice of one of the types supported by RClient, except message, and all columns must have the same number of elements. The attributes object should contain a field named "fieldNames" with the names of the columns of the data frame. Optionally, it can also contain the boolean field attributes["rlang"]["firstColumnIsRowNames"], which indicates whether the first column should be used as the row names of the data frame in R.
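The sketch below shows the two R shapes described above: a generic list with Body, Attributes, and Encoding, and a data frame for a table message. The attribute key "unit", the encoding string "float64", the column names, and the callback on_message with its "output" port are all hypothetical; only the overall shapes come from the text.

```r
# Generic (non-table) message: a list with Body, Attributes and Encoding.
generic_msg <- list(
  Body = c(1.5, 2.5, 4.0),              # any allowed non-message type
  Attributes = list(unit = "celsius"),  # JSON-like named list (hypothetical key)
  Encoding = "float64"                  # hypothetical encoding string
)

# Table message as it would appear in R: a data frame whose column names
# correspond to the "fieldNames" attribute on the Go side.
table_msg <- data.frame(
  sensor = c("s1", "s2"),
  reading = c(21.5, 22.0),
  stringsAsFactors = FALSE
)

# Port callback that accepts either form and forwards a summary message.
on_message <- function(msg) {
  if (is.data.frame(msg)) {
    # Table message: report each column name and the number of rows.
    out <- data.frame(column = names(msg), rows = nrow(msg),
                      stringsAsFactors = FALSE)
  } else {
    # Generic message: keep the metadata, replace the body by its length.
    out <- list(Body = length(msg$Body), Attributes = msg$Attributes,
                Encoding = msg$Encoding)
  }
  list(output = out)
}
```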
API Object Methods
api$setPortCallback(inports, outports, functionName):
Consider a script that registers three callbacks: "on_input12add", "on_input12mult", and "on_input3pow". Both "on_input12add" and "on_input12mult" wait for data to be available in the port group ("input1", "input2"). When the data becomes available, the data from "input1" is passed as the first argument of each callback and the data from "input2" as the second argument. When more than one callback is associated with a port group (as in this case), the callbacks are executed sequentially, in the order in which they were added in the script.
The callback "on_input3pow" is associated with a port group that is disjoint from the previous one; this group consists of the single input port "input3". The callback argument must be the name of a function variable that is in scope. Inport names must match the names of existing inports, and the number of ports in a port group must equal the number of arguments of the callbacks associated with that group. All port groups must be disjoint from each other. For example, you cannot add a callback associated with ports ("in1", "in2") and another associated with port ("in1") or with ports ("in2", "in3"). However, you can add more than one callback to the same port group. The port groups do not need to cover all input ports of the operator, so some input ports can be left unused.
The function on_input12add adds data1 and data2 to the global variable acc and stores the result back in acc. Since the line api$setPortCallback(c("input1", "input2"), c(), "on_input12add") does not specify any output ports, the return value of on_input12add is ignored; the only purpose of this callback is to update acc.
In contrast, the return value of on_input12mult is not ignored. The element of the returned list named output1 is sent to the output port of the same name, because "output1" was specified in the outports argument of setPortCallback. Whenever outports are specified, the last line of the callback must return an R list with one key for each output port name.
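A sketch of what such a script might look like. The body of on_input12add follows the description above (note that `<<-` is needed in R to update the global acc rather than create a local copy); multiplication in on_input12mult is inferred from its name, and the body of on_input3pow (squaring) and its outport "output2" are hypothetical. The stand-in api object and the deliver() helper exist only so the script can be exercised in a plain R session; they are not part of RClient.

```r
# Stand-in for the operator's api object (hypothetical): records port
# callbacks in registration order so deliver() can replay them locally.
if (!exists("api")) {
  registry <- list()
  api <- list(setPortCallback = function(inports, outports, functionName) {
    registry[[length(registry) + 1]] <<- list(inports = inports,
                                              outports = outports,
                                              fn = functionName)
    invisible(NULL)
  })

  # Simulates data arriving on a port group: runs every callback registered
  # for exactly that group, in registration order, and collects outputs.
  deliver <- function(inports, ...) {
    outputs <- list()
    for (cb in registry) {
      if (identical(cb$inports, inports)) {
        result <- do.call(cb$fn, list(...))
        for (port in cb$outports) outputs[[port]] <- result[[port]]
      }
    }
    outputs
  }
}

acc <- 0  # global accumulator, updated by on_input12add

# No outports are registered, so the return value is ignored.
on_input12add <- function(data1, data2) {
  acc <<- acc + data1 + data2
}

# The element named output1 is sent to outport "output1".
on_input12mult <- function(data1, data2) {
  list(output1 = data1 * data2)
}

# Single-port group; squaring and "output2" are hypothetical choices.
on_input3pow <- function(data3) {
  list(output2 = data3^2)
}

api$setPortCallback(c("input1", "input2"), c(), "on_input12add")
api$setPortCallback(c("input1", "input2"), c("output1"), "on_input12mult")
api$setPortCallback(c("input3"), c("output2"), "on_input3pow")
# Not allowed: a group such as c("input1") or c("input2", "input3") would
# overlap the ("input1", "input2") group registered above.
```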
api$addTimer(period, outports, functionName):
This method registers a callback named functionName that is called repeatedly, once every period. The period is passed as a string; for example, "1.3h3m2us" means 1.3 hours, 3 minutes, and 2 microseconds.
The available suffixes are h, m, s, ms, us, and ns, for hours, minutes, seconds, milliseconds, microseconds, and nanoseconds, respectively. Only the strings "0", "+0", and "-0" may omit the unit suffix (for these, the suffix is optional). A sign is optional and is allowed only at the beginning of the string.
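The operator's own period parser is not shown in this document. The hypothetical helper below applies the rules from the two paragraphs above (which resemble Go's time.ParseDuration format) to convert a period string into seconds, which can be handy for checking by hand what a string means. For "1.3h3m2us" it yields 4680 + 180 + 0.000002 = 4860.000002 seconds.

```r
# Hypothetical helper: converts a period string such as "1.3h3m2us" into
# seconds, following the suffix and sign rules described above.
period_to_seconds <- function(period) {
  # Only "0", "+0" and "-0" may omit the unit suffix.
  if (period %in% c("0", "+0", "-0")) return(0)

  # An optional sign is allowed only at the start of the string.
  sgn <- if (startsWith(period, "-")) -1 else 1
  body <- sub("^[+-]", "", period)

  # Each component is one number followed by one unit suffix.
  unit_re <- "([0-9]*\\.?[0-9]+)(h|ms|us|ns|m|s)"
  if (!grepl(paste0("^(", unit_re, ")+$"), body)) {
    stop("invalid period: ", period)
  }

  parts <- regmatches(body, gregexpr(unit_re, body))[[1]]
  values <- as.numeric(sub(unit_re, "\\1", parts))
  units <- sub(unit_re, "\\2", parts)
  factors <- c(h = 3600, m = 60, s = 1, ms = 1e-3, us = 1e-6, ns = 1e-9)

  sgn * sum(values * factors[units])
}

period_to_seconds("1.3h3m2us")
```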
The outports argument specifies the names of zero or more outports to which the values returned by functionName are sent. The function is expected to return an R list, so its last line should be something like list(output1=value1, output2=value2), where output1 and output2 are the outport names also given in the outports argument of api$addTimer. If no outport is specified in api$addTimer, the return value of the callback is ignored, so its last line can be anything.
For example, a timer callback registered with a period of one second can produce the values 1, 2, 3, ... on outport "output" until the graph shuts down.
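A minimal sketch of such a timer, assuming an outport named "output" and a hypothetical callback name on_tick. The `if (!exists("api"))` stub only makes the script runnable in a plain R session; inside the operator, the real `api` object schedules the calls.

```r
# Stand-in for the operator's api object, used only outside RClient.
if (!exists("api")) {
  api <- list(addTimer = function(period, outports, functionName) invisible(NULL))
}

counter <- 0  # state kept between timer ticks

# Called once per period; returns the next value for outport "output".
on_tick <- function() {
  counter <<- counter + 1
  list(output = counter)
}

api$addTimer("1s", c("output"), "on_tick")
```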
api$setSwitchCallback(inports, outports, functionName):
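This method adds a callback that controls the switch variable. When the switch is on, the callbacks it governs are enabled. When the switch is off, those callbacks are disabled and no data is consumed from their ports. Which callbacks the switch governs is described below. For example, the following switch callback sums and multiplies its two inputs and turns the switch off whenever dataX is divisible by dataY: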
    foo <- function(dataX, dataY) {
      out1 <- dataX + dataY
      out2 <- dataX * dataY
      if (dataX %% dataY == 0) {
        position <- FALSE
      } else {
        position <- TRUE
      }
      list(switchPosition=position, outputA=out1, outputB=out2)
    }
The list returned on the last line of a switch callback must therefore always contain the key switchPosition. The returned boolean is not sent to any outport unless you add a port named 'switchPosition' of type byte to the operator and also list that port in the outports argument of api$setSwitchCallback. In that case, the boolean is converted to a byte and sent on the 'switchPosition' port, in addition to being used internally to set the switch.
Two of the operator's configuration parameters relate to the switch callback: switchStartOff and switchAffectTimerCallbacks. If switchStartOff is true, the switch is in the off position when the graph starts. switchAffectTimerCallbacks indicates whether the switch also affects (blocks and unblocks) the timer callbacks. Port callbacks are always affected, while shutdown and switch callbacks are never affected by the switch position; that is, they are always enabled.
Unlike api$setPortCallback, api$addTimer, and api$addShutdownHandler, api$setSwitchCallback can register only one callback. If you call it more than once, the new callback replaces the previous one.
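The sketch below registers the switch callback foo and uses a stand-in api object to mimic the behavior described above: a single stored switch callback, a switch that starts off (as with switchStartOff = true), and a port callback that only runs while the switch is on. The inport names "inX" and "inY" and the helpers fire_switch() and run_if_on() are hypothetical and exist only for local testing; they are not part of RClient.

```r
# Stand-in for the operator's api object, used only in a plain R session.
# It keeps a single switch callback (a second registration replaces the
# first) and the current switch state.
if (!exists("api")) {
  switch_cb <- NULL
  switch_on <- FALSE  # as if switchStartOff = true

  api <- list(setSwitchCallback = function(inports, outports, functionName) {
    switch_cb <<- list(inports = inports, outports = outports, fn = functionName)
    invisible(NULL)
  })

  # Runs the switch callback, updates the switch from switchPosition and
  # returns only the values destined for the declared outports.
  fire_switch <- function(...) {
    result <- do.call(switch_cb$fn, list(...))
    switch_on <<- isTRUE(result$switchPosition)
    result[switch_cb$outports]
  }

  # A port callback governed by the switch runs only while it is on.
  run_if_on <- function(fn, ...) if (switch_on) fn(...) else NULL
}

# Switch callback from the text: switch off when dataX is divisible by dataY.
foo <- function(dataX, dataY) {
  out1 <- dataX + dataY
  out2 <- dataX * dataY
  position <- dataX %% dataY != 0
  list(switchPosition = position, outputA = out1, outputB = out2)
}

# "inX" and "inY" are hypothetical inport names.
api$setSwitchCallback(c("inX", "inY"), c("outputA", "outputB"), "foo")
```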
api$addShutdownHandler(functionName):
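This method registers a callback named functionName that is called when the graph stops. For example, a script that counts the values received on port "input" and registers two shutdown handlers can print "shutdown1: 7" and "shutdown2: 7" if 7 values were received before the graph stops.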
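A sketch of such a script, assuming an inport named "input" and hypothetical callback names on_input, shutdown1, and shutdown2. The stand-in api object and its shutdown() helper only simulate a graph stop in a plain R session; running the handlers in registration order is an assumption of this stand-in.

```r
# Stand-in for the operator's api object, used only outside RClient.
if (!exists("api")) {
  handlers <- character()
  api <- list(
    setPortCallback = function(inports, outports, functionName) invisible(NULL),
    addShutdownHandler = function(functionName) {
      handlers <<- c(handlers, functionName)
      invisible(NULL)
    }
  )
  # Simulates a graph stop: runs handlers in registration order (assumed).
  shutdown <- function() for (h in handlers) do.call(h, list())
}

count <- 0  # number of values received on "input"

on_input <- function(data) {
  count <<- count + 1
}

shutdown1 <- function() cat("shutdown1: ", count, "\n", sep = "")
shutdown2 <- function() cat("shutdown2: ", count, "\n", sep = "")

api$setPortCallback(c("input"), c(), "on_input")
api$addShutdownHandler("shutdown1")
api$addShutdownHandler("shutdown2")
```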
Current Limitations
- Data frames/tables with columns of type raw/blob are not yet supported.
- Only one level of slices is supported ([]string, []int64, ...). Higher-order slices such as [][]string, [][]int64, and [][][]float64 are not supported.
- Adding new callbacks from within another callback has no effect; callbacks cannot be added after the operator's initialization.
- Error messages are currently not very helpful, because Rserve returns only vague error codes. To debug your R script, you can launch Rserve from the R command line; Rserve then prints more detailed error messages in that R session when an error occurs.
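As a complement, a callback can be wrapped so that every error message is appended to a log file before the error propagates as usual. This is a hypothetical pattern, not an RClient feature: with_error_log, the log path, and the port names "input" and "output" are assumptions, and under RClient the file would be written on the machine running Rserve.

```r
# Stand-in for the operator's api object, used only outside RClient.
if (!exists("api")) {
  api <- list(setPortCallback = function(inports, outports, functionName) invisible(NULL))
}

# Hypothetical log location; under RClient this resolves on the Rserve host.
log_file <- file.path(tempdir(), "rclient_errors.log")

# Wraps a callback so that any error is appended to log_file (with a
# timestamp and the callback name) and then propagates unchanged.
with_error_log <- function(name, fn) {
  function(...) {
    withCallingHandlers(
      fn(...),
      error = function(e) {
        cat(format(Sys.time(), "%Y-%m-%d %H:%M:%S"), " ", name, ": ",
            conditionMessage(e), "\n",
            sep = "", file = log_file, append = TRUE)
      }
    )
  }
}

# Example callback that fails on non-numeric input.
on_input <- with_error_log("on_input", function(data) {
  list(output = log(data))
})

api$setPortCallback(c("input"), c("output"), "on_input")
```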