Modeling Guide for SAP Data Hub

Ingest Oracle SQL

Oracle Consumer is based on datahub-flowagent and uses Dataservices as execution engine.

Dataservices provides two methods of reading data from Oracle sources using the third party OCI client. You can either use Oracle Table Consumer to read the table, or use Oracle SQL Consumer to read the result set of the native sql.

Oracle SQL Consumer

SQL Consumer takes in native SQL statement that you would run on Oracle as input and it will read the response result set from the source piece-wise (100 rows per call) and load to the next operator in line.

Prerequisites

  • See Oracle Client configuration on the tabelconsumer graph (com.sap.dh.ingestion.oracle.tableconsumer).

Components

  • triggering the graph

    constantgenerator1: provide constant input to Consumer which triggers the ingestion

  • Defining oracle connection

    oracletableconsumer1: provides oracle consumer connection information

  • Loader definition

    flowagentfileproducer1: Dataservices requires to provide a loader either csv loader to load to Vflow operators

Partition

Partitioning the source data allows us to load data in chunks there by overcome memory pressure and by doing it in parallel we can load data faster and improve overall performance. Below are supported partitioning methods with this operator.

Logical Partition

Logical Partitions are user defined partitions on how to read data from the source. For Table consumer, you can choose the partition type and then add the partition specification as below. For SQL consumer, we allow a user to type in any SQL that the underlying source supports. To allow partition logic to be used, a user would need to write the SQL with the $partition_specification. We will replace this object with partition information based on the provided specification.
  • defaultPartition:

    The partition which will fetch all rows which were not filtered by the other partitions (you can disable it by setting defaultPartition to false)

  • conditions:
    Each condition represents a set of filters which will be performed on a certain column:
    • columnExpression: The column where the filters will be applied. It can be either the column name or a function on it (i.e., TRUNC(EMPLOYEE_NAME))

    • type: The filter type. It can be either “LIST” (to filter exact values) or “RANGE” (to filter a range of values);

    • dataType: The data type of the elements. It can be either “STRING”, “NUMBER” or “EXPRESSION”;

    • elements: Each element represents a different filter, and its semantics depends on the “type” (see above).

  • When more than one condition is presented, it will be performed a Cartesian product among the elements of each condition:
    • Condition A: [C1, C2]

    • Condition B: [C3, C4]

    • Resulting filters: [(C1, C3), (C1, C4), (C2, C3), (C2, C4)]

Example:

Let’s assume we have a source table called EMPLOYEE which has the following schema:

CREATE TABLE EMPLOYEE(
    EMPLOYEE_ID NUMBER(5) PRIMARY KEY,
    EMPLOYEE_NAME VARCHAR(32),
    EMPLOYEE_ADMISSION_DATE DATE
);
And the following SQL statement:
SELECT * FROM EMPLOYEEE E $partition_specification

Range

We can create a JSON representing a RANGE partition as below, where the elements represent ranges, thus, they need to be sorted in ascending order.

Numeric partitions:

{
  "defaultPartition": true,
  "conditions": [
    {
      "columnExpression": "EMPLOYEE_ID",
      "type": "RANGE",
      "dataType": "NUMBER",
      "elements": [
        "10",
        "20",
        "30"
      ]
    }
  ]
}
This will generate the following SQL statements:
SELECT * FROM EMPLOYEE WHERE EMPLOYEE_ID < 10
SELECT * FROM EMPLOYEE WHERE EMPLOYEE_ID >= 10 AND EMPLOYEE_ID < 20 )
SELECT * FROM EMPLOYEE WHERE EMPLOYEE_ID >= 20 AND EMPLOYEE_ID < 30 )
SELECT * FROM EMPLOYEE WHERE EMPLOYEE_ID >= 30
String partitions:
{
  "defaultPartition": true,
  "conditions": [
    {
      "columnExpression": "EMPLOYEE_NAME",
      "type": "RANGE",
      "dataType": "STRING",
      "elements": [
        "M",
        "T"
      ]
    }
  ]
}

This will generate the following SQL statements:

SELECT * FROM EMPLOYEE WHERE EMPLOYEE_NAME < 'M'
SELECT * FROM EMPLOYEE WHERE EMPLOYEE_NAME >= 'M' AND EMPLOYEE_NAME < 'T'
SELECT * FROM EMPLOYEE WHERE EMPLOYEE_NAME >= 'T'
Date partitions:
{
  "defaultPartition": true, 
  "conditions": [
    {
      "columnExpression": "EMPLOYEE_ADMISSION_DATE",
      "type": "RANGE",
      "dataType": "EXPRESSION",
      "elements": [
        "TO_DATE('2012-01-01', 'YYYY-MM-DD')",
        "TO_DATE('2015-01-01', 'YYYY-MM-DD')"
      ]
    }
  ]
}

This will generate the following SQL statements:

SELECT * FROM EMPLOYEE WHERE EMPLOYEE_ADMISSION_DATE < TO_DATE('2012-01-01', 'YYYY-MM-DD')
SELECT * FROM EMPLOYEE WHERE EMPLOYEE_ADMISSION_DATE >= TO_DATE('2012-01-01', 'YYYY-MM-DD' AND EMPLOYEE_ADMISSION_DATE < TO_DATE('2015-01-01', 'YYYY-MM-DD')
SELECT * FROM EMPLOYEE WHERE EMPLOYEE_ADMISSION_DATE >= TO_DATE('2015-01-01', 'YYYY-MM-DD')

List

List partitions can be used to filter exact values. Each element represents a different filter.

Numeric partitions:

{
  "defaultPartition": true, 
  "conditions": [
    {
      "columnExpression": "EMPLOYEE_ID",
      "type": "LIST",
      "dataType": "NUMBER",
      "elements": [
        "10",
        "20",
        "50"
      ]
    }
  ]
}
This will generate the following SQL statements:
SELECT * FROM EMPLOYEE WHERE EMPLOYEE_ID = 10
SELECT * FROM EMPLOYEE WHERE EMPLOYEE_ID = 20
SELECT * FROM EMPLOYEE WHERE EMPLOYEE_ID = 50
SELECT * FROM EMPLOYEE WHERE EMPLOYEE_ID != 10 AND EMPLOYEE_ID != 20 AND EMPLOYEE_ID != 50
String partitions:
{
  "defaultPartition": false,
  "conditions": [
    {
      "columnExpression": "EMPLOYEE_NAME",
      "type": "LIST",
      "dataType": "STRING",
      "elements": [
        "Jhon",
        "Ana",
        "Beatrice"
      ]
    }
  ]
}

This will generate the following SQL statements:

SELECT * FROM EMPLOYEE WHERE EMPLOYEE_NAME = 'Jhon'
SELECT * FROM EMPLOYEE WHERE EMPLOYEE_NAME = 'Ana'
SELECT * FROM EMPLOYEE WHERE EMPLOYEE_NAME = 'Beatrice'
Date partitions:
{
  "defaultPartition": false,
  "conditions": [
    {
      "columnExpression": "TRUNC(EMPLOYEE_ADMISSION_DATE)",
      "type": "LIST",
      "dataType": "EXPRESSION",
      "elements": [
        "TO_DATE('2012-07-17', 'YYYY-MM-DD')"
      ]
    }
  ]
}

This will generate the following SQL statements:

SELECT * FROM EMPLOYEE WHERE TRUNC(EMPLOYEE_ADMISSION_DATE) = TO_DATE('2012-07-17', 'YYYY-MM-DD')

Combined

{
  "defaultPartition": true, 
  "conditions": [
    {
      "columnExpression": "EMPLOYEE_NAME",
      "type": "LIST",
      "dataType": "NUMBER",
      "elements": [
        "Beatrice",
        "Ana"
      ]
    },
    {
      "columnExpression": "EMPLOYEE_ADMISSION_DATE",
      "type": "RANGE",
      "dataType": "EXPRESSION",
      "elements": [
        "TO_DATE('2015-01-01', 'YYYY-MM-DD')",
        "TO_DATE('2016-01-01', 'YYYY-MM-DD')"
      ]
    }
  ]
}

This will generate the following SQL statements:

SELECT * FROM EMPLOYEE WHERE EMPLOYEE_NAME = 'Beatrice' AND EMPLOYEE_ADMISSION_DATE < TO_DATE('2015-01-01', 'YYYY-MM-DD')
SELECT * FROM EMPLOYEE WHERE EMPLOYEE_NAME = 'Beatrice' AND EMPLOYEE_ADMISSION_DATE >= TO_DATE('2015-01-01', 'YYYY-MM-DD') AND EMPLOYEE_ADMISSION_DATE < TO_DATE('2016-01-01', 'YYYY-MM-DD')
SELECT * FROM EMPLOYEE WHERE EMPLOYEE_NAME = 'Ana' AND EMPLOYEE_ADMISSION_DATE < TO_DATE('2015-01-01', 'YYYY-MM-DD')
SELECT * FROM EMPLOYEE WHERE EMPLOYEE_NAME = 'Ana' AND EMPLOYEE_ADMISSION_DATE >= TO_DATE('2015-01-01', 'YYYY-MM-DD') AND EMPLOYEE_ADMISSION_DATE < TO_DATE('2016-01-01', 'YYYY-MM-DD')
SELECT * FROM EMPLOYEE WHERE EMPLOYEE_ADMISSION_DATE >= TO_DATE('2016-01-01', 'YYYY-MM-DD')

Notice that we have mixed partition types, the default partition will be created based only on the range partition.