Vora Avro Ingestor
The Vora Avro Ingestor operator allows you to dynamically ingest data into SAP Vora based on the incoming Avro or other text messages in CSV and JSON.
The Avro schema can be provided in several ways. If the message is an Avro message, the schema may be included in the message. Otherwise, the schema can be configured either in this operator's configuration or included in the message itself under attribute "avro.schema". If the input data needs to be processe before being stored, use SAP Vora Avro Decoder and SAP Vora Ingestor instead..
Avro |
go |
js |
CSV (sample values) |
Vora |
---|---|---|---|---|
boolean |
bool |
bool |
true |
BOOLEAN |
int | int32 |
Number |
9 | INTEGER |
long | int64 |
Number |
99 | BIGINT |
float | float32 |
Number |
99.9 | FLOAT |
double |
float64 |
Number |
99.99 | DOUBLE |
bytes | string |
string |
aGVsbG8= | VARCHAR(*) |
string |
string |
string |
hello |
VARCHAR(*) |
decimal(p,s) |
string |
string |
1987.74 |
DECIMAL(p,s) |
date |
string |
string |
2017-08-29 |
DATE |
time-millis |
string |
string |
15:28:50.345 |
TIME |
time-micros |
string |
string |
15:28:50.345678 |
TIME |
timestamp-millis |
time.Time |
string |
2017-08-29 15:28:50.345 |
TIMESTAMP |
timestamp-micros |
time.Time |
string |
2017-08-29 15:28:50.345678 |
TIMESTAMP |
fixed(n) |
string |
string |
68656c6c6f |
VARCHAR(2n) |
Note aGVsbG8= and 68656c6c6f are the base64 and hexadecimal representations of value hello, respectively.
Note date, time-millis, time-micros, timestamp-millis, and timestamp-micros values may be represented as an integer based representation based on the Avro's type definition instead of the above string based representation.
The above Avro to Vora type association may be customized using the extension properties. The following table describes the supported extension properties.
Extension Property |
Supported in Avro Types |
Description |
|
---|---|---|---|
colName |
all types |
string |
Specify the column name. |
size |
int |
Number |
Specify the bit size 8, 16, 32, 64 to use TINYINT, SMALLINT, INTEGER, BIGINT, respectively. |
maxLength |
string, bytes |
Number |
Specify the maximal length. |
primaryKey |
all types |
bool |
Specify whether the field is primary key. |
Note that undeclared fields are ignored during arguments building for ingestion.
A record may be nested arbitrarily but must be bounded so that it can be flattened to a fixed table column definition. In the derived table column definition, the column names correspond to the fully qualified field names of the record (i.e., each field name in the nested structure is concatenated. For example, a field named bar under its parent field named foo is named as foo_bar).
When using CSV records, records are represented by rows of CSV lines. The first line may represent the field or column names, commonly known as the header line. In this case, the ordering of the fields do not necessarily match the ordering of the fields defined in the Avro schema. Furthermore, some fields may be omitted if there is no value corresponding to those fields. If no header line is present, the ordering of the values must match the ordering of the fields defined in the avro schema.
When using JSON records, records are represented as a JSON array of maps or arrays. The former assumes a map based representation of a record, where each key value pair in the map is assigned to its corresponding field. The latter assumes the structure similar to CSV, where an array of values is used to represent a record. In this case, the first array may represent the header line. The JSON records may be given as a structured object (i.e., as an array of maps or arrays) or in its serialized form.
The following sample graphs use this operator.
Sample Graph |
Description |
---|---|
com.sap.demo.vora.ingestion.ingestion.avro_ingestion_direct_example | Reading Avro messages from Kafka and directly writing to Vora Disk Engine.Generating CSV messages with datagenerator and directly writing to Vora Disk Engine. |
com.sap.demo.vora.ingestion.csv_ingestion_direct_example2 |
Generating CSV messages with datagenerator and directly writing to Vora Disk Engine. |
Configuration Parameters
Parameter |
Type |
Description |
---|---|---|
connectionType |
string |
Generating CSV messages with datagenerator and directlyThe connection to SAP Vora can be configured directly using dsn or indirectly using connection. Default: "dsn" |
dsn |
string |
A valid data source name in the format v2://host:port/?binary=true. Make sure that you add /?binary=true to the end, because only binary transfer is available for the SAP Vora Transaction Coordinator. Default: "v2://localhost:2204/?binary=true" |
user |
string |
The user name if the connection is configured using dsn. Default: "" |
password |
string |
The password if the connection is configured using dsn. Default: "" |
connection |
object |
A valid connection configuration provided by ConnectionManager. |
aggregation |
bool |
Enables the automatic aggregation of records to trigger a series of bulk inserts independently of the number of records contained in each Avro message. Default: false |
aggregateMaxBytes |
int |
Limits the maximal size of the aggregated records in bytes under the auto-aggregation mode. Until the aggregated records reach this limit, the records are aggregated. Default: 4194304 |
aggregateMaxRecs |
int |
Limits the maximal number of the aggregated records under the auto-aggregation mode. Until the aggregated records reach this limit, the records are aggregated. Default: 1000 |
aggregateMaxTime |
int |
Limits the maximal time in milliseconds to wait until flushing the aggregated records that have not reached the size constraints. Default: 2000 |
databaseSchema |
string |
The database schema name. Default: "TPCH" |
engineType |
string |
The engine type ("DISK" or "SERIES"). Default: "DISK" |
partitionKeyRegex |
string |
A regular expression to select a sequence of Avro record field names to be bound to
the arguments to the specified particion function. Details: When
generating the partitioning scheme (the actual binding of arguments
to parameters of partition keys) the system deduces the matching of
actual columns to parameters of the partition scheme using this
regular expression.
Example: Suppose we have a table "T(col1 VARCHAR(500), col2 BIGINT, col3 DATE)", and a hash partition function "pf(pa, pb)". Then, specifying partitionKeyRegex , .*1|.*3 will select col1 and col3 to be bound to the parameters pa and pb, respectively. Default: "" |
partitionCriterion |
string |
A partition function. Default: false |
tableType |
string |
The table type ("STREAMING"). Default: "STREAMING" |
ingestionMode |
string |
The ingestion mode INSERT or UPSERT Default: "INSERT" |
primaryKeyRegex |
string |
A regular expression to select some Avro record field names to be used as the primary keys. The primary keys may be specified in the Avro schema using extension property "primaryKey". This parameter is only useful when the primary keys are not specified in the Avro schema. Default: "" |
defaultAvroSchema |
string |
The default Avro schema to be used when the incoming message does not include its schema. This parameter is mandatory for Avro messages without schema or CSV messages. Default: "" |
varLimit |
integer |
The default size limit for the varchar columns, where 0 indicates unlimited, i.e. '*'. The size limit may be specified in the Avro schema using extension property "maxLength" per record field. Default: 0 |
format |
string |
The input message format. The accepted values are "avro" or "csv". Default: "avro" |
csvComma |
rune |
The delimiter character code for the CSV format. For example, 44 for ','; 59 for ';'; 124 for '|'. Default: ',' |
csvHeaderIncluded |
bool |
The input CSV message contains the header line. Default: false |
Input
Input |
Type |
Description |
---|---|---|
in |
message |
Accepts messages containing Avro, CSV, or JSON messages in the body. |
Output
Output |
Type |
Description |
---|---|---|
out |
message |
Messages with the header properties that indicate the commit progress. If the input message does not contain a commit token (i.e., message header message.commit.token), no output will be generated. |