gen_ai_hub.orchestration_v2 package

Subpackages

Submodules

gen_ai_hub.orchestration_v2.exceptions module

Exceptions for the orchestration service module.

exception OrchestrationError

Bases: Exception

This exception is raised when an error occurs during the execution of the orchestration service, typically due to incorrect usage, invalid configurations, or issues with run parameters defined by the user.

__init__(request_id, headers, message, code, location, intermediate_results, retries=0)

Initializes the OrchestrationError with detailed context.

Parameters:
  • request_id (str) -- Unique identifier for the request that encountered the error.

  • headers (httpx.Headers) -- HTTP headers associated with the request, useful for example when handling rate limiting.

  • message (str) -- Detailed error message describing the issue.

  • code (int) -- Error code associated with the specific type of failure.

  • location (str) -- Specific component or step in the orchestration process where the error occurred.

  • intermediate_results (ModuleResults) -- State information and partial results from various modules at the time of the error, useful for debugging.

  • retries (int, optional) -- Number of retries attempted before the error was raised.

  • errors (Optional[list[dict[str, Any]]]) -- Raw error payload(s) from the API. Can contain multiple errors.
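The parameters above suggest how a caller might summarize a failure for logging. The following is a minimal sketch, not the SDK class: OrchestrationErrorSketch and describe are hypothetical stand-ins, and it assumes that each constructor parameter is stored as an attribute of the same name.

```python
class OrchestrationErrorSketch(Exception):
    """Hypothetical stand-in mirroring the documented constructor parameters."""

    def __init__(self, request_id, headers, message, code, location,
                 intermediate_results, retries=0, errors=None):
        super().__init__(message)
        # Assumption: each parameter is kept as an attribute of the same name.
        self.request_id = request_id
        self.headers = headers
        self.message = message
        self.code = code
        self.location = location
        self.intermediate_results = intermediate_results
        self.retries = retries
        self.errors = errors


def describe(error):
    """Builds a one-line summary from the documented fields."""
    return (f"[{error.code}] {error.message} at {error.location} "
            f"(request {error.request_id}, retries={error.retries})")


try:
    raise OrchestrationErrorSketch(
        request_id="req-123",
        headers={"retry-after": "2"},
        message="Too many requests",
        code=429,
        location="LLM module",
        intermediate_results=None,
        retries=3,
    )
except OrchestrationErrorSketch as exc:
    summary = describe(exc)
```

With the real exception, the same pattern would apply only if the attribute names match the parameter names, which this reference does not state.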

exception OrchestrationErrorList

Bases: Exception

__init__(errors)

Parameters:

errors (list[OrchestrationError])

gen_ai_hub.orchestration_v2.service module

Module for orchestration service handling requests and responses.

Provides synchronous and asynchronous methods to run orchestration pipelines.

class OrchestrationService

Bases: object

A service for executing orchestration requests, allowing for the generation of LLM-generated content through a pipeline of configured modules.

This service supports both synchronous and asynchronous request execution. For streaming responses, special care is taken to not close the underlying HTTP stream prematurely.

See https://api.sap.com/api/ORCHESTRATION_API_v2/overview

Args:

api_url: The base URL for the orchestration API.

config: The default orchestration configuration.

config_ref: A reference to the default orchestration configuration.

proxy_client: A GenAIHubProxyClient instance.

deployment_id: Optional deployment ID.

config_name: Optional configuration name.

config_id: Optional configuration ID.

timeout: Optional timeout for HTTP requests.

__init__(api_url=None, config=None, config_ref=None, proxy_client=None, deployment_id=None, config_name=None, config_id=None, timeout=None)

Initializes the OrchestrationService.

Parameters:
  • api_url (Optional[str], optional) -- the base URL for the orchestration API, defaults to None

  • config (Optional[OrchestrationConfig], optional) -- the orchestration configuration, defaults to None

  • config_ref (Optional[OrchestrationConfigReference], optional) -- the orchestration configuration reference, defaults to None

  • proxy_client (Optional[GenAIHubProxyClient], optional) -- the GenAIHubProxyClient instance, defaults to None

  • deployment_id (Optional[str], optional) -- the deployment ID, defaults to None

  • config_name (Optional[str], optional) -- the configuration name, defaults to None

  • config_id (Optional[str], optional) -- the configuration ID, defaults to None

  • timeout (Union[int, float, httpx.Timeout, None], optional) -- the timeout for HTTP requests, defaults to None

Raises:

ValueError -- if both config and config_ref are provided.
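The mutual-exclusion rule above can be illustrated with a small validation helper. This is only a sketch of the documented behavior; pick_default_config is a hypothetical name, and the dictionaries stand in for real configuration objects.

```python
def pick_default_config(config=None, config_ref=None):
    """Returns whichever default was supplied; rejects supplying both."""
    if config is not None and config_ref is not None:
        raise ValueError("Provide either config or config_ref, not both.")
    return config if config is not None else config_ref


try:
    # Placeholder values; real code would pass configuration objects.
    pick_default_config(config={"model": "demo"}, config_ref={"id": "demo-ref"})
    rejected = False
except ValueError:
    rejected = True
```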

async aclose_http_connection()

Closes the httpx asynchronous client.

async aembed(config, input, timeout=None)

Executes an embeddings request asynchronously.

Parameters:
  • config (EmbeddingsOrchestrationConfig) -- the embeddings orchestration configuration

  • input (EmbeddingsInput) -- the input text to embed

  • timeout (Union[int, float, httpx.Timeout, None], optional) -- the per-request timeout override, defaults to None

Returns:

the EmbeddingsPostResponse object

Return type:

EmbeddingsPostResponse

async arun(config=None, config_ref=None, placeholder_values=None, history=None, timeout=None)

Executes an orchestration request asynchronously (non-streaming).

Parameters:
  • config (Optional[OrchestrationConfig], optional) -- the orchestration configuration, defaults to None

  • config_ref (Optional[OrchestrationConfigReference], optional) -- the orchestration configuration reference, defaults to None

  • placeholder_values (Optional[dict], optional) -- the template values, defaults to None

  • history (Optional[List[ChatMessage]], optional) -- the message history, defaults to None

  • timeout (Union[int, float, httpx.Timeout, None], optional) -- the per-request timeout override, defaults to None

Returns:

the CompletionPostResponse object

Return type:

CompletionPostResponse

async arun_with_retries(config=None, config_ref=None, placeholder_values=None, history=None, timeout=None, max_retries=10, base_delay=1.0)

Executes an orchestration request asynchronously with automatic retry on rate limits (429) and server errors. Uses exponential backoff with jitter to handle rate limiting gracefully.

Parameters:
  • config (Optional[OrchestrationConfig], optional) -- the orchestration configuration, defaults to None

  • config_ref (Optional[OrchestrationConfigReference], optional) -- the orchestration configuration reference, defaults to None

  • placeholder_values (Optional[dict], optional) -- the template values, defaults to None

  • history (Optional[List[ChatMessage]], optional) -- the message history, defaults to None

  • timeout (Union[int, float, httpx.Timeout, None], optional) -- the per-request timeout override, defaults to None

  • max_retries (int, optional) -- the maximum number of retry attempts, defaults to 10

  • base_delay (float, optional) -- the initial delay between retries in seconds, defaults to 1.0

Returns:

the OrchestrationResponseWithRetries with retry count information

Return type:

OrchestrationResponseWithRetries | None

Raises:
  • ValueError -- if no configuration is provided.

  • OrchestrationError -- if request fails after all retries (includes retry count).
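The retry behavior described for this method can be pictured as an asynchronous loop around the request. The sketch below is not the SDK implementation: TransientError, call_with_retries, and the 10% jitter range are assumptions chosen for illustration, and the flaky coroutine simulates a rate-limited endpoint.

```python
import asyncio
import random


class TransientError(Exception):
    """Hypothetical stand-in for a retryable failure such as HTTP 429."""


async def call_with_retries(operation, max_retries=10, base_delay=1.0):
    """Awaits operation() and retries on TransientError with backoff and jitter.

    Returns a (result, retries) pair; re-raises once max_retries is exhausted.
    """
    retries = 0
    while True:
        try:
            return (await operation(), retries)
        except TransientError:
            if retries >= max_retries:
                raise
            retries += 1
            # Exponential backoff: base_delay * 2^(retries - 1), plus up to 10% jitter.
            delay = base_delay * 2 ** (retries - 1)
            await asyncio.sleep(delay + random.uniform(0, delay * 0.1))


async def demo():
    attempts = {"count": 0}

    async def flaky():
        # Fails twice, then succeeds on the third attempt.
        attempts["count"] += 1
        if attempts["count"] < 3:
            raise TransientError("rate limited")
        return "ok"

    return await call_with_retries(flaky, max_retries=5, base_delay=0.001)


result, retries = asyncio.run(demo())
```

Returning the retry count next to the result mirrors the idea of a response that carries retry information, without claiming anything about the actual return type's fields.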

async astream(config=None, config_ref=None, placeholder_values=None, history=None, timeout=None)

Executes an orchestration streaming request asynchronously.

Parameters:
  • config (Optional[OrchestrationConfig], optional) -- the orchestration configuration, defaults to None

  • config_ref (Optional[OrchestrationConfigReference], optional) -- the orchestration configuration reference, defaults to None

  • placeholder_values (Optional[dict], optional) -- the template values, defaults to None

  • history (Optional[List[ChatMessage]], optional) -- the message history, defaults to None

  • timeout (Union[int, float, httpx.Timeout, None], optional) -- the per-request timeout override, defaults to None

Returns:

the AsyncSSEClient object

Return type:

AsyncSSEClient

close_http_connection()

Closes the httpx synchronous client.

embed(config, input, timeout=None)

Executes an embeddings request synchronously.

Parameters:
  • config (EmbeddingsOrchestrationConfig) -- the embeddings orchestration configuration

  • input (EmbeddingsInput) -- the input text to embed

  • timeout (Union[int, float, httpx.Timeout, None], optional) -- the per-request timeout override, defaults to None

Returns:

the EmbeddingsPostResponse object

Return type:

EmbeddingsPostResponse

handle_retry(retry_count, base_delay, error, max_retries)

Handles retry logic with exponential backoff and jitter. If a Retry-After header is present, its value is used as the minimum delay, and jitter is added on top.

Parameters:
  • retry_count (int) -- the incremented retry attempt number

  • base_delay (float) -- the initial delay between retries in seconds

  • error (OrchestrationError) -- the exception that occurred

  • max_retries (int) -- the maximum number of retry attempts

Raises:

error -- re-raises the original error if no retry should be attempted

Returns:

the number of seconds to wait before next retry

Return type:

float
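The delay calculation described above can be sketched as a pure function. This is an illustration under stated assumptions, not the SDK's formula: backoff_delay, the doubling factor, the max_jitter value, and the rng parameter are all hypothetical, and retry_after stands for the parsed value of the Retry-After header in seconds.

```python
import random


def backoff_delay(retry_count, base_delay, retry_after=None,
                  max_jitter=0.5, rng=random):
    """Returns seconds to wait before retry number retry_count (1-based).

    The exponential term is base_delay * 2^(retry_count - 1). If the server
    sent a Retry-After value, it acts as a floor for the delay. Jitter in
    [0, max_jitter] is added on top so that concurrent clients do not
    retry in lockstep.
    """
    delay = base_delay * 2 ** (retry_count - 1)
    if retry_after is not None:
        delay = max(delay, float(retry_after))
    return delay + rng.uniform(0, max_jitter)


# A seeded generator makes the jitter reproducible for the illustration.
first = backoff_delay(1, 1.0, rng=random.Random(0))
third = backoff_delay(3, 1.0, rng=random.Random(0))
floored = backoff_delay(1, 1.0, retry_after="10", rng=random.Random(0))
```

Adding the jitter after applying the floor keeps the wait from ever dropping below the server's requested minimum.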

run(config=None, config_ref=None, placeholder_values=None, history=None, timeout=None)

Executes an orchestration request synchronously (non-streaming).

Parameters:
  • config (Optional[OrchestrationConfig], optional) -- the orchestration configuration, defaults to None

  • config_ref (Optional[OrchestrationConfigReference], optional) -- the orchestration configuration reference, defaults to None. If not provided, the default configuration is used.

  • placeholder_values (Optional[dict], optional) -- the template values, defaults to None

  • history (Optional[List[ChatMessage]], optional) -- the message history, defaults to None

  • timeout (Union[int, float, httpx.Timeout, None], optional) -- the per-request timeout override, defaults to None

Returns:

the CompletionPostResponse object

Return type:

CompletionPostResponse

run_with_retries(config=None, config_ref=None, placeholder_values=None, history=None, timeout=None, max_retries=10, base_delay=1.0)

Executes an orchestration request with automatic retry on rate limits (429) and server errors.

Parameters:
  • config (Optional[OrchestrationConfig], optional) -- the orchestration configuration, defaults to None

  • config_ref (Optional[OrchestrationConfigReference], optional) -- the orchestration configuration reference, defaults to None

  • placeholder_values (Optional[dict], optional) -- the template values, defaults to None

  • history (Optional[List[ChatMessage]], optional) -- the message history, defaults to None

  • timeout (Union[int, float, httpx.Timeout, None], optional) -- the per-request timeout override, defaults to None

  • max_retries (int, optional) -- the maximum number of retry attempts, defaults to 10

  • base_delay (float, optional) -- the initial delay between retries in seconds, defaults to 1.0

Returns:

the OrchestrationResponseWithRetries with retry count information

Return type:

OrchestrationResponseWithRetries | None

Raises:
  • ValueError -- if no configuration is provided.

  • OrchestrationError -- if request fails after all retries (includes retry count).

stream(config=None, config_ref=None, placeholder_values=None, history=None, timeout=None)

Executes an orchestration streaming request synchronously.

Parameters:
  • config (Optional[OrchestrationConfig], optional) -- the orchestration configuration, defaults to None

  • config_ref (Optional[OrchestrationConfigReference], optional) -- the orchestration configuration reference, defaults to None. If not provided, the default configuration is used.

  • placeholder_values (Optional[dict], optional) -- the template values, defaults to None

  • history (Optional[List[ChatMessage]], optional) -- the message history, defaults to None

  • timeout (Union[int, float, httpx.Timeout, None], optional) -- the per-request timeout override, defaults to None

Returns:

An Iterable[StreamCompletionPostResponse] object

Return type:

Iterable[StreamCompletionPostResponse]

cache_if_not_none(func)

Custom cache decorator that caches only non-None results.
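A decorator with this behavior might look like the sketch below. It is not the SDK's code: cache_non_none and lookup are hypothetical names, and the sketch assumes that all arguments are hashable.

```python
import functools


def cache_non_none(func):
    """Caches results per argument set, but never stores a None result."""
    cache = {}

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Assumption: all arguments are hashable.
        key = (args, tuple(sorted(kwargs.items())))
        if key in cache:
            return cache[key]
        result = func(*args, **kwargs)
        if result is not None:
            cache[key] = result
        return result

    return wrapper


calls = []


@cache_non_none
def lookup(name):
    """Hypothetical lookup that records every real invocation."""
    calls.append(name)
    return None if name == "missing" else f"https://example.invalid/{name}"


lookup("missing")
lookup("missing")  # not cached, so the function runs again
lookup("found")
lookup("found")    # served from the cache
```

Skipping None results means a lookup that fails now is attempted again on the next call instead of being remembered as a permanent miss.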

discover_orchestration_api_url(base_url, auth_url, client_id, client_secret, resource_group, config_id=None, config_name=None, orchestration_scenario='orchestration', executable_id='orchestration')

Discovers the orchestration API URL based on provided configuration details.

Parameters:
  • base_url (str) -- the base URL for the AI Core API.

  • auth_url (str) -- the URL for the AI Core authentication service.

  • client_id (str) -- the client ID for the AI Core API.

  • client_secret (str) -- the client secret for the AI Core API.

  • resource_group (str) -- the resource group for the AI Core API.

  • config_id (Optional[str], optional) -- the configuration ID, defaults to None

  • config_name (Optional[str], optional) -- the configuration name, defaults to None

  • orchestration_scenario (str, optional) -- the orchestration scenario ID, defaults to "orchestration"

  • executable_id (str, optional) -- the orchestration executable ID, defaults to "orchestration"

Returns:

the orchestration API URL or None if no deployment is found.

Return type:

Optional[str]

get_orchestration_api_url(proxy_client, deployment_id=None, config_name=None, config_id=None)

Retrieves the orchestration API URL based on provided deployment or configuration details.

Parameters:
  • proxy_client (GenAIHubProxyClient) -- the GenAIHubProxyClient instance.

  • deployment_id (Optional[str], optional) -- the deployment ID, defaults to None

  • config_name (Optional[str], optional) -- the configuration name, defaults to None

  • config_id (Optional[str], optional) -- the configuration ID, defaults to None

Raises:

ValueError -- if no orchestration deployment is found.

Returns:

the orchestration API URL.

Return type:

str

gen_ai_hub.orchestration_v2.sse_client module

Module for Server-Sent Events (SSE) clients for orchestration responses.

This module provides both synchronous and asynchronous SSE clients for iterating over streaming responses. Each client is responsible for handling HTTP errors and for closing the underlying HTTP stream when iteration is complete.

class AsyncSSEClient

Bases: object

An asynchronous SSE client for iterating over streaming responses.

This client wraps an asynchronous HTTP stream (provided as a context manager) and ensures that the stream is properly opened and closed. It also checks for HTTP errors upon entering the stream.

__init__(response_cm, prefix='data: ', final_message='[DONE]')

Initializes the AsyncSSEClient.

Parameters:
  • response_cm (AsyncContextManager[httpx.Response]) -- An asynchronous context manager for the HTTP streaming response.

  • prefix (str, optional) -- the SSE data prefix, defaults to "data: "

  • final_message (str, optional) -- the message indicating the end of the stream, defaults to "[DONE]"
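The roles of prefix and final_message can be shown with a small asynchronous parser. This is a sketch only: parse_sse and fake_stream are hypothetical, the real client reads from an httpx streaming response, and its handling of non-data lines may differ.

```python
import asyncio


async def parse_sse(lines, prefix="data: ", final_message="[DONE]"):
    """Yields the payload of each data line until final_message is seen."""
    async for line in lines:
        if not line.startswith(prefix):
            continue  # skip blank separators and non-data fields
        payload = line[len(prefix):]
        if payload == final_message:
            break
        yield payload


async def fake_stream():
    """Simulated line source standing in for an HTTP streaming response."""
    for line in ['data: {"delta": "Hel"}', "", 'data: {"delta": "lo"}', "",
                 "data: [DONE]", "data: ignored"]:
        yield line


async def collect():
    return [payload async for payload in parse_sse(fake_stream())]


payloads = asyncio.run(collect())
```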

class SSEClient

Bases: object

A synchronous Server-Sent Events (SSE) client that wraps an httpx.Response for iterating over streaming responses.

This client reads data chunks from the HTTP stream and parses each SSE event. For performance reasons, the underlying HTTP stream is reused for subsequent calls.

__init__(response_cm, prefix='data: ', final_message='[DONE]')

Initializes the SSEClient.

Parameters:
  • response_cm (httpx.Response) -- An httpx.Response context manager for the streaming response.

  • prefix (str, optional) -- The prefix string that identifies SSE event data, defaults to "data: "

  • final_message (str, optional) -- The message that indicates the end of the stream, defaults to "[DONE]"

iter_lines()

Reads data chunks from the HTTP stream and yields complete lines.

This method accumulates incoming chunks until a newline is encountered, yielding one complete line at a time.

Returns:

Complete lines of text from the streaming response.

Return type:

Iterable[str]
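The accumulation described for iter_lines can be sketched with a plain generator. This standalone function is an assumption-based illustration: it takes an iterable of text chunks instead of an HTTP stream, and the handling of carriage returns and of a trailing partial line is a choice made for the sketch.

```python
def iter_lines(chunks):
    """Yields complete lines from an iterable of text chunks."""
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        # Emit every complete line currently held in the buffer.
        while "\n" in buffer:
            line, buffer = buffer.split("\n", 1)
            yield line.rstrip("\r")
    if buffer:
        yield buffer  # trailing text without a final newline


# Chunk boundaries deliberately fall in the middle of lines.
lines = list(iter_lines(["data: a\nda", "ta: b\n\ndata: ", "[DONE]\n"]))
```

Buffering is needed because chunk boundaries on the wire do not align with line boundaries, so a single SSE line may arrive split across several chunks.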

gen_ai_hub.orchestration_v2.utils module

load_text_file(file_path)

Loads and returns the content of a text file.

Parameters:

file_path (str) -- The path to the text file to be loaded.

Returns:

The content of the file as a string.

Return type:

str
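A helper with this contract can be sketched with pathlib. The function name load_text, the UTF-8 encoding, and the temporary file are assumptions for illustration; the reference above does not state which encoding load_text_file uses.

```python
import tempfile
from pathlib import Path


def load_text(file_path: str) -> str:
    """Returns the file's content as a string (UTF-8 assumed)."""
    return Path(file_path).read_text(encoding="utf-8")


with tempfile.TemporaryDirectory() as tmp:
    # Write a small file, then read it back through the helper.
    template_path = Path(tmp) / "prompt.txt"
    template_path.write_text("Summarize the following text.", encoding="utf-8")
    content = load_text(str(template_path))
```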