gen_ai_hub.orchestration package

Submodules

gen_ai_hub.orchestration.exceptions module

exception OrchestrationError

Bases: Exception

This exception is raised when an error occurs during the execution of the orchestration service, typically due to incorrect usage, invalid configurations, or issues with run parameters defined by the user.

__init__(request_id, http_headers, message, code, location, module_results, retries=0)

The constructor for the OrchestrationError class.

Parameters:
  • request_id (str) -- Unique identifier for the request.

  • http_headers (httpx.Headers) -- The HTTP headers associated with the error, useful for example when handling rate limiting.

  • message (str) -- Detailed error message describing the issue.

  • code (int) -- Error code associated with the specific type of failure.

  • location (str) -- Specific component or step in the orchestration process where the error occurred.

  • module_results (Dict[str, Any]) -- State information and partial results from various modules at the time of the error, useful for debugging.

  • retries (int, optional) -- The number of retries attempted, defaults to 0

gen_ai_hub.orchestration.service module

Module for orchestration service handling requests and responses.

Provides synchronous and asynchronous methods to run orchestration pipelines.

class OrchestrationRequest

Bases: JSONSerializable

Represents a request for the orchestration process, including configuration, template values, and message history.

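__init__(config, template_values, history)

Parameters:
  • config (OrchestrationConfig) -- The orchestration configuration for the request.

  • template_values (List[TemplateValue]) -- List of template values to be used in the orchestration.

  • history (List[Message]) -- List of messages representing the conversation history.

Return type:

None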

to_dict()

Converts the OrchestrationRequest instance to a dictionary.

Returns:

Dictionary representation of the OrchestrationRequest

Return type:

dict

config: OrchestrationConfig

The orchestration configuration for the request.

Returns:

OrchestrationConfig

Return type:

OrchestrationConfig

history: List[Message]

List of messages representing the conversation history.

template_values: List[TemplateValue]

List of template values to be used in the orchestration.

class OrchestrationService

Bases: object

A service for executing orchestration requests, which produce LLM-generated content through a pipeline of configured modules. The service supports both synchronous and asynchronous request execution. For streaming responses, special care is taken not to close the underlying HTTP stream prematurely.

https://api.sap.com/api/ORCHESTRATION_API/overview

__init__(api_url=None, config=None, proxy_client=None, deployment_id=None, config_name=None, config_id=None, timeout=None)

Initializes the OrchestrationService with the provided parameters.

Parameters:
  • api_url (Optional[str], optional) -- The base URL for the orchestration API, defaults to None

  • config (Optional[OrchestrationConfig], optional) -- The default orchestration configuration, defaults to None

  • proxy_client (Optional[GenAIHubProxyClient], optional) -- The GenAIHubProxyClient instance, defaults to None

  • deployment_id (Optional[str], optional) -- the deployment ID, defaults to None

  • config_name (Optional[str], optional) -- the configuration name, defaults to None

  • config_id (Optional[str], optional) -- the configuration ID, defaults to None

  • timeout (Union[int, float, httpx.Timeout, None], optional) -- the timeout for HTTP requests, defaults to None

async aclose_http_connection()

Closes the httpx asynchronous client.

async arun(config=None, template_values=None, history=None, timeout=None)

Executes an orchestration request asynchronously (non-streaming).

Parameters:
  • config (Optional[OrchestrationConfig], optional) -- the orchestration configuration, defaults to None

  • template_values (Optional[List[TemplateValue]], optional) -- the template values for the request, defaults to None

  • history (Optional[List[Message]], optional) -- the message history, defaults to None

  • timeout (Union[int, float, httpx.Timeout, None], optional) -- the timeout for the request, defaults to None

Returns:

An OrchestrationResponse object.

Return type:

OrchestrationResponse

async arun_with_retries(config=None, template_values=None, history=None, timeout=None, max_retries=10, base_delay=1.0)

Executes an orchestration request asynchronously with automatic retry on rate limits (429) and server errors. Uses exponential backoff with jitter to handle rate limiting gracefully.

Parameters:
  • config (Optional[OrchestrationConfig], optional) -- the orchestration configuration, defaults to None

  • template_values (Optional[List[TemplateValue]], optional) -- the template values for the request, defaults to None

  • history (Optional[List[Message]], optional) -- the message history, defaults to None

  • timeout (Union[int, float, httpx.Timeout, None], optional) -- the timeout for the request, defaults to None

  • max_retries (int, optional) -- the maximum number of retry attempts, defaults to 10

  • base_delay (float, optional) -- the initial delay between retries in seconds, defaults to 1.0

Returns:

An OrchestrationResponseWithRetries with retry count information.

Return type:

OrchestrationResponseWithRetries | None

Raises:
  • OrchestrationError -- If the request fails after all retries (includes retry count).

  • ValueError -- If no configuration is provided.
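
The retry loop described above can be approximated with the standard library alone. This is a minimal sketch, not the library's implementation: TransientError, call_with_retries, and flaky_call are hypothetical names, and the delay formula (doubling per retry plus up to 10% jitter) is an assumption.

```python
import asyncio
import random


class TransientError(Exception):
    """Hypothetical stand-in for a retryable failure (e.g. HTTP 429 or 5xx)."""


async def call_with_retries(operation, max_retries=10, base_delay=1.0):
    """Await operation() and retry on TransientError with exponential backoff.

    Returns a (result, retries) tuple. Re-raises the last error once
    max_retries retries have been used up.
    """
    retries = 0
    while True:
        try:
            return await operation(), retries
        except TransientError:
            if retries >= max_retries:
                raise
            # Assumed formula: base_delay * 2**retries plus up to 10% jitter.
            delay = base_delay * (2 ** retries)
            delay += random.uniform(0, 0.1 * delay)
            await asyncio.sleep(delay)
            retries += 1


async def demo():
    attempts = {"count": 0}

    async def flaky_call():
        # Fails twice, then succeeds on the third attempt.
        attempts["count"] += 1
        if attempts["count"] < 3:
            raise TransientError("rate limited")
        return "ok"

    return await call_with_retries(flaky_call, max_retries=5, base_delay=0.001)
```

Running asyncio.run(demo()) yields the result together with the number of retries that were needed, which mirrors the retry count information carried by OrchestrationResponseWithRetries.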

async astream(config=None, template_values=None, history=None, stream_options=None, timeout=None)

Executes an orchestration request asynchronously in streaming mode.

Parameters:
  • config (Optional[OrchestrationConfig], optional) -- the orchestration configuration, defaults to None

  • template_values (Optional[List[TemplateValue]], optional) -- the template values for the request, defaults to None

  • history (Optional[List[Message]], optional) -- the message history, defaults to None

  • stream_options (Optional[dict], optional) -- the additional streaming options, defaults to None

  • timeout (Union[int, float, httpx.Timeout, None], optional) -- the timeout for the request, defaults to None

Returns:

An AsyncSSEClient instance for iterating over the streaming response.

Return type:

AsyncSSEClient

close_http_connection()

Closes the httpx synchronous client.

handle_retry(retry_count, base_delay, error, max_retries)

Handles retry logic with exponential backoff and jitter. If a Retry-After header is present, its value is used as the minimum delay, and jitter is added on top.

Parameters:
  • retry_count (int) -- the current retry attempt number

  • base_delay (float) -- the initial delay between retries in seconds

  • error (OrchestrationError) -- the exception that occurred

  • max_retries (int) -- the maximum number of retry attempts

Raises:

error -- The original error is re-raised if no more retries should be attempted.

Returns:

Number of seconds to wait before the next retry.

Return type:

float
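
One way to compute such a delay is sketched below. The function name compute_retry_delay and the jitter_ratio parameter are hypothetical, and the exact formula used by the library is not stated in this reference; the sketch assumes the backoff doubles with each retry and that jitter is a random fraction of the chosen delay.

```python
import random


def compute_retry_delay(retry_count, base_delay, retry_after=None, jitter_ratio=0.25):
    """Return the number of seconds to wait before the next attempt.

    Assumptions (not confirmed by the source): the backoff doubles per
    retry, and jitter is a random fraction of the chosen delay.
    """
    backoff = base_delay * (2 ** retry_count)
    # A Retry-After value, when given, acts as a lower bound on the delay.
    min_delay = float(retry_after) if retry_after is not None else 0.0
    delay = max(backoff, min_delay)
    # Jitter is added on top, so the result never drops below min_delay.
    return delay + random.uniform(0, jitter_ratio * delay)
```

Adding jitter on top of the Retry-After value, rather than around it, keeps every client at or above the server's requested wait while still spreading out the retries.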

run(config=None, template_values=None, history=None, timeout=None)

Executes an orchestration request synchronously (non-streaming).

Parameters:
  • config (Optional[OrchestrationConfig], optional) -- the orchestration configuration, defaults to None

  • template_values (Optional[List[TemplateValue]], optional) -- the template values for the request, defaults to None

  • history (Optional[List[Message]], optional) -- the message history, defaults to None

  • timeout (Union[int, float, httpx.Timeout, None], optional) -- the timeout for the request, defaults to None

Returns:

An OrchestrationResponse object.

Return type:

OrchestrationResponse

run_with_retries(config=None, template_values=None, history=None, timeout=None, max_retries=10, base_delay=1.0)

Executes an orchestration request with automatic retry on rate limits (429) and server errors.

Parameters:
  • config (Optional[OrchestrationConfig], optional) -- the orchestration configuration, defaults to None

  • template_values (Optional[List[TemplateValue]], optional) -- the template values for the request, defaults to None

  • history (Optional[List[Message]], optional) -- the message history, defaults to None

  • timeout (Union[int, float, httpx.Timeout, None], optional) -- the timeout for the request, defaults to None

  • max_retries (int, optional) -- the maximum number of retry attempts, defaults to 10

  • base_delay (float, optional) -- the initial delay between retries in seconds, defaults to 1.0

Returns:

An OrchestrationResponseWithRetries with retry count information.

Return type:

OrchestrationResponseWithRetries | None

Raises:
  • OrchestrationError -- If the request fails after all retries (includes retry count).

  • ValueError -- If no configuration is provided.

stream(config=None, template_values=None, history=None, stream_options=None, timeout=None)

Executes an orchestration request in streaming mode (synchronously).

Parameters:
  • config (Optional[OrchestrationConfig], optional) -- the orchestration configuration, defaults to None

  • template_values (Optional[List[TemplateValue]], optional) -- the template values for the request, defaults to None

  • history (Optional[List[Message]], optional) -- the message history, defaults to None

  • stream_options (Optional[dict], optional) -- the additional streaming options, defaults to None

  • timeout (Union[int, float, httpx.Timeout, None], optional) -- the timeout for the request, defaults to None

Returns:

An SSEClient instance for iterating over the streaming response.

Return type:

SSEClient

cache_if_not_none(func)

Custom cache decorator that caches only non-None results.

Parameters:

func (callable) -- The function to be decorated.

Returns:

The decorated function with caching behavior.

Return type:

callable
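
A decorator with this behavior could look roughly like the sketch below. It is an illustration under assumptions, not the library's code: cache_non_none, lookup, and calls are hypothetical names, and the sketch requires hashable arguments.

```python
import functools


def cache_non_none(func):
    """Cache results per argument tuple, but never cache None."""
    cache = {}

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        key = (args, tuple(sorted(kwargs.items())))
        if key in cache:
            return cache[key]
        result = func(*args, **kwargs)
        if result is not None:
            # Only successful (non-None) lookups are remembered.
            cache[key] = result
        return result

    return wrapper


calls = []


@cache_non_none
def lookup(name):
    # Hypothetical lookup that finds nothing on the first call only.
    calls.append(name)
    return None if len(calls) < 2 else f"url-for-{name}"
```

Skipping None keeps a failed lookup from being remembered, so a later call can still succeed once the looked-up resource exists.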

discover_orchestration_api_url(base_url, auth_url, client_id, client_secret, resource_group, config_id=None, config_name=None, orchestration_scenario='orchestration', executable_id='orchestration')

Discovers the orchestration API URL based on provided configuration details.

Parameters:
  • base_url (str) -- the base URL for the AI Core API.

  • auth_url (str) -- the URL for the AI Core authentication service.

  • client_id (str) -- the client ID for the AI Core API.

  • client_secret (str) -- the client secret for the AI Core API.

  • resource_group (str) -- the resource group for the AI Core API.

  • config_id (Optional[str], optional) -- the configuration ID, defaults to None

  • config_name (Optional[str], optional) -- the configuration name, defaults to None

  • orchestration_scenario (str, optional) -- the orchestration scenario ID, defaults to "orchestration"

  • executable_id (str, optional) -- the orchestration executable ID, defaults to "orchestration"

Returns:

The orchestration API URL or None if no deployment is found.

Return type:

Optional[str]

get_orchestration_api_url(proxy_client, deployment_id=None, config_name=None, config_id=None)

Retrieves the orchestration API URL based on provided deployment or configuration details.

Parameters:
  • proxy_client (GenAIHubProxyClient) -- The GenAIHubProxyClient instance.

  • deployment_id (Optional[str], optional) -- the deployment ID, defaults to None

  • config_name (Optional[str], optional) -- the configuration name, defaults to None

  • config_id (Optional[str], optional) -- the configuration ID, defaults to None

Raises:

ValueError -- If no orchestration deployment is found.

Returns:

The orchestration API URL.

Return type:

str

gen_ai_hub.orchestration.sse_client module

Module for Server-Sent Events (SSE) clients for orchestration responses.

This module provides both synchronous and asynchronous SSE clients for iterating over streaming responses. Each client is responsible for handling HTTP errors and for closing the underlying HTTP stream when iteration is complete.

class AsyncSSEClient

Bases: object

An asynchronous SSE client for iterating over streaming responses.

This client wraps an asynchronous HTTP stream (provided as a context manager) and ensures that the stream is properly opened and closed. It also checks for HTTP errors upon entering the stream.

__init__(response_cm, prefix='data: ', final_message='[DONE]')

Initializes the AsyncSSEClient.

Parameters:
  • response_cm (async context manager yielding httpx.Response) -- An asynchronous context manager for the HTTP streaming response.

  • prefix (str, optional) -- The SSE data prefix, defaults to "data: "

  • final_message (str, optional) -- The message indicating the end of the stream, defaults to "[DONE]"

class SSEClient

Bases: object

A synchronous Server-Sent Events (SSE) client that wraps an httpx.Response for iterating over streaming responses.

This client reads data chunks from the HTTP stream and parses each SSE event. For performance reasons, the underlying HTTP stream is reused for subsequent calls.

__init__(response_cm, prefix='data: ', final_message='[DONE]')

Initializes the SSEClient.

Parameters:
  • response_cm (httpx.Response) -- An httpx.Response context manager for the streaming response.

  • prefix (str, optional) -- The prefix string that identifies SSE event data, defaults to "data: "

  • final_message (str, optional) -- The message that indicates the end of the stream, defaults to "[DONE]"
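
The roles of prefix and final_message can be illustrated with a small standalone parser. This is a sketch, not the SSEClient implementation: parse_sse_lines is a hypothetical helper, and treating each payload as JSON is an assumption.

```python
import json


def parse_sse_lines(lines, prefix="data: ", final_message="[DONE]"):
    """Yield decoded payloads from SSE lines until final_message is seen."""
    for line in lines:
        if not line.startswith(prefix):
            # Skip blank separators, comments, and other SSE fields.
            continue
        payload = line[len(prefix):].strip()
        if payload == final_message:
            # The end-of-stream marker stops iteration.
            break
        # Assumption: each data payload is a JSON document.
        yield json.loads(payload)
```

Lines that do not start with the prefix are ignored, and anything after the final message is never read.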

iter_lines()

Reads data chunks from the HTTP stream and yields complete lines.

This method accumulates incoming chunks until a newline is encountered, yielding one complete line at a time.

Yields:

Complete lines of text from the streaming response.

Return type:

Iterable[str]
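
The chunk accumulation described for iter_lines() can be sketched as a plain generator. The name iter_complete_lines is hypothetical, and flushing a trailing partial line at the end of the stream is an assumption rather than documented behavior.

```python
def iter_complete_lines(chunks):
    """Yield complete lines from an iterable of text chunks."""
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        # Emit every complete line currently held in the buffer.
        while "\n" in buffer:
            line, buffer = buffer.split("\n", 1)
            yield line.rstrip("\r")
    if buffer:
        # Assumption: a trailing partial line is flushed when the stream ends.
        yield buffer
```

Buffering is needed because chunk boundaries on an HTTP stream do not line up with line boundaries, so a single event may arrive split across several chunks.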

gen_ai_hub.orchestration.utils module

load_text_file(file_path)

Loads and returns the content of a text file.

Parameters:

file_path (str) -- The path to the text file to be loaded.

Returns:

The content of the file as a string.

Return type:

str
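
An equivalent helper can be written with pathlib, as in the sketch below. The names read_text_file and demo are hypothetical, and the UTF-8 default encoding is an assumption; this reference does not state which encoding load_text_file uses.

```python
import tempfile
from pathlib import Path


def read_text_file(file_path, encoding="utf-8"):
    """Return the full content of a text file as a string."""
    return Path(file_path).read_text(encoding=encoding)


def demo():
    # Write a small file to a temporary directory and read it back.
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "prompt.txt"
        path.write_text("You are a helpful assistant.\n", encoding="utf-8")
        return read_text_file(str(path))
```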