ScheduledExecution¶
- class hana_ml.algorithms.pal.scheduler.ScheduledExecution(connection_context)¶
Python implementation of PAL scheduled execution. Basically, with an instance of class ScheduledExecution, users can take the following actions:
create a task
create a scheduled execution for a task
alter the scheduled execution for a task
pause the scheduled execution for a task
resume the scheduled execution for a task
remove the scheduled execution for a task
remove a task
create a scheduled execution of the fit() method of a hana-ml object
create a scheduled execution of the predict() method of a hana-ml object
create a scheduled execution of the score() method of a hana-ml object (limited to the case that the score() method is associated with a PAL SCORE procedure)
- Parameters
- connection_contextConnectionContext
Specifies the valid connection to SAP HANA Cloud database.
- Attributes
- connection_contextConnectionContext
Representing the connection to SAP HANA Cloud database.
- current_userstr
Representing the info of CURRENT_USER reflected by the connection to SAP HANA.
Methods
alter_task_schedule(task_id[, cron, ...]) Alter a schedule.
cancel_schedule_job(task_id, max_wait_duration) Cancel a running scheduled job.
create_fit_schedule(obj, fit_params, ...[, ...]) Create a scheduled execution of the fit() method of a hana-ml object.
create_predict_schedule(obj, predict_params, ...) Create a scheduled execution of the predict() method of a hana-ml object.
create_score_schedule(obj, score_params, ...) Create a scheduled execution of the score() method of a hana-ml object (the method must invoke a PAL SCORE procedure internally).
create_task(task_id, proc_name, proc_schema) Create a task to be scheduled for execution.
create_task_schedule(task_id, cron[, ...]) Create a scheduled execution for a task.
get_executed_task_jobs(task_id[, job_id, order]) Retrieve the executed task jobs (from table "PAL_SCHEDULED_EXECUTION"."TASK_SCHEDULE_JOB").
get_fit_sql_proc_create_statement() Get the SQL statement for creating the fit procedure that has been scheduled for execution.
get_predict_sql_proc_create_statement() Get the SQL statement for creating the predict procedure that has been scheduled for execution.
get_score_sql_proc_create_statement() Get the SQL statement for creating the score procedure that has been scheduled for execution.
get_task_definition(task_id) Get the definition of a created task given task ID.
get_task_log(task_id) Get the log of a specified task given its task_id.
get_task_param(task_id) Get the parameters of a created task given a task_id.
get_task_schedules([task_owner]) Get the info of scheduled jobs from system view SCHEDULER_JOBS via task owner specification.
list_materialized_tables_fit() Get the materialization table names of temp tables for the scheduled hana-ml fit() execution.
list_materialized_tables_predict() Get the materialization table names of temp tables for the scheduled hana-ml predict() execution.
list_materialized_tables_score() Get the materialization table names of temp tables for the scheduled hana-ml score() execution.
list_output_tables_fit() Get the output table names for the scheduled hana-ml fit() execution.
list_output_tables_predict() Get the output table names for the scheduled hana-ml predict() execution.
list_output_tables_score() Get the output table names for the scheduled hana-ml score() execution.
pause_task_schedule(task_id) Pause a running schedule.
remove_task(task_id[, force]) Remove a task.
remove_task_schedule(task_id) Remove the scheduled execution of a task.
resume_task_schedule(task_id) Resume a paused schedule.
Examples
Scenario: A dataset is updated continuously and is stored in a table called 'EXPERIMENT_DATA_FULL_TBL' in the SAP HANA Cloud database. We want to schedule the training of an HGBT model on this dataset at 8 AM every Monday, and have the latest HGBT model stored in a table called 'EXPERIMENT_MODEL_TBL'.
The entire scheduling process of the scenario above can be illustrated as follows:
First, we create a ScheduledExecution instance as follows:
>>> from hana_ml.dataframe import ConnectionContext
>>> from hana_ml.algorithms.pal.scheduler import ScheduledExecution
>>> url, port, user, pwd = 'mocksite.com', 30015, 'MOCK_USER', 'pt&%$sdxy'
>>> conn = ConnectionContext(url, port, user, pwd)
>>> sexec = ScheduledExecution(conn)
>>> sexec.current_user
'MOCK_USER'
Then we can execute the following SQL statement to create a stored SQL procedure for each single training process:
CREATE PROCEDURE EXPERIMENT_HGBT_TRAIN(TREE_NUM INTEGER)
LANGUAGE SQLSCRIPT
SQL SECURITY INVOKER
AS
BEGIN
    DECLARE param_tab TABLE("PARAM_NAME" VARCHAR(256), "INT_VALUE" INTEGER, "DOUBLE_VALUE" DOUBLE, "STRING_VALUE" VARCHAR(1000));
    :param_tab.insert(('HAS_ID', 1, NULL, NULL));
    :param_tab.insert(('DEPENDENT_VARIABLE', NULL, NULL, 'median_house_value'));
    :param_tab.insert(('ITER_NUM', :TREE_NUM, NULL, NULL));
    data_tab = SELECT * FROM EXPERIMENT_DATA_FULL_TBL;
    CALL _SYS_AFL.PAL_HGBT(:data_tab, :param_tab, model_tab, varimp_tab, cm_tab, stat_tab, cv_tab);
    TRUNCATE TABLE EXPERIMENT_MODEL_TBL;
    INSERT INTO EXPERIMENT_MODEL_TBL SELECT * FROM :model_tab;
END
Once created, the procedure will be under the schema of current user (i.e. 'MOCK_USER' shown in the connection). Then, we can create a task for it, demonstrated as follows:
>>> task_info = sexec.create_task(task_id='EXPERIMENT_DATA_HGBT_FIT',
...                               proc_name='EXPERIMENT_HGBT_TRAIN',
...                               proc_schema='MOCK_USER',
...                               task_desc='Fitting HGBT model using EXPERIMENT dataset',
...                               task_params=[('TREE_NUM', None, 10, 2)],
...                               force=True)  # drop the old task with the same task ID if it exists
The task is successfully created if no error is raised. We can then attach the schedule described at the beginning of this section to the created task, illustrated as follows:
>>> schedule_info = sexec.create_task_schedule(task_id='EXPERIMENT_DATA_HGBT_FIT',
...                                            cron="* * * 'mon' 8 0 0")  # 8 AM every Monday
If we change our mind and want to postpone the training process to 9AM each Tuesday, then we only need to alter the schedule using a different execution frequency pattern, illustrated as follows:
>>> schedule_info = sexec.alter_task_schedule(task_id='EXPERIMENT_DATA_HGBT_FIT',
...                                           cron="* * * 'tue' 9 0 0")  # 9 AM every Tuesday
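The seven-field cron strings used in these calls can also be composed from named fields, which makes the intent easier to read. The following is a minimal sketch; build_cron and CRON_FIELDS are hypothetical names that are not part of hana-ml, and the helper only joins the fields in the documented order without validating their values.

```python
# Field order documented for the `cron` parameter of create_task_schedule.
CRON_FIELDS = ("year", "month", "date", "weekday", "hour", "minute", "second")


def build_cron(**fields):
    """Join per-field patterns into a 7-field cron string.

    Hypothetical helper, not part of hana-ml. Unspecified fields default to
    the wildcard '*'. Values are inserted as given, so weekday quoting
    (e.g. "'mon'") is left to the caller.
    """
    unknown = set(fields) - set(CRON_FIELDS)
    if unknown:
        raise ValueError(f"unknown cron field(s): {sorted(unknown)}")
    return " ".join(str(fields.get(name, "*")) for name in CRON_FIELDS)


monday_8am = build_cron(weekday="'mon'", hour=8, minute=0, second=0)
tuesday_9am = build_cron(weekday="'tue'", hour=9, minute=0, second=0)
print(monday_8am)
print(tuesday_9am)
```

The resulting strings can be passed as the cron argument of create_task_schedule or alter_task_schedule.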
We can pause and resume the schedule at any time, illustrated as follows:
>>> sexec.pause_task_schedule(task_id='EXPERIMENT_DATA_HGBT_FIT')
>>> sexec.resume_task_schedule(task_id='EXPERIMENT_DATA_HGBT_FIT')
If we no longer need the task to be scheduled, we can remove the schedule:
>>> sexec.remove_task_schedule(task_id='EXPERIMENT_DATA_HGBT_FIT')
Finally if the task is no longer needed, we can remove the task:
>>> sexec.remove_task(task_id='EXPERIMENT_DATA_HGBT_FIT')
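The order of calls in this walkthrough can be summarized in a single function. The sketch below is an illustration only: run_lifecycle and RecordingScheduler are hypothetical names, and the recording stub stands in for a real ScheduledExecution instance so that the call sequence can be inspected without a database connection.

```python
def run_lifecycle(sexec, task_id, proc_name, proc_schema, cron):
    """Call the scheduling methods in the order used in the walkthrough."""
    sexec.create_task(task_id=task_id, proc_name=proc_name,
                      proc_schema=proc_schema, force=True)
    sexec.create_task_schedule(task_id=task_id, cron=cron)
    sexec.pause_task_schedule(task_id=task_id)
    sexec.resume_task_schedule(task_id=task_id)
    # A task's schedule is removed before the task itself.
    sexec.remove_task_schedule(task_id=task_id)
    sexec.remove_task(task_id=task_id)


class RecordingScheduler:
    """Hypothetical stand-in that records method names instead of calling SAP HANA."""

    def __init__(self):
        self.calls = []

    def __getattr__(self, name):
        # Only reached for attributes that are not defined, i.e. the API methods.
        def record(**kwargs):
            self.calls.append(name)
        return record


stub = RecordingScheduler()
run_lifecycle(stub, "EXPERIMENT_DATA_HGBT_FIT", "EXPERIMENT_HGBT_TRAIN",
              "MOCK_USER", "* * * 'mon' 8 0 0")
print(stub.calls)
```

With a live connection, the same function would be called with a ScheduledExecution instance in place of the stub.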
- create_task(task_id, proc_name, proc_schema, task_owner=None, task_params=None, task_desc='', force=False)¶
Create a task to be scheduled for execution. A task consists of the task ID, the owner and a stored SQL procedure (with parameters) to be invoked.
- Parameters
- task_idstr
Specifies the name of the task to be created. The name must be unique and must not conflict with the names of existing tasks.
- proc_namestr
Specifies the name of the stored SQL procedure to be invoked.
- proc_schemastr
Specifies the schema of the stored SQL procedure given in proc_name.
Two simple examples for illustration:
If the stored SQL procedure to be invoked is created by user 'PAL_TESTER', then proc_schema should be assigned the value of 'PAL_TESTER'.
All PAL procedures are under the schema '_SYS_AFL'. If the stored SQL procedure to be invoked is a PAL procedure, then proc_schema should be assigned the value of '_SYS_AFL'.
- task_ownerstr, optional
Specifies the task owner, who must be granted the privilege to call the stored SQL procedure specified by proc_name.
Defaults to CURRENT_USER.
- task_paramslist of tuples, optional
Specifies the parameters of the stored SQL procedure, each parameter must be specified with a tuple described as follows:
(parameter name, parameter schema, parameter value, parameter type).
Currently parameter type can take the following values
0 : table
1 : view
2 : literal
Note that if parameter type is literal (i.e. takes the value of 2), then its corresponding parameter schema should be None.
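As an illustration of the tuple layout described above, the following sketch builds task_params entries with named type codes. The helper task_param, the constants TABLE, VIEW and LITERAL, and the 'DATA' parameter are hypothetical names introduced for this example; they are not part of hana-ml.

```python
# Parameter type codes as listed in the task_params description.
TABLE, VIEW, LITERAL = 0, 1, 2


def task_param(name, value, param_type=LITERAL, schema=None):
    """Build one (name, schema, value, type) tuple for task_params.

    Hypothetical helper: it enforces only the rule stated in the description,
    namely that literal parameters must have schema None.
    """
    if param_type not in (TABLE, VIEW, LITERAL):
        raise ValueError(f"unsupported parameter type: {param_type!r}")
    if param_type == LITERAL and schema is not None:
        raise ValueError("literal parameters must have schema None")
    return (name, schema, value, param_type)


# Literal parameter, matching the TREE_NUM example used with create_task.
task_params = [task_param("TREE_NUM", 10)]
print(task_params)

# Table-typed parameter; 'DATA' is a made-up parameter name for illustration.
table_param = task_param("DATA", "EXPERIMENT_DATA_FULL_TBL", TABLE, schema="MOCK_USER")
print(table_param)
```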
- task_descstr, optional
Description of the task.
Defaults to empty string.
- forcebool, optional
Specifies whether or not to drop the previously created task with the same task_id.
Set to True to drop the old task with the same task_id. In this case, if the old task is scheduled for execution, the schedule is dropped as well.
If set to False and a task with the same task_id already exists, an error is raised.
Defaults to False.
- Returns
- DataFrame
DataFrame containing the information of the created task.
- remove_task(task_id, force=False)¶
Remove a task.
- Parameters
- task_idstr
Specifies the name of the task to be removed.
- forcebool, optional
Specifies whether or not to continue removing the specified task if the task is scheduled.
If set to True and the task is scheduled, the schedule is removed as well so that the task can be removed. If set to False and the task is scheduled, an error is raised.
Defaults to False.
- Returns
- DataFrame
DataFrame containing the information of the task that has been removed.
- get_task_log(task_id)¶
Get the log of a specified task given its task_id.
- Parameters
- task_idstr
Task ID.
- get_task_definition(task_id)¶
Get the definition of a created task given task ID.
- Parameters
- task_idstr
Task ID.
- get_task_param(task_id)¶
Get the parameters of a created task given a task_id.
- Parameters
- task_idstr
Task ID.
- create_task_schedule(task_id, cron, recurrence_range=None, force=False)¶
Create scheduled execution for a task.
- Parameters
- task_idstr
Name of the task to be scheduled for execution.
- cronstr
Specifies the frequency pattern of task execution. It should be a string in the following format (note the single space between neighboring frequency categories):
"<YEAR> <MONTH> <DATE> <WEEKDAY> <HOUR> <MINUTE> <SECOND>"
where
YEAR
Four digit number, representing the year
MONTH
1 - 12, representing the month
DATE
1 - 31, representing the date (monthday)
WEEKDAY
'mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun', representing the day of week
HOUR
0 - 23, representing the hour
MINUTE
0 - 59, representing the minute
SECOND
0 - 59, representing the second
Besides valid values for each frequency category listed above, each frequency pattern also supports wildcard character, range pattern and cycle pattern, illustrated as follows:
*
Any frequency value
*/n
Every n-th value, starting from the first valid value
a:b
Valid values ranging from a to b, inclusive of end points
a:b/n
Valid values from a to b with step n
Moreover, each frequency pattern can also be entered in a comma separated list. For example, the <WEEKDAY> frequency pattern can be specified as 'mon, wed, fri', which means that task is scheduled for execution on Monday, Wednesday and Friday.
Example
cron = "2025 2 25 * 14:16 0 0"
specifies a frequency pattern of hourly task execution from 14:00 to 16:00 on Feb 25, 2025.
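To make the wildcard, range, cycle and list patterns concrete, the sketch below expands a single numeric field into the values it selects. It follows one reading of the pattern descriptions above and is not the parser used by SAP HANA; expand_field is a hypothetical helper, and weekday names are not handled.

```python
def expand_field(pattern, lo, hi):
    """Expand one numeric cron field into the sorted list of values it selects.

    lo and hi are the smallest and largest valid values of the field,
    e.g. 0 and 23 for HOUR.
    """
    values = set()
    for part in pattern.split(","):          # comma-separated list
        part = part.strip()
        base, _, step = part.partition("/")  # optional cycle "/n"
        step = int(step) if step else 1
        if step < 1:
            raise ValueError(f"step must be positive in {part!r}")
        if base == "*":                      # wildcard: whole valid range
            start, stop = lo, hi
        elif ":" in base:                    # range "a:b", end points included
            a, b = base.split(":")
            start, stop = int(a), int(b)
        else:                                # single value
            start = stop = int(base)
        if not (lo <= start <= stop <= hi):
            raise ValueError(f"{part!r} is outside {lo}..{hi}")
        values.update(range(start, stop + 1, step))
    return sorted(values)


# HOUR field of the example cron "2025 2 25 * 14:16 0 0".
example_hours = expand_field("14:16", 0, 23)
print(example_hours)
```

Under this reading, the example selects hours 14, 15 and 16, each at minute 0 and second 0.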
- recurrence_rangedict, optional
Specifies the range of time allowed for scheduled task execution. This setting is optional: the user can set the lower bound (i.e. start), the upper bound (i.e. end), both, or neither.
The start and end points of the recurrence range should each be given either as a timestamp string in the format "YYYY-MM-DD HH24:MI:SS.FF7" or as a Python datetime.datetime object.
Example recurrence range in dict: {'start': '2025-02-22 14:00:00.0000000', 'end': '2025-02-28 15:00:00.0000000'}, which specifies a recurrence range from 14:00 on Feb 22, 2025 to 15:00 on Feb 28, 2025.
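The string format has seven fractional-second digits, whereas Python's strftime produces six. The sketch below shows one way to produce a matching string from a datetime.datetime value; to_recurrence_timestamp is a hypothetical helper, and it is only needed when a string is preferred, since datetime objects are accepted as well.

```python
from datetime import datetime


def to_recurrence_timestamp(moment):
    """Format a datetime as 'YYYY-MM-DD HH24:MI:SS.FF7'."""
    # %f yields 6 fractional digits (microseconds); FF7 has 7, so pad one zero.
    return moment.strftime("%Y-%m-%d %H:%M:%S.%f") + "0"


recurrence_range = {
    "start": to_recurrence_timestamp(datetime(2025, 2, 22, 14, 0, 0)),
    "end": to_recurrence_timestamp(datetime(2025, 2, 28, 15, 0, 0)),
}
print(recurrence_range)
```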
- Returns
- DataFrame
DataFrame containing the created schedule for task execution.
- alter_task_schedule(task_id, cron=None, recurrence_range=None)¶
Alter a schedule.
- Parameters
- task_idstr
Name of the task to be scheduled for execution.
- cronstr, optional
Specifies the frequency pattern of task execution. Its format is the same as that of the cron parameter in create_task_schedule.
- recurrence_rangedict, optional
Specifies the range of time allowed for scheduled task execution. Its settings are the same as those of the recurrence_range parameter in create_task_schedule.
- Returns
- DataFrame
DataFrame containing the information of the altered scheduled execution.
- pause_task_schedule(task_id)¶
Pause a running schedule.
- Parameters
- task_idstr
Task ID.
- Returns
- DataFrame
DataFrame containing the information of the (paused) task schedule.
- resume_task_schedule(task_id)¶
Resume a paused schedule.
- Parameters
- task_idstr
Task ID.
- Returns
- DataFrame
DataFrame containing the information of the (resumed) task schedule.
- get_executed_task_jobs(task_id, job_id=None, order='desc')¶
Retrieve the executed task jobs (from table "PAL_SCHEDULED_EXECUTION"."TASK_SCHEDULE_JOB").
- Parameters
- task_idstr
Task ID.
- job_idint, optional
Job ID. Defaults to None.
- order{'asc', 'desc'}, optional
The order in which retrieved records are displayed, sorted by execution start time.
Defaults to 'desc'.
- Returns
- DataFrame
DataFrame containing the information of the executed task jobs.
- get_task_schedules(task_owner=None)¶
Get the info of scheduled jobs from system view SCHEDULER_JOBS via task owner specification.
- Parameters
- task_ownerstr, optional
Task owner.
Defaults to the value of class attribute current_user.
- Returns
- DataFrame
Filtered view of SCHEDULER_JOBS.
- cancel_schedule_job(task_id, max_wait_duration)¶
Cancel a running scheduled job.
- Parameters
- task_idstr
Task ID.
- max_wait_durationint
Maximum wait duration for canceling the schedule job, in seconds.
- Returns
- DataFrame
DataFrame containing result message of the cancel process.
- remove_task_schedule(task_id)¶
Remove the scheduled execution of a task.
- Parameters
- task_idstr
Task ID.
- Returns
- DataFrame
DataFrame containing the information of the scheduled task execution.
- create_fit_schedule(obj, fit_params, task_id, cron, recurrence_range=None, output_table_names=None, proc_name=None, force=True)¶
Create a scheduled execution of the fit() method of a hana-ml object. To achieve this, the following actions are taken in sequence:
1. A stored SQL procedure is created for the fit() method to be executed
2. A task is created for the stored SQL procedure
3. The task created in Step 2 is scheduled for future execution
- Parameters
- objhana-ml object
A hana-ml object (i.e. an instance of some hana-ml class) with a callable fit() method.
For example, obj can be a hana-ml object defined as follows:
from hana_ml.algorithms.pal.unified_classification import UnifiedClassification
obj = UnifiedClassification(func='HybridGradientBoostingTree', n_estimators=100)
- fit_paramsdict
The key-value arguments (parameters) passed to the fit() method of obj. In effect, it is the execution of
obj.fit(**fit_params)
that is scheduled.
- task_idstr
The task ID of the task associated with the stored SQL procedure that executes the fit() method of obj.
- cronstr
Specifies the frequency pattern of task execution, as defined for the cron parameter of create_task_schedule.
- recurrence_rangedict, optional
Specifies the range of time allowed for scheduled task execution, as defined for the recurrence_range parameter of create_task_schedule.
- output_table_namesListOfStrings, optional
User-specified names of output tables for the corresponding PAL procedure for model fitting.
If not provided, the table names will be automatically generated.
- proc_namestr, optional
Procedure name of the generated stored SQL procedure.
Defaults to f"PROCEDURE_{task_id}" if not provided.
- forcebool, optional
Specifies whether or not to force the creation of the task schedule for the execution of the fit() method of obj.
If set to True, it first tries to drop existing procedures with the same name as well as tasks/schedules with the same task_id, then re-creates and re-schedules them.
Defaults to True.
Examples
Assuming a dataset for classification is stored in table "CLS_DATA_TBL" (with ID column "ID" and label column "CLASS"), and we want to schedule the training of an HGBT model using the UnifiedClassification interface provided in hana-ml, we can proceed as follows:
>>> from hana_ml.dataframe import ConnectionContext
>>> from hana_ml.algorithms.pal.scheduler import ScheduledExecution
>>> from hana_ml.algorithms.pal.unified_classification import UnifiedClassification
>>> cc = ConnectionContext(address=..., port=..., user=..., password=...)
>>> data = cc.table("CLS_DATA_TBL")
>>> fit_params = dict(data=data, key="ID", label="CLASS")
>>> scheduler = ScheduledExecution(cc)
>>> uhgc = UnifiedClassification(func="HybridGradientBoostingTree", n_estimators=100)
>>> schedule_info = scheduler.create_fit_schedule(obj=uhgc,
...                                               fit_params=fit_params,
...                                               task_id="CLS_DATA_TBL_FIT",
...                                               cron="2025 3 14 * 9 0 0",  # 9:00 AM, March 14, 2025
...                                               force=True)
- get_fit_sql_proc_create_statement()¶
Get the SQL statement for creating the fit procedure that has been scheduled for execution.
- list_materialized_tables_fit()¶
Get the materialization table names of temp tables for the scheduled hana-ml fit() execution.
- list_output_tables_fit()¶
Get the output table names for the scheduled hana-ml fit() execution.
- create_predict_schedule(obj, predict_params, task_id, cron, proc_name=None, recurrence_range=None, output_table_names=None, force=True)¶
Create a scheduled execution of the predict() method of a hana-ml object. A prerequisite step is to execute the fit() method of the hana-ml object first, so that a model is available for making inferences.
Then, to achieve this, the following actions are taken in sequence:
1. A stored SQL procedure is created for the predict() method to be executed
2. A task is created for the stored SQL procedure
3. The task created in Step 2 is scheduled for future execution
- Parameters
- objhana-ml object
A hana-ml object (i.e. an instance of some hana-ml class) with a callable predict() method. It must be fitted first.
For example, obj can be a hana-ml object defined as follows:
from hana_ml.algorithms.pal.unified_classification import UnifiedClassification
obj = UnifiedClassification(func='HybridGradientBoostingTree', n_estimators=100).fit(data=data, key=..., label=...)
- predict_paramsdict
The key-value arguments (parameters) passed to the predict() method of obj. In effect, it is the execution of
obj.predict(**predict_params)
that is scheduled.
- task_idstr
The task ID of the task associated with the stored SQL procedure that executes the predict() method of obj.
- cronstr
Specifies the frequency pattern of task execution, as defined for the cron parameter of create_task_schedule.
- recurrence_rangedict, optional
Specifies the range of time allowed for scheduled task execution, as defined for the recurrence_range parameter of create_task_schedule.
- output_table_namesListOfStrings, optional
User-specified names of output tables for the corresponding PAL procedure for prediction.
If not provided, the table names will be automatically generated.
- proc_namestr, optional
Procedure name of the generated stored SQL procedure.
Defaults to f"PROCEDURE_{task_id}" if not provided.
- forcebool, optional
Specifies whether or not to force the creation of the task schedule for the execution of the predict() method of obj.
If set to True, it first tries to drop existing procedures with the same name as well as tasks/schedules with the same task_id, then re-creates and re-schedules them.
Defaults to True.
Examples
Assuming a training dataset for classification is stored in table "CLS_DATA_TBL_TRAIN", and separate data for prediction is stored in table "CLS_DATA_TBL_PREDICT", we want to schedule prediction on the latter with an HGBT model fitted through the UnifiedClassification interface. We can proceed as follows:
>>> from hana_ml.dataframe import ConnectionContext
>>> from hana_ml.algorithms.pal.scheduler import ScheduledExecution
>>> from hana_ml.algorithms.pal.unified_classification import UnifiedClassification
>>> cc = ConnectionContext(address=..., port=..., user=..., password=...)
>>> scheduler = ScheduledExecution(cc)
>>> train_data = cc.table("CLS_DATA_TBL_TRAIN")
>>> uhgc = UnifiedClassification(func="HybridGradientBoostingTree",
...                              n_estimators=100).fit(data=train_data, key=...)
>>> predict_data = cc.table("CLS_DATA_TBL_PREDICT")
>>> predict_params = dict(data=predict_data, key=...)
>>> schedule_info = scheduler.create_predict_schedule(obj=uhgc,
...                                                   predict_params=predict_params,
...                                                   task_id="CLS_DATA_TBL_PREDICT",
...                                                   cron="2025 3 14 * 9 0 0",  # 9:00 AM, March 14, 2025
...                                                   force=True)
- get_predict_sql_proc_create_statement()¶
Get the SQL statement for creating the predict procedure that has been scheduled for execution.
- list_materialized_tables_predict()¶
Get the materialization table names of temp tables for the scheduled hana-ml predict() execution.
- list_output_tables_predict()¶
Get the output table names for the scheduled hana-ml predict() execution.
- create_score_schedule(obj, score_params, task_id, cron, proc_name=None, recurrence_range=None, output_table_names=None, force=True)¶
Create a scheduled execution of the score() method of a hana-ml object (the method must invoke a PAL SCORE procedure internally). A prerequisite step is to execute the fit() method of the hana-ml object first, so that a model is available for scoring on test data.
Then, to achieve this, the following actions are taken in sequence:
1. A stored SQL procedure is created for the score() method to be executed
2. A task is created for the stored SQL procedure
3. The task created in Step 2 is scheduled for future execution
- Parameters
- objhana-ml object
A hana-ml object (i.e. an instance of some hana-ml class) with a callable score() method that invokes a PAL SCORE procedure. It must be fitted first.
For example, obj can be a hana-ml object defined as follows:
from hana_ml.algorithms.pal.unified_classification import UnifiedClassification
obj = UnifiedClassification(func='HybridGradientBoostingTree', n_estimators=100).fit(data=data, key=..., label=...)
- score_paramsdict
The key-value arguments (parameters) passed to the score() method of obj. In effect, it is the execution of
obj.score(**score_params)
that is scheduled.
- task_idstr
The task ID of the task associated with the stored SQL procedure that executes the score() method of obj.
- cronstr
Specifies the frequency pattern of task execution, as defined for the cron parameter of create_task_schedule.
- proc_namestr, optional
Procedure name of the generated stored SQL procedure.
Defaults to f"PROCEDURE_{task_id}" if not provided.
- recurrence_rangedict, optional
Specifies the range of time allowed for scheduled task execution, as defined for the recurrence_range parameter of create_task_schedule.
- output_table_namesListOfStrings, optional
User-specified names of output tables for the corresponding PAL procedure for scoring.
If not provided, the table names will be automatically generated.
- forcebool, optional
Specifies whether or not to force the creation of the task schedule for the execution of the score() method of obj.
If set to True, it first tries to drop existing procedures with the same name as well as tasks/schedules with the same task_id, then re-creates and re-schedules them.
Defaults to True.
Examples
Assuming a dataset for classification is split into train and test parts, stored separately in table "CLS_DATA_TBL_TRAIN" and table "CLS_DATA_TBL_TEST", we want to schedule the scoring of an HGBT model, fitted on the train part through the UnifiedClassification interface provided in hana-ml, on the test part. We can proceed as follows:
>>> from hana_ml.dataframe import ConnectionContext
>>> from hana_ml.algorithms.pal.scheduler import ScheduledExecution
>>> from hana_ml.algorithms.pal.unified_classification import UnifiedClassification
>>> cc = ConnectionContext(address=..., port=..., user=..., password=...)
>>> scheduler = ScheduledExecution(cc)
>>> uhgc = UnifiedClassification(func="HybridGradientBoostingTree",
...                              n_estimators=100)
>>> train_data = cc.table("CLS_DATA_TBL_TRAIN")
>>> uhgc.fit(data=train_data, key=...)  # fit on the train data first so that a model exists for scoring
>>> test_data = cc.table("CLS_DATA_TBL_TEST")
>>> score_params = dict(data=test_data, key=..., label=...)
>>> schedule_info = scheduler.create_score_schedule(obj=uhgc,
...                                                 score_params=score_params,
...                                                 task_id="CLS_DATA_TBL_SCORE",
...                                                 cron="2025 3 14 * 9 0 0",  # 9:00 AM, March 14, 2025
...                                                 force=True)
- get_score_sql_proc_create_statement()¶
Get the SQL statement for creating the score procedure that has been scheduled for execution.
- list_materialized_tables_score()¶
Get the materialization table names of temp tables for the scheduled hana-ml score() execution.
- list_output_tables_score()¶
Get the output table names for the scheduled hana-ml score() execution.