Ingest Oracle Table
Oracle Consumer is based on datahub-flowagent and uses Data Services as an execution engine.
Data Services provides two methods of reading data from Oracle sources using the third-party OCI client: Oracle Table Consumer reads an entire table, while Oracle SQL Consumer reads the result set of a native SQL statement.
Oracle Table Consumer
Oracle Table Consumer takes schema.tablename as input, reads the table from the source piece-wise (100 rows per call), and passes the rows to the next operator in line.
Prerequisites
Configuration using Existing Client
- You can make the client available on the k8s cluster using the vsystem app.
- As part of flowagent startup, it looks for /vrep/flowagent/tp-clients-env.sh.
- If the file does not exist, create it with the following contents:
  export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/path/to/instantclient/
  export NLS_LANG=AMERICAN_AMERICA.UTF8 # For UTF-8 support
- Otherwise, modify the file to contain the path to your client.
Configuration using Oracle Instant Client
- Download the OCI client from here.
- You can make the client available on the k8s cluster using the vsystem app. Let's assume you upload and extract it to /vrep/flowagent/dbclients.
- Now you can configure the OCI client using the commands below:
  cd $ORACLE_INSTANT_CLIENT/instantclient_12_2
  ln -s libclntsh.so.12.1 libclntsh.so.12
  ln -s libclntsh.so.12 libclntsh.so
  ln -s libclntshcore.so.12.1 libclntshcore.so.12
  ln -s libclntshcore.so.12 libclntshcore.so
  ln -s libocci.so.12.1 libocci.so.12
  ln -s libocci.so.12 libocci.so
- As part of flowagent startup, it looks for /vrep/flowagent/tp-clients-env.sh.
- If the file does not exist, create it with the following contents:
  export ORACLE_INSTANT_CLIENT=/vrep/flowagent/dbclients
  export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$ORACLE_INSTANT_CLIENT/instantclient_12_2
  export NLS_LANG=AMERICAN_AMERICA.UTF8 # For UTF-8 support
Components
- Triggering the graph
  constantgenerator1: Provides a constant input to the consumer, which triggers the ingestion.
- Defining the Oracle connection
  oracletableconsumer1: Provides the Oracle Consumer connection information.
- Loader definition
  flowagentfileproducer1: Data Services requires a loader (e.g., a CSV loader) to load the data into Vflow operators.
Partition
Partitioning the source data allows us to load data in chunks, thereby overcoming memory pressure; by loading the chunks in parallel, we can also load data faster and improve overall performance. The partitioning methods supported by this operator are described below.
Logical Partition
- defaultPartition: The partition that fetches all rows which were not filtered by the other partitions (you can disable it by setting defaultPartition to false).
- conditions: Each condition represents a set of filters which will be performed on a certain column:
  - columnExpression: The column where the filters will be applied. It can be either the column name or a function of it (e.g., TRUNC(EMPLOYEE_NAME)).
  - type: The filter type. It can be either "LIST" (to filter exact values) or "RANGE" (to filter a range of values).
  - dataType: The data type of the elements. It can be either "STRING", "NUMBER", or "EXPRESSION".
  - elements: Each element represents a different filter, and its semantics depend on the "type" (see above).
- When more than one condition is present, a Cartesian product is performed among the elements of each condition:
  - Condition A: [C1, C2]
  - Condition B: [C3, C4]
  - Resulting filters: [(C1, C3), (C1, C4), (C2, C3), (C2, C4)]
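The combination step above can be sketched in a few lines of Python. This only illustrates the Cartesian-product logic; it is not the operator's actual implementation.

```python
from itertools import product

# Each condition contributes a list of filter elements.
condition_a = ["C1", "C2"]
condition_b = ["C3", "C4"]

# Every resulting partition applies one element from each condition.
resulting_filters = list(product(condition_a, condition_b))
print(resulting_filters)
# → [('C1', 'C3'), ('C1', 'C4'), ('C2', 'C3'), ('C2', 'C4')]
```

With two conditions of two elements each, you therefore get four partitions, matching the example above.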
-
Let’s assume we have a source table called EMPLOYEE which has the following schema:
CREATE TABLE EMPLOYEE (
  EMPLOYEE_ID NUMBER(5) PRIMARY KEY,
  EMPLOYEE_NAME VARCHAR(32),
  EMPLOYEE_ADMISSION_DATE DATE
);
Range
We can create a JSON representing a RANGE partition as below. The elements represent range boundaries, so they need to be sorted in ascending order.
Numeric partition:
{
  "defaultPartition": true,
  "conditions": [
    {
      "columnExpression": "EMPLOYEE_ID",
      "type": "RANGE",
      "dataType": "NUMBER",
      "elements": [ "10", "20", "30" ]
    }
  ]
}
String partition:
{
  "defaultPartition": true,
  "conditions": [
    {
      "columnExpression": "EMPLOYEE_NAME",
      "type": "RANGE",
      "dataType": "STRING",
      "elements": [ "M", "T" ]
    }
  ]
}
Expression (date) partition:
{
  "defaultPartition": true,
  "conditions": [
    {
      "columnExpression": "EMPLOYEE_ADMISSION_DATE",
      "type": "RANGE",
      "dataType": "EXPRESSION",
      "elements": [
        "TO_DATE('2012-01-01', 'YYYY-MM-DD')",
        "TO_DATE('2015-01-01', 'YYYY-MM-DD')"
      ]
    }
  ]
}
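As a rough mental model, each RANGE boundary splits the column into consecutive intervals, one partition per interval. The sketch below illustrates this with hypothetical WHERE clauses; the exact boundary semantics (inclusive vs. exclusive) are an assumption for illustration, not taken from this document.

```python
def range_filters(column, elements):
    """Sketch: turn sorted RANGE boundaries into per-partition filters.

    Boundary inclusivity here is assumed (half-open intervals); the
    operator's real SQL generation may differ.
    """
    filters = [f"{column} < {elements[0]}"]
    for lo, hi in zip(elements, elements[1:]):
        filters.append(f"{column} >= {lo} AND {column} < {hi}")
    filters.append(f"{column} >= {elements[-1]}")
    return filters

# Boundaries 10, 20, 30 yield four partitions:
# below 10, [10, 20), [20, 30), and 30 upward.
print(range_filters("EMPLOYEE_ID", ["10", "20", "30"]))
```

Note that n boundaries produce n + 1 intervals, which is why the defaultPartition setting matters for rows outside every explicit range.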
List
List partitions can be used to filter exact values. Each element represents a different filter.
Numeric partition:
{
  "defaultPartition": true,
  "conditions": [
    {
      "columnExpression": "EMPLOYEE_ID",
      "type": "LIST",
      "dataType": "NUMBER",
      "elements": [ "10", "20", "50" ]
    }
  ]
}
String partition:
{
  "defaultPartition": false,
  "conditions": [
    {
      "columnExpression": "EMPLOYEE_NAME",
      "type": "LIST",
      "dataType": "STRING",
      "elements": [ "Jhon", "Ana", "Beatrice" ]
    }
  ]
}
Expression (date) partition:
{
  "defaultPartition": false,
  "conditions": [
    {
      "columnExpression": "TRUNC(EMPLOYEE_ADMISSION_DATE)",
      "type": "LIST",
      "dataType": "EXPRESSION",
      "elements": [ "TO_DATE('2012-07-17', 'YYYY-MM-DD')" ]
    }
  ]
}
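For LIST partitions the mental model is simpler: each element becomes one exact-match filter, i.e. one partition. The sketch below is illustrative only (the helper name and quoting behavior are assumptions, not the operator's actual SQL generation):

```python
def list_filters(column, elements, quote=False):
    """Sketch: one equality filter per LIST element.

    quote=True wraps each value in single quotes, as a string or
    date-expression column would typically require.
    """
    fmt = (lambda v: f"'{v}'") if quote else (lambda v: v)
    return [f"{column} = {fmt(v)}" for v in elements]

print(list_filters("EMPLOYEE_ID", ["10", "20", "50"]))
print(list_filters("EMPLOYEE_NAME", ["Ana", "Beatrice"], quote=True))
```

Rows matching none of the listed values are only loaded if defaultPartition is true, which is why the string and expression examples above set it to false when they deliberately want a subset.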
Combined
{
  "defaultPartition": true,
  "conditions": [
    {
      "columnExpression": "EMPLOYEE_NAME",
      "type": "LIST",
      "dataType": "STRING",
      "elements": [ "Beatrice", "Ana" ]
    },
    {
      "columnExpression": "EMPLOYEE_ADMISSION_DATE",
      "type": "RANGE",
      "dataType": "EXPRESSION",
      "elements": [
        "TO_DATE('2015-01-01', 'YYYY-MM-DD')",
        "TO_DATE('2016-01-01', 'YYYY-MM-DD')"
      ]
    }
  ]
}