Modeling Guide for SAP Data Hub

Vora Avro Ingestor

The Vora Avro Ingestor operator allows you to dynamically ingest data into SAP Vora based on incoming Avro messages or other text messages in CSV or JSON format.

An Avro schema describes the structure of the record, and the Vora table schema is derived from it. More precisely, the target table is created automatically with the Avro record name as the table name and with columns derived from the Avro record fields. If the table already exists, its columns must match the Avro record fields.

The Avro schema can be provided in several ways. If the message is an Avro message, the schema may be included in the message itself. Otherwise, the schema can be configured in this operator's configuration or included in the message under the attribute "avro.schema". If the input data needs to be processed before being stored, use the SAP Vora Avro Decoder and SAP Vora Ingestor operators instead.
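The schema-resolution order described above can be sketched as follows (an illustrative Python sketch; the function name and parameter names are hypothetical, only "avro.schema" and defaultAvroSchema come from this document):

```python
def resolve_avro_schema(message_attributes, embedded_schema, default_avro_schema):
    """Return the Avro schema to use for an incoming message (sketch).

    Resolution order, as described above:
    1. a schema embedded in the Avro message itself,
    2. the "avro.schema" message attribute,
    3. the operator's defaultAvroSchema configuration.
    """
    if embedded_schema is not None:          # Avro message carrying its own schema
        return embedded_schema
    if "avro.schema" in message_attributes:  # schema attached as a message attribute
        return message_attributes["avro.schema"]
    if default_avro_schema:                  # operator configuration
        return default_avro_schema
    raise ValueError("no Avro schema available for incoming message")
```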

The following table describes the supported Avro primitive and logical types and how they are translated into Vora SQL types.

Avro              Go         JS      CSV (sample values)          Vora
boolean           bool       bool    true                         BOOLEAN
int               int32      Number  9                            INTEGER
long              int64      Number  99                           BIGINT
float             float32    Number  99.9                         FLOAT
double            float64    Number  99.99                        DOUBLE
bytes             string     string  aGVsbG8=                     VARCHAR(*)
string            string     string  hello                        VARCHAR(*)
decimal(p,s)      string     string  1987.74                      DECIMAL(p,s)
date              string     string  2017-08-29                   DATE
time-millis       string     string  15:28:50.345                 TIME
time-micros       string     string  15:28:50.345678              TIME
timestamp-millis  time.Time  string  2017-08-29 15:28:50.345      TIMESTAMP
timestamp-micros  time.Time  string  2017-08-29 15:28:50.345678   TIMESTAMP
fixed(n)          string     string  68656c6c6f                   VARCHAR(2n)

Note that aGVsbG8= and 68656c6c6f are the base64 and hexadecimal representations of the value hello, respectively.

Note that date, time-millis, time-micros, timestamp-millis, and timestamp-micros values may also be represented as integers, depending on the Avro type definition, instead of the string representations shown above.
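The type mapping above can be sketched as a small helper that derives a Vora CREATE TABLE statement from an Avro record schema (an illustrative sketch; the actual DDL generated by the operator may differ, and the function names are hypothetical):

```python
import json

# Avro-to-Vora type mapping taken from the table above.
AVRO_TO_VORA = {
    "boolean": "BOOLEAN", "int": "INTEGER", "long": "BIGINT",
    "float": "FLOAT", "double": "DOUBLE",
    "bytes": "VARCHAR(*)", "string": "VARCHAR(*)",
    "date": "DATE", "time-millis": "TIME", "time-micros": "TIME",
    "timestamp-millis": "TIMESTAMP", "timestamp-micros": "TIMESTAMP",
}

def vora_type(field_type):
    """Translate one Avro field type into its Vora SQL type."""
    if isinstance(field_type, dict):
        logical = field_type.get("logicalType")
        if logical == "decimal":
            return "DECIMAL(%d,%d)" % (field_type["precision"], field_type["scale"])
        if field_type.get("type") == "fixed":
            # hex encoding doubles the length, hence VARCHAR(2n)
            return "VARCHAR(%d)" % (2 * field_type["size"])
        if logical:
            return AVRO_TO_VORA[logical]
        return AVRO_TO_VORA[field_type["type"]]
    return AVRO_TO_VORA[field_type]

def derive_create_table(avro_schema_json):
    """Derive a CREATE TABLE statement from an Avro record schema (sketch)."""
    schema = json.loads(avro_schema_json)
    cols = ", ".join('"%s" %s' % (f["name"], vora_type(f["type"]))
                     for f in schema["fields"])
    return 'CREATE TABLE "%s" (%s)' % (schema["name"], cols)
```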

The above Avro to Vora type association may be customized using the extension properties. The following table describes the supported extension properties.

Extension Property  Supported Avro Types  Value Type  Description
colName             all types             string      Specifies the column name.
size                int                   Number      Specifies the bit size (8, 16, 32, or 64) to use TINYINT, SMALLINT, INTEGER, or BIGINT, respectively.
maxLength           string, bytes         Number      Specifies the maximal length.
primaryKey          all types             bool        Specifies whether the field is a primary key.

Note that undeclared fields are ignored when building the arguments for ingestion.

A record may be nested arbitrarily but must be bounded so that it can be flattened into a fixed table column definition. In the derived table column definition, the column names correspond to the fully qualified field names of the record; that is, the field names in the nested structure are concatenated. For example, a field named bar under its parent field foo results in the column name foo_bar.
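The flattening rule above can be sketched as a short recursive helper (illustrative only; the function name is hypothetical):

```python
def flatten_fields(fields, prefix=""):
    """Flatten nested Avro record fields into fully qualified column names,
    concatenating parent and child names with '_' (e.g. foo.bar -> foo_bar)."""
    columns = []
    for f in fields:
        name = prefix + f["name"]
        t = f["type"]
        if isinstance(t, dict) and t.get("type") == "record":
            # recurse into the nested record, extending the prefix
            columns.extend(flatten_fields(t["fields"], name + "_"))
        else:
            columns.append(name)
    return columns
```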

When using CSV records, records are represented by rows of CSV lines. The first line may list the field or column names, commonly known as the header line. In this case, the ordering of the fields does not need to match the ordering of the fields defined in the Avro schema, and fields without corresponding values may be omitted. If no header line is present, the ordering of the values must match the ordering of the fields defined in the Avro schema.
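The CSV handling described above might be sketched as follows (an illustrative Python sketch, not the operator's implementation; the function name is hypothetical):

```python
import csv
import io

def csv_rows_to_records(csv_text, schema_fields, header_included):
    """Map CSV lines to per-field dicts (sketch).

    With a header line, columns may be reordered or omitted relative to the
    Avro schema; without one, values must follow the schema field order.
    """
    rows = list(csv.reader(io.StringIO(csv_text)))
    if header_included:
        header = rows[0]
        return [dict(zip(header, row)) for row in rows[1:]]
    return [dict(zip(schema_fields, row)) for row in rows]
```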

When using JSON records, records are represented as a JSON array of maps or arrays. The former assumes a map-based representation of a record, where each key-value pair in the map is assigned to its corresponding field. The latter assumes a structure similar to CSV, where an array of values represents a record; in this case, the first array may represent the header line. The JSON records may be given as a structured object (i.e., as an array of maps or arrays) or in serialized form.
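The two JSON representations above might be normalized as in the following sketch (illustrative only; the function name and the header-detection heuristic are assumptions, not the operator's actual logic):

```python
import json

def json_to_records(payload, schema_fields):
    """Normalize a JSON array of maps, or of arrays, into per-field dicts."""
    data = json.loads(payload) if isinstance(payload, str) else payload
    if not data:
        return []
    if isinstance(data[0], dict):
        return data                                      # array of maps
    header = data[0]
    # Heuristic: treat the first array as a header line if it consists
    # entirely of known field names.
    if all(isinstance(v, str) for v in header) and set(header) <= set(schema_fields):
        return [dict(zip(header, row)) for row in data[1:]]
    return [dict(zip(schema_fields, row)) for row in data]  # positional values
```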

The following sample graphs use this operator.

Sample Graph                                                 Description
com.sap.demo.vora.ingestion.avro_ingestion_direct_example    Reading Avro messages from Kafka and directly writing to the Vora Disk Engine.
com.sap.demo.vora.ingestion.csv_ingestion_direct_example2    Generating CSV messages with datagenerator and directly writing to the Vora Disk Engine.

Configuration Parameters

Parameter

Type

Description

connectionType

string

The connection to SAP Vora can be configured directly using dsn or indirectly using connection.

Default: "dsn"

dsn

string

A valid data source name in the format v2://host:port/?binary=true. Make sure that you add /?binary=true to the end, because only binary transfer is available for the SAP Vora Transaction Coordinator.

Default: "v2://localhost:2204/?binary=true"

user

string

The user name if the connection is configured using dsn.

Default: ""

password

string

The password if the connection is configured using dsn.

Default: ""

connection

object

A valid connection configuration provided by ConnectionManager.

aggregation

bool

Enables the automatic aggregation of records to trigger a series of bulk inserts independently of the number of records contained in each Avro message.

Default: false

aggregateMaxBytes

int

Limits the maximal size, in bytes, of the aggregated records under the auto-aggregation mode. Records are aggregated until this limit is reached.

Default: 4194304

aggregateMaxRecs

int

Limits the maximal number of aggregated records under the auto-aggregation mode. Records are aggregated until this limit is reached.

Default: 1000

aggregateMaxTime

int

Limits the maximal time in milliseconds to wait until flushing the aggregated records that have not reached the size constraints.

Default: 2000
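The interplay of the three aggregate* limits above might be sketched as follows (an illustrative sketch of the described buffering behavior, not the operator's code; the class and method names are hypothetical):

```python
import time

class RecordAggregator:
    """Buffer records until aggregateMaxBytes, aggregateMaxRecs, or
    aggregateMaxTime is reached, then flush them as one bulk insert."""

    def __init__(self, max_bytes=4194304, max_recs=1000, max_time_ms=2000):
        self.max_bytes, self.max_recs, self.max_time_ms = max_bytes, max_recs, max_time_ms
        self.buffer, self.size, self.started = [], 0, None

    def add(self, record):
        """Buffer one record; return a batch when a size or count limit is hit."""
        if self.started is None:
            self.started = time.monotonic()
        self.buffer.append(record)
        self.size += len(record)
        if len(self.buffer) >= self.max_recs or self.size >= self.max_bytes:
            return self.flush()
        return None

    def tick(self):
        """Called periodically; flush once aggregateMaxTime has elapsed."""
        if self.started is not None and (time.monotonic() - self.started) * 1000 >= self.max_time_ms:
            return self.flush()
        return None

    def flush(self):
        batch, self.buffer, self.size, self.started = self.buffer, [], 0, None
        return batch
```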

databaseSchema

string

The database schema name.

Default: "TPCH"

engineType

string

The engine type ("DISK" or "SERIES").

Default: "DISK"

partitionKeyRegex

string

A regular expression that selects a sequence of Avro record field names to be bound to the arguments of the specified partition function. When generating the partitioning scheme (the actual binding of arguments to the parameters of the partition keys), the system uses this regular expression to match the actual columns to the parameters of the partition scheme.

Example: Suppose we have a table "T(col1 VARCHAR(500), col2 BIGINT, col3 DATE)" and a hash partition function "pf(pa, pb)". Then, specifying partitionKeyRegex as .*1|.*3 selects col1 and col3 to be bound to the parameters pa and pb, respectively.

Default: ""
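The column selection in the example above can be sketched with a small regex helper (illustrative only; the function name is hypothetical, and the actual matching semantics of the operator may differ):

```python
import re

def select_partition_columns(columns, key_regex):
    """Select columns whose names fully match partitionKeyRegex, in order,
    to bind to the partition function's parameters (sketch)."""
    pattern = re.compile(key_regex)
    return [c for c in columns if pattern.fullmatch(c)]
```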

partitionCriterion

string

A partition function.

Default: false

tableType

string

The table type ("STREAMING").

Default: "STREAMING"

ingestionMode

string

The ingestion mode ("INSERT" or "UPSERT").

Default: "INSERT"

primaryKeyRegex

string

A regular expression to select some Avro record field names to be used as the primary keys. The primary keys may be specified in the Avro schema using extension property "primaryKey". This parameter is only useful when the primary keys are not specified in the Avro schema.

Default: ""

defaultAvroSchema

string

The default Avro schema to be used when the incoming message does not include its schema. This parameter is mandatory for Avro messages without schema or CSV messages.

Default: ""

varLimit

integer

The default size limit for the varchar columns, where 0 indicates unlimited, i.e. '*'. The size limit may be specified in the Avro schema using extension property "maxLength" per record field.

Default: 0

format

string

The input message format. The accepted values are "avro" or "csv".

Default: "avro"

csvComma

rune

The delimiter character code for the CSV format. For example, 44 for ','; 59 for ';'; 124 for '|'.

Default: ','

csvHeaderIncluded

bool

Indicates whether the input CSV message contains a header line.

Default: false

Input

Input

Type

Description

in

message

Accepts messages with Avro, CSV, or JSON content in the body.

Output

Output

Type

Description

out

message

Messages with header properties that indicate the commit progress. If the input message does not contain a commit token (i.e., the message header message.commit.token), no output is generated.