SAP Vora Avro Ingestor
The SAP Vora Avro Ingestor operator allows you to dynamically ingest data into SAP Vora based on the incoming Avro or other text messages in CSV and JSON.
When using CSV messages, each line is interpreted using the provided Avro schema. When there is no header specified in CSV, the value is directly assigned to the corresponding field (i.e., the n-th field of CSV is assigned to the n-th field of Avro record). When a header is specified, the value is assigned to the field with the corresponding field name.
Avro |
go |
js |
CSV (sample values) |
Vora |
---|---|---|---|---|
boolean |
bool |
bool |
true |
BOOLEAN |
int | int32 |
Number |
9 | INTEGER |
long | int64 |
Number |
99 | BIGINT |
float | float32 |
Number |
99.9 | FLOAT |
double |
float64 |
Number |
99.99 | DOUBLE |
bytes | string |
string |
aGVsbG8= | VARCHAR(*) |
string |
string |
string |
hello |
VARCHAR(*) |
decimal(p,s) |
string |
string |
1987.74 |
DECIMAL(p,s) |
date |
string |
string |
2017-08-29 |
DATE |
time-millis |
string |
string |
15:28:50.345 |
TIME |
time-micros |
string |
string |
15:28:50.345678 |
TIME |
timestamp-millis |
time.Time |
string |
2017-08-29 15:28:50.345 |
TIMESTAMP |
timestamp-micros |
time.Time |
string |
2017-08-29 15:28:50.345678 |
TIMESTAMP |
fixed(n) |
string |
string |
68656c6c6f |
VARCHAR(2n) |
Note aGVsbG8= and 68656c6c6f are the base64 and hexadecimal representations of value hello, respectively.
Note date, time-millis, time-micros, timestamp-millis, and timestamp-micros values may be represented as an integer based representation based on the avro's type definition instead of the above string based representation.
The above Avro to Vora type association may be customized using the extension properties. The following table describes the supported extension properties.
Extension Property |
Supported in Avro Types |
Property Type |
Description |
---|---|---|---|
colName |
all types |
string |
Specify the column name. |
size |
int |
Number |
Specify the bit size 8, 16, 32, 64 to use TINYINT, SMALLINT, INTEGER, BIGINT, respectively. |
maxLength |
string, bytes |
Number |
Specify the maximal length. |
primaryKey |
all types |
bool |
Specify whether the field is primary key. |
Note that undeclared fields are ignored during arguments building for ingestion.
A record may be nested arbitrarily but must be bounded so that it can be flattened to a fixed table column definition. In the derived table column definition, the column names correspond to the fully qualified field names of the record (i.e., each field name in the nested structure is concatenated. For example, a field named bar under its parent field named foo is named as foo_bar).
When using CSV records, records are represented by rows of CSV lines. The first line may represent the field or column names, commonly known as the header line. In this case, the ordering of the fields do not necessarily match the ordering of the fields defined in the avro schema. Furthermore, some fields may be omitted if there is no value corresponding to those fields. If no header line is present, the ordering of the values must match the ordering of the fields defined in the avro schema.
When using JSON records, records are represented as a JSON array of maps or arrays. The former assumes a map based representation of a record, where each key value pair in the map is assigned to its corresponding field. The latter assumes the structure similar to CSV, where an array of values is used to represent a record. In this case, the first array may represent the header line. The JSON records may be given as a structured object (i.e., as an array of maps or arrays) or in its serialized form.
The following sample graphs use this operator.
Sample Graph |
Description |
---|---|
com.sap.demo.vora.ingestion.ingestion.avro_ingestion_direct_example | Reading avro messages from kafka and directly writing to vora disk engine. |
com.sap.demo.vora.ingestion.csv_ingestion_direct_example2 |
Generating csv messages with datagenerator and directly writing to vora disk engine. |
Configuration Parameters
Parameter |
Type |
Description |
---|---|---|
connectionType |
string |
The connection to SAP Vora can be configured directly using dsn or indirectly using connection. Default: "dsn" |
dsn |
string |
A valid data source name in the format v2://host:port/?binary=true. Make sure that you add /?binary=true to the end, because only binary transfer is available for the SAP Vora Transaction Coordinator. Default: "v2://localhost:2204/?binary=true" |
user |
string |
The user name if the connection is configured using dsn. Default: "" |
password |
string |
The password if the connection is configured using dsn. Default: "" |
connection |
object |
A valid connection configuration provided by ConnectionManager. |
aggregation |
bool |
Enables the automatic aggregation of records to trigger a series of bulk inserts independently of the number of records contained in each Avro message. Default: false |
aggregateMaxBytes |
int |
Limits the maximal size of the aggregated records in bytes under the auto-aggregation mode. Until the aggregated records reach this limit, the records are aggregated. Default: 4194304 |
aggregateMaxRecs |
int |
Limits the maximal number of the aggregated records under the auto-aggregation mode. Until the aggregated records reach this limit, the records are aggregated. Default: 1000 |
aggregateMaxTime |
int |
Limits the maximal time in milliseconds to wait until flushing the aggregated records that have not reached the size constraints. Default: 2000 |
databaseSchema |
string |
The database schema name. Default: "TPCH" |
engineType |
string |
The engine type ("DISK" or "SERIES"). Default: "DISK" |
partitionKeyRegex |
string |
A regular expression to match some Avro field names to identify the partition key. Default: "" |
partitionCriterion |
string |
A partition function. Default: false |
tableType |
string |
The table type ("STREAMING"). Default: "STREAMING" |
ingestionMode |
string |
The ingestion mode INSERT or UPSERT Default: "INSERT" |
primaryKeyRegex |
string |
A regular expression to match some record field names to identify the primary keys. The primary keys may be specified in the avro schema and this parameter is only useful when the primary keys are not specified in the avro schema. Default: "" |
defaultAvroSchema |
string |
The default Avro schema to be used when the incoming message does not include its schema. This parameter is mandatory for Avro messages without schema or CSV messages. Default: "" |
varLimit |
integer |
The default size limit for the varchar columns, where 0 indicates unlimited, i.e. '*'. Default: 0 |
format |
string |
The input message format. The accepted values are "avro" or "csv". Default: "avro" |
csvComma |
rune |
The delimiter character code for the CSV format. For example, 44 for ','; 59 for ';'; 124 for '|'. Default: ',' |
csvHeaderIncluded |
bool |
The input CSV message contains the header line. Default: false |
Input
Input |
Type |
Description |
---|---|---|
in |
message |
Accepts messages containing Avro, CSV, or JSON messages in the body. |
Output
Output |
Type |
Description |
---|---|---|
out |
message |
Messages with the header properties that indicate the commit progress. If the input message does not contain a commit token (i.e., message header message.commit.token), no output will be generated. |