gen_ai_hub.orchestration_v2 package
Subpackages
Submodules
gen_ai_hub.orchestration_v2.exceptions module
Exceptions for the orchestration service module.
- exception OrchestrationError
Bases:
Exception
This exception is raised when an error occurs during the execution of the orchestration service, typically due to incorrect usage, invalid configurations, or issues with run parameters defined by the user.
- __init__(request_id, headers, message, code, location, intermediate_results, retries=0)
Initializes the OrchestrationError with detailed context.
- Parameters:
request_id (str) -- unique identifier for the request that encountered the error.
headers (httpx.Headers) -- HTTP headers associated with the request, useful for example when handling rate limiting.
message (str) -- Detailed error message describing the issue.
code (int) -- Error code associated with the specific type of failure.
location (str) -- Specific component or step in the orchestration process where the error occurred.
intermediate_results (ModuleResults) -- State information and partial results from various modules at the time of the error, useful for debugging.
retries (int, optional) -- Number of retries attempted before the error was raised.
errors (Optional[list[dict[str, Any]]]) -- Raw error payload(s) from the API. Can contain multiple errors.
- exception OrchestrationErrorList
Bases:
Exception
- __init__(errors)
- Parameters:
errors (list[OrchestrationError])
gen_ai_hub.orchestration_v2.service module
Module for orchestration service handling requests and responses.
Provides synchronous and asynchronous methods to run orchestration pipelines.
- class OrchestrationService
Bases:
object
A service for executing orchestration requests, allowing for the generation of LLM-generated content through a pipeline of configured modules.
This service supports both synchronous and asynchronous request execution. For streaming responses, special care is taken to not close the underlying HTTP stream prematurely.
See https://api.sap.com/api/ORCHESTRATION_API_v2/overview
Args:
api_url: The base URL for the orchestration API.
config: The default orchestration configuration.
config_ref: A reference to the default orchestration configuration.
proxy_client: A GenAIHubProxyClient instance.
deployment_id: Optional deployment ID.
config_name: Optional configuration name.
config_id: Optional configuration ID.
timeout: Optional timeout for HTTP requests.
- __init__(api_url=None, config=None, config_ref=None, proxy_client=None, deployment_id=None, config_name=None, config_id=None, timeout=None)
Initializes the OrchestrationService.
- Parameters:
api_url (Optional[str], optional) -- the base URL for the orchestration API, defaults to None
config (Optional[OrchestrationConfig], optional) -- the orchestration configuration, defaults to None
config_ref (Optional[OrchestrationConfigReference], optional) -- the orchestration configuration reference, defaults to None
proxy_client (Optional[GenAIHubProxyClient], optional) -- the GenAIHubProxyClient instance, defaults to None
deployment_id (Optional[str], optional) -- the deployment ID, defaults to None
config_name (Optional[str], optional) -- the configuration name, defaults to None
config_id (Optional[str], optional) -- the configuration ID, defaults to None
timeout (Union[int, float, httpx.Timeout, None], optional) -- the timeout for HTTP requests, defaults to None
- Raises:
ValueError -- if both config and config_ref are provided.
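The mutual-exclusion rule stated in the Raises entry can be illustrated with a stand-in. This is a minimal sketch, not the library's constructor: `resolve_default_config` is a hypothetical helper, and plain values stand in for OrchestrationConfig and OrchestrationConfigReference.

```python
from typing import Any, Optional


def resolve_default_config(config: Optional[Any] = None,
                           config_ref: Optional[Any] = None) -> Optional[Any]:
    """Hypothetical helper: at most one of config and config_ref may be given."""
    if config is not None and config_ref is not None:
        raise ValueError("Provide either config or config_ref, not both.")
    # Return whichever one was supplied, or None if neither was.
    return config if config is not None else config_ref
```

Passing neither argument is accepted in this sketch, since the documented signature defaults both parameters to None.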
- async aclose_http_connection()
Closes the httpx asynchronous client.
- async aembed(config, input, timeout=None)
Executes an embeddings request asynchronously.
- Parameters:
config (EmbeddingsOrchestrationConfig) -- the embeddings orchestration configuration
input (EmbeddingsInput) -- the input text to embed
timeout (Union[int, float, httpx.Timeout, None], optional) -- the timeout overwrite per request, defaults to None
- Returns:
the EmbeddingsPostResponse object
- Return type:
EmbeddingsPostResponse
- async arun(config=None, config_ref=None, placeholder_values=None, history=None, timeout=None)
Executes an orchestration request asynchronously (non-streaming).
- Parameters:
config (Optional[OrchestrationConfig], optional) -- the orchestration configuration, defaults to None
config_ref (Optional[OrchestrationConfigReference], optional) -- the orchestration configuration reference, defaults to None
placeholder_values (Optional[dict], optional) -- the template values, defaults to None
history (Optional[List[ChatMessage]], optional) -- the message history, defaults to None
timeout (Union[int, float, httpx.Timeout, None], optional) -- the timeout overwrite per request, defaults to None
- Returns:
the CompletionPostResponse object
- Return type:
CompletionPostResponse
- async arun_with_retries(config=None, config_ref=None, placeholder_values=None, history=None, timeout=None, max_retries=10, base_delay=1.0)
Executes an orchestration request asynchronously with automatic retry on rate limits (429) and server errors. Uses exponential backoff with jitter to handle rate limiting gracefully.
- Parameters:
config (Optional[OrchestrationConfig], optional) -- the orchestration configuration, defaults to None
config_ref (Optional[OrchestrationConfigReference], optional) -- the orchestration configuration reference, defaults to None
placeholder_values (Optional[dict], optional) -- the template values, defaults to None
history (Optional[List[ChatMessage]], optional) -- the message history, defaults to None
timeout (Union[int, float, httpx.Timeout, None], optional) -- the timeout overwrite per request, defaults to None
max_retries (int, optional) -- the maximum number of retry attempts, defaults to 10
base_delay (float, optional) -- the initial delay between retries in seconds, defaults to 1.0
- Returns:
the OrchestrationResponseWithRetries with retry count information
- Return type:
OrchestrationResponseWithRetries
- Raises:
ValueError -- if no configuration is provided.
OrchestrationError -- if request fails after all retries (includes retry count).
- async astream(config=None, config_ref=None, placeholder_values=None, history=None, timeout=None)
Executes an orchestration streaming request asynchronously.
- Parameters:
config (Optional[OrchestrationConfig], optional) -- the orchestration configuration, defaults to None
config_ref (Optional[OrchestrationConfigReference], optional) -- the orchestration configuration reference, defaults to None
placeholder_values (Optional[dict], optional) -- the template values, defaults to None
history (Optional[List[ChatMessage]], optional) -- the message history, defaults to None
timeout (Union[int, float, httpx.Timeout, None], optional) -- the timeout overwrite per request, defaults to None
- Returns:
the AsyncSSEClient object
- Return type:
AsyncSSEClient
- close_http_connection()
Closes the httpx synchronous client.
- embed(config, input, timeout=None)
Executes an embeddings request synchronously.
- Parameters:
config (EmbeddingsOrchestrationConfig) -- the embeddings orchestration configuration
input (EmbeddingsInput) -- the input text to embed
timeout (Union[int, float, httpx.Timeout, None], optional) -- the timeout overwrite per request, defaults to None
- Returns:
the EmbeddingsPostResponse object
- Return type:
EmbeddingsPostResponse
- handle_retry(retry_count, base_delay, error, max_retries)
Handles retry logic with exponential backoff and jitter. If a Retry-After header is present, its value is used as the minimum delay, and jitter is added on top.
- Parameters:
retry_count (int) -- the incremented retry attempt number
base_delay (float) -- the initial delay between retries in seconds
error (OrchestrationError) -- the exception that occurred
max_retries (int) -- the maximum number of retry attempts
- Raises:
error -- throws the original error if no retry should be attempted
- Returns:
the number of seconds to wait before next retry
- Return type:
float
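The delay computation described for handle_retry could look roughly like the sketch below. The exact formula, the jitter range, and the name `backoff_delay` are assumptions made for illustration; only the general scheme (exponential backoff, Retry-After as a minimum, jitter on top) comes from the description above.

```python
import random
from typing import Optional


def backoff_delay(retry_count: int, base_delay: float,
                  retry_after: Optional[float] = None,
                  max_jitter: float = 1.0) -> float:
    """Seconds to wait before retry number `retry_count` (1-based)."""
    # Exponential growth: base_delay, 2 * base_delay, 4 * base_delay, ...
    delay = base_delay * (2 ** (retry_count - 1))
    # A server-provided Retry-After value (assumed to be in seconds)
    # acts as a lower bound on the delay.
    if retry_after is not None:
        delay = max(delay, retry_after)
    # Random jitter on top spreads out clients that were throttled together.
    return delay + random.uniform(0.0, max_jitter)
```

With `base_delay=1.0`, the third retry waits between 4 and 5 seconds in this sketch; a Retry-After of 10 seconds raises that to between 10 and 11 seconds.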
- run(config=None, config_ref=None, placeholder_values=None, history=None, timeout=None)
Executes an orchestration request synchronously (non-streaming).
- Parameters:
config (Optional[OrchestrationConfig], optional) -- the orchestration configuration, defaults to None
config_ref (Optional[OrchestrationConfigReference], optional) -- the orchestration configuration reference, defaults to None. If not provided, the default configuration is used.
placeholder_values (Optional[dict], optional) -- the template values, defaults to None
history (Optional[List[ChatMessage]], optional) -- the message history, defaults to None
timeout (Union[int, float, httpx.Timeout, None], optional) -- the timeout overwrite per request, defaults to None
- Returns:
the CompletionPostResponse object
- Return type:
CompletionPostResponse
- run_with_retries(config=None, config_ref=None, placeholder_values=None, history=None, timeout=None, max_retries=10, base_delay=1.0)
Executes an orchestration request with automatic retry on rate limits (429) and server errors.
- Parameters:
config (Optional[OrchestrationConfig], optional) -- the orchestration configuration, defaults to None
config_ref (Optional[OrchestrationConfigReference], optional) -- the orchestration configuration reference, defaults to None
placeholder_values (Optional[dict], optional) -- the template values, defaults to None
history (Optional[List[ChatMessage]], optional) -- the message history, defaults to None
timeout (Union[int, float, httpx.Timeout, None], optional) -- the timeout overwrite per request, defaults to None
max_retries (int, optional) -- the maximum number of retry attempts, defaults to 10
base_delay (float, optional) -- the initial delay between retries in seconds, defaults to 1.0
- Returns:
the OrchestrationResponseWithRetries with retry count information
- Return type:
OrchestrationResponseWithRetries
- Raises:
ValueError -- if no configuration is provided.
OrchestrationError -- if request fails after all retries (includes retry count).
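The retry behaviour described for run_with_retries follows a common loop pattern, sketched below with stand-ins. `RetryableError` and `call_with_retries` are hypothetical names, not part of the package; jitter and Retry-After handling are left out to keep the loop itself visible, and the sleep function is injectable so the loop can be exercised without waiting.

```python
import time
from typing import Callable, Tuple, TypeVar

T = TypeVar("T")


class RetryableError(Exception):
    """Hypothetical stand-in for a 429 or server-error failure."""


def call_with_retries(func: Callable[[], T], max_retries: int = 10,
                      base_delay: float = 1.0,
                      sleep: Callable[[float], None] = time.sleep) -> Tuple[T, int]:
    """Call func until it succeeds; return (result, retries_used)."""
    retries = 0
    while True:
        try:
            return func(), retries
        except RetryableError:
            if retries >= max_retries:
                raise  # retry budget exhausted: surface the last error
            retries += 1
            # Wait base_delay, then 2 * base_delay, 4 * base_delay, ...
            sleep(base_delay * (2 ** (retries - 1)))
```

Returning the retry count next to the result mirrors the idea of a response object that carries retry information.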
- stream(config=None, config_ref=None, placeholder_values=None, history=None, timeout=None)
Executes an orchestration streaming request synchronously.
- Parameters:
config (Optional[OrchestrationConfig], optional) -- the orchestration configuration, defaults to None
config_ref (Optional[OrchestrationConfigReference], optional) -- the orchestration configuration reference, defaults to None. If not provided, the default configuration is used.
placeholder_values (Optional[dict], optional) -- the template values, defaults to None
history (Optional[List[ChatMessage]], optional) -- the message history, defaults to None
timeout (Union[int, float, httpx.Timeout, None], optional) -- the timeout overwrite per request, defaults to None
- Returns:
An Iterable[StreamCompletionPostResponse] object
- Return type:
Iterable[StreamCompletionPostResponse]
- cache_if_not_none(func)
Custom cache decorator that caches only non-None results.
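A decorator with this behaviour can be sketched as follows. This is an illustration under assumptions, not the package's implementation: the name `cache_non_none`, the dictionary-based cache, and the key construction are all chosen for the example, and arguments must be hashable.

```python
import functools
from typing import Any, Callable, Dict, Tuple


def cache_non_none(func: Callable[..., Any]) -> Callable[..., Any]:
    """Cache results per argument set, but never store None."""
    cache: Dict[Tuple[Any, ...], Any] = {}

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        key = (args, tuple(sorted(kwargs.items())))
        if key in cache:
            return cache[key]
        result = func(*args, **kwargs)
        if result is not None:  # None is not stored, so the next call retries
            cache[key] = result
        return result

    return wrapper
```

Skipping None is useful when a lookup can legitimately fail at first and succeed later, for example while a deployment is still starting.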
- discover_orchestration_api_url(base_url, auth_url, client_id, client_secret, resource_group, config_id=None, config_name=None, orchestration_scenario='orchestration', executable_id='orchestration')
Discovers the orchestration API URL based on provided configuration details.
- Parameters:
base_url (str) -- the base URL for the AI Core API.
auth_url (str) -- the URL for the AI Core authentication service.
client_id (str) -- the client ID for the AI Core API.
client_secret (str) -- the client secret for the AI Core API.
resource_group (str) -- the resource group for the AI Core API.
config_id (Optional[str], optional) -- the configuration ID, defaults to None
config_name (Optional[str], optional) -- the configuration name, defaults to None
orchestration_scenario (str, optional) -- the orchestration scenario ID, defaults to "orchestration"
executable_id (str, optional) -- the orchestration executable ID, defaults to "orchestration"
- Returns:
the orchestration API URL or None if no deployment is found.
- Return type:
Optional[str]
- get_orchestration_api_url(proxy_client, deployment_id=None, config_name=None, config_id=None)
Retrieves the orchestration API URL based on provided deployment or configuration details.
- Parameters:
proxy_client (GenAIHubProxyClient) -- the GenAIHubProxyClient instance.
deployment_id (Optional[str], optional) -- the deployment ID, defaults to None
config_name (Optional[str], optional) -- the configuration name, defaults to None
config_id (Optional[str], optional) -- the configuration ID, defaults to None
- Raises:
ValueError -- if no orchestration deployment is found.
- Returns:
the orchestration API URL.
- Return type:
str
gen_ai_hub.orchestration_v2.sse_client module
Module for Server-Sent Events (SSE) clients for orchestration responses.
This module provides both synchronous and asynchronous SSE clients for iterating over streaming responses. Each client is responsible for handling HTTP errors and for closing the underlying HTTP stream when iteration is complete.
- class AsyncSSEClient
Bases:
object
An asynchronous SSE client for iterating over streaming responses.
This client wraps an asynchronous HTTP stream (provided as a context manager) and ensures that the stream is properly opened and closed. It also checks for HTTP errors upon entering the stream.
- __init__(response_cm, prefix='data: ', final_message='[DONE]')
Initializes the AsyncSSEClient.
- Parameters:
response_cm (AsyncContextManager[httpx.Response]) -- An asynchronous context manager for the HTTP streaming response.
prefix (str, optional) -- the SSE data prefix, defaults to "data: "
final_message (str, optional) -- the message indicating the end of the stream, defaults to "[DONE]"
- class SSEClient
Bases:
object
A synchronous Server-Sent Events (SSE) client that wraps an httpx.Response for iterating over streaming responses.
This client reads data chunks from the HTTP stream and parses each SSE event. For performance reasons the underlying HTTP stream is reused for subsequent calls.
- __init__(response_cm, prefix='data: ', final_message='[DONE]')
Initializes the SSEClient.
- Parameters:
response_cm (httpx.Response) -- An httpx.Response context manager for the streaming response.
prefix (str, optional) -- The prefix string that identifies SSE event data, defaults to "data: "
final_message (str, optional) -- The message that indicates the end of the stream, defaults to "[DONE]"
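The roles of prefix and final_message can be shown with a small parser over already-split lines. This is a sketch under assumptions: `parse_sse_lines` is a hypothetical function, and it assumes each data payload is a JSON object, which the parameter descriptions above do not state.

```python
import json
from typing import Any, Dict, Iterable, Iterator


def parse_sse_lines(lines: Iterable[str], prefix: str = "data: ",
                    final_message: str = "[DONE]") -> Iterator[Dict[str, Any]]:
    """Yield the JSON payload of each data line until the final message."""
    for line in lines:
        if not line.startswith(prefix):
            continue  # skip blank separators and non-data lines
        payload = line[len(prefix):].strip()
        if payload == final_message:
            return  # end-of-stream marker: stop iterating
        yield json.loads(payload)
```

Anything that arrives after the final message is ignored in this sketch.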
- iter_lines()
Reads data chunks from the HTTP stream and yields complete lines.
This method accumulates incoming chunks until a newline is encountered, yielding one complete line at a time.
- Returns:
Complete lines of text from the streaming response.
- Return type:
Iterable[str]
- Yield:
Complete lines of text from the streaming response.
- Return type:
Iterator[Iterable[str]]
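The chunk accumulation described for iter_lines can be sketched as a standalone generator. `iter_complete_lines` is a hypothetical name; stripping a trailing carriage return and yielding leftover text at the end of the stream are assumptions, not documented behaviour.

```python
from typing import Iterable, Iterator


def iter_complete_lines(chunks: Iterable[str]) -> Iterator[str]:
    """Buffer incoming text chunks and yield one complete line at a time."""
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        # Emit every line that is already terminated by a newline.
        while "\n" in buffer:
            line, buffer = buffer.split("\n", 1)
            yield line.rstrip("\r")
    if buffer:
        yield buffer  # trailing text without a final newline
```

Buffering matters because a network chunk can end in the middle of an SSE event, so a line is only yielded once its newline has arrived.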
gen_ai_hub.orchestration_v2.utils module
- load_text_file(file_path)
Loads and returns the content of a text file.
- Parameters:
file_path (str) -- The path to the text file to be loaded.
- Returns:
The content of the file as a string.
- Return type:
str
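A function with this contract can be as small as the sketch below. `read_text_file` is a hypothetical equivalent written for illustration, and UTF-8 is an assumed encoding since the documentation does not name one.

```python
from pathlib import Path


def read_text_file(file_path: str) -> str:
    """Return the full content of a text file as a string."""
    # UTF-8 is an assumption; the documentation does not state the encoding.
    return Path(file_path).read_text(encoding="utf-8")
```

A typical use is loading a prompt template from disk before passing it into a configuration.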