
Documentation Index

Fetch the complete documentation index at: https://docs.sudoiq.com/llms.txt

Use this file to discover all available pages before exploring further.

The PyPI distribution is agentserviceapi. The implementation package you import is agentsapi. The client class is AgentServiceAPIClient (from agentsapi import AgentServiceAPIClient). After pip install agentserviceapi you can also import the thin alias module agentserviceapi (same public symbols). Prefer from agentsapi import ... in application code.
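
Both import paths expose the same client, so the split is only about naming:

```python
# Implementation package; prefer this in application code.
from agentsapi import AgentServiceAPIClient

# Thin alias module installed by the same distribution (same public symbols):
# from agentserviceapi import AgentServiceAPIClient
```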

Install

```bash
python -m venv .venv
source .venv/bin/activate
pip install agentserviceapi
```

Editable install from a monorepo:

```bash
pip install -e /path/to/agentsapi-python
```

The agentservice CLI bundles agentsapi as well. See CLI.

Authentication

```python
from agentsapi import AgentServiceAPIClient

client = AgentServiceAPIClient(base_url="https://sudoiq.com")
client.set_api_key("sk_...")
```

Keys are created in Sudoiq settings.
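
In practice, load the key from the environment rather than hard-coding it. The variable name below is an arbitrary choice, not something the SDK reads on its own:

```python
import os

# SUDOIQ_API_KEY is an assumed name; the client only sees the string passed in.
client.set_api_key(os.environ["SUDOIQ_API_KEY"])
```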

Client API reference

The following lists public methods and namespaces on AgentServiceAPIClient. All HTTP-backed methods are async. Response types are Pydantic models from agentsapi.models (or nested packages such as agentsapi.models.tools) unless noted as plain dict or list.

Constructor and session

| Member | Description |
| --- | --- |
| `AgentServiceAPIClient(base_url="https://sudoiq.com", timeout=30.0)` | Base URL for the API and the default httpx timeout (seconds) for non-streaming requests. |
| `set_api_key(api_key)` | Authenticate with an API key (`sk_...`). Tokens refresh as needed. |
| `set_auth_cookie(custom_session)` | Set a `custom_session` cookie for requests (non-key flows). |
| `set_tools(tools, *, replace=True)` | Register local tool handlers for interactive runs: map name to `(ToolDefinition or None, async_callable)`. The callable signature is `(args: dict, agent_id: str, task_id: int) -> str`. See SSE and local tools. This is separate from `client.tools.register_tools` (server-side schemas). |

Run workflows

| Method | Returns | Purpose |
| --- | --- | --- |
| `await execute_agent_polling(...)` | `ExecuteAgentResponse` | POST execute: returns immediately with `task_id` (no SSE, no local tool loop). Poll with `get_agent_task_status` or webhooks. |
| `await execute_agent(...)` | `AgentInteractiveRun` | Same start as polling, then consumes SSE and runs `set_tools` handlers when the run pauses for tools. |
| `await run_agent(...)` | `AgentGraphRunStatusResponse` | Convenience: `await execute_agent(...)` then `await run` (a single call for a finished interactive run). |
| `await upload_run_input_file(...)` | `UploadRunInputResponse` | Upload one or two files to run-input storage; put the returned references in `string_inputs` (for example `"1"`, `"2"`) when executing. |
| `await execute_agent_with_files(...)` | `ExecuteAgentResponse` | Upload plus execute in one step; file references are wired into inputs automatically. |
| `await get_agent_task_status(agent_id, task_id)` | `AgentTaskExecutionStatus` | Poll a single run (status, pause/tool state, completion). |
| `await get_agent_runs(agent_id)` | Object with `runs` list | List runs for a workflow, newest first. |
| `await list_workflow_runs_by_parent_task_id(parent_task_id, *, recursive=True)` | `list[dict]` | Child workflow runs (for example subgraphs); GET `.../by-parent/{id}`. |
| `await send_tool_responses(agent_id, task_id, tool_responses)` | `PendingToolsResponse` | Submit `ModelToolCallResponse` results (webhook or advanced flows). |
| `await schedule_run_cancel(task_id)` | `None` | Request cancellation; the runner stops further work on that task. |

Shared execute parameters (where applicable): agent_id, string_inputs, image_inputs, environment ("playground", "production", or "test"), metadata, optional tenant_id, optional extra_headers. Interactive methods also accept response_timeout (seconds; default from DEFAULT_INTERACTIVE_RUN_RESPONSE_TIMEOUT; use float("inf") where supported). See Timeouts and errors.
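
A minimal polling sketch using these calls. The status field name and the terminal values in the check are assumptions; confirm them against AgentTaskExecutionStatus in agentsapi.models:

```python
import asyncio

async def run_and_wait(client, agent_id: str):
    started = await client.execute_agent_polling(
        agent_id=agent_id,
        string_inputs={"1": "What time is it?"},
        environment="playground",
    )
    while True:
        state = await client.get_agent_task_status(agent_id, started.task_id)
        # "status" and these terminal values are assumed; inspect the model.
        if state.status in ("completed", "failed", "cancelled"):
            return state
        await asyncio.sleep(2.0)
```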

AgentInteractiveRun

Returned by execute_agent. await run and await run.get_response() both resolve to AgentGraphRunStatusResponse when the run finishes.

| Member | Description |
| --- | --- |
| `task_id`, `agent_id` | Identifiers for this run. |
| `await get_response(timeout=None)` | Wait for a terminal state; `timeout` overrides the run's configured cap. |
| `await run` | Same as `get_response()` (configured timeout). |
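
For example, starting an interactive run and waiting for the terminal response:

```python
run = await client.execute_agent(
    agent_id="YOUR_AGENT_UUID",
    string_inputs={"1": "Summarize my last run."},
    environment="playground",
)
# Both forms resolve to AgentGraphRunStatusResponse when the run finishes:
result = await run.get_response(timeout=120)  # per-call override, in seconds
# result = await run                          # uses the configured timeout
```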

Webhooks and local tool helpers

| Method | Returns | Purpose |
| --- | --- | --- |
| `await process_webhook_tools(request, tools)` | `WebhookToolResultsResponse` | Given `AgentGraphRunStatusResponse`, run `tools[name][1](args, agent_id, task_id)` for pending calls and build responses. |
| `await process_webhook_agent_completed(request, agent_handlers)` | `AgentTaskCompletedProcessedResult` | Dispatch completion to `agent_handlers[graph_id](request)`. |

Module helpers (same package): extract_from_webhook, extract_basic_webhook_data — see Webhooks.
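
A hedged sketch of the tool half of a webhook handler, assuming the request body has already been parsed into an AgentGraphRunStatusResponse (see Webhooks for the extraction helpers):

```python
from datetime import datetime, timezone

async def handle_time(args: dict, agent_id: str, task_id: int) -> str:
    return datetime.now(timezone.utc).isoformat()

# `status` is the parsed AgentGraphRunStatusResponse from the webhook body.
# The tools map uses the same {name: (definition_or_None, handler)} shape as set_tools.
results = await client.process_webhook_tools(status, {"get_current_time": (None, handle_time)})
# `results` is a WebhookToolResultsResponse; hand its tool responses to
# send_tool_responses (the exact field name is not shown in the table above).
```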

Workflow CRUD

| Method | Returns | Purpose |
| --- | --- | --- |
| `await create_agent(create_request)` | `str` (workflow id) | Create a graph from `CreateAgentGraphObject`. |
| `await retrieve_agent(agent_id)` | `AgentGraphDescriptionWithVersions` | Fetch one workflow including version metadata. |
| `await retrieve_agent_definitions(workflow_ids)` | `dict[str, AgentGraphDescriptionWithVersions]` | Batch retrieve. |
| `await update_agent(workflow_data)` | `str` | Update from `AgentGraphDescription`. |
| `await list_workflows(tag=None)` | `list[dict]` | List tenant workflows; optional tag filter. |
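
For example, listing workflows and fetching one with version metadata (the "id" key on the listed dicts is an assumption):

```python
workflows = await client.list_workflows(tag="production")  # tag filter is optional
if workflows:
    # list_workflows returns plain dicts; the "id" key used here is assumed.
    detail = await client.retrieve_agent(workflows[0]["id"])
```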

Platform utilities

| Method | Returns | Purpose |
| --- | --- | --- |
| `await get_models()` | `list` | Available models from the API. |

client.tools — server tool schemas

| Method | Purpose |
| --- | --- |
| `await get_tools(limit=100, offset=0, namespace=None, status=None)` | List registered tool definitions (paged, optional filters). |
| `await register_tools(definitions)` | Register or upsert a `ToolDefinition` list on the server. |
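
Fetching a page of registered schemas; the return shape (plain list vs. wrapper object) is not specified above, so treat the variable as opaque:

```python
# First page of tool schemas in the "main" namespace; all filters are optional.
page = await client.tools.get_tools(limit=100, offset=0, namespace="main")
```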

client.tests — stored test examples

| Method | Purpose |
| --- | --- |
| `await create_tests(agent_id, examples)` | Create examples from a `TestExampleItem` list. |
| `await get_tests(agent_id)` | List all tests for an agent. |
| `await get_test(agent_id, test_id)` | Fetch one test. |
| `await delete_test(agent_id, test_id)` | Delete one test. |

See Evals, tests, and feedback.

client.graders — evaluators and comparisons

| Method | Purpose |
| --- | --- |
| `await execute_grader(agent_id, task_id, node_id=None, tenant_id=None)` | Run the grader workflow for a completed run; returns `ExecuteAgentResponse` or `None` if no grader. |
| `await get_agent_primary_grader(agent_id)` | Primary grader rubric / flags (`GetAgentRubricResponse`). |
| `await get_agent_rubric(agent_id)` | Alias of `get_agent_primary_grader`. |
| `await get_grader_runs_for_agent(agent_id)` | Grader runs and results for this workflow. |
| `await create_grader(agent_id, rubric, rubric_name, proposed=False)` | Create a grader workflow; `proposed=True` for non-primary (A/B). |
| `await upsert_grader(agent_id, rubric, rubric_name)` | Create if missing, else update the rubric (`set_grader_prompt`). |
| `await set_grader_prompt(agent_id, prompt)` | Upsert the grader prompt (new grader version). |
| `await run_comparisons(grader_workflow_id, task_id_from, task_id_to, agent_id=None, limit=None)` | Schedule the proposed grader on a task-id range. |
| `await get_eval_comparison(agent_id, task_id_from, task_id_to, proposed_grader_workflow_id)` | Metrics: primary vs. proposed on the same tasks. |
| `await promote(workflow_id, grader_workflow_id)` | Make a proposed grader primary. |
| `await sunset(workflow_id, grader_workflow_id)` | Remove a proposed grader from the workflow. |
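
Putting these calls together as a hedged A/B sketch. The rubric text and task-id range are illustrative, and create_grader's return value (treated here as the proposed grader workflow id) is an assumption:

```python
agent_id = "YOUR_AGENT_UUID"

proposed = await client.graders.create_grader(
    agent_id,
    rubric="Score 1-5: is the answer grounded in the provided inputs?",
    rubric_name="grounding-v2",
    proposed=True,
)  # assumed to return the proposed grader workflow id

await client.graders.run_comparisons(proposed, task_id_from=1000, task_id_to=1100,
                                     agent_id=agent_id)
report = await client.graders.get_eval_comparison(agent_id, 1000, 1100, proposed)

# If the proposed grader wins, make it primary; otherwise retire it.
await client.graders.promote(agent_id, proposed)
# await client.graders.sunset(agent_id, proposed)
```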

client.execution_feedback and client.agent_feedback

The same namespace, exposed under both names.

| Method | Purpose |
| --- | --- |
| `await get_feedback(task_id)` | Execution feedback for one run. |
| `await list_feedback(agent_id, limit=3, before_task_id=None, task_id=None, nearest_runs_after_id_with_feedback=None)` | Paginated feedback listing (see the docstring for cursor semantics). |
| `await save_feedback(task_id, feedback_data, feedback_source="admin")` | Save structured feedback; `feedback_source` is `"admin"` or `"user"`. |
| `await save_simple_feedback(task_id, passed, source="user")` | Binary pass/fail. |
| `await get_simple_feedback(task_id, source=None)` | Read binary feedback (optional filter by source). |
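
For example, recording a binary verdict on a finished run and reading it back:

```python
task_id = 12345  # a finished run's task id

await client.execution_feedback.save_simple_feedback(task_id, passed=True, source="user")
verdict = await client.execution_feedback.get_simple_feedback(task_id, source="user")
```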

client.agent_execution_records

| Method | Purpose |
| --- | --- |
| `await get_last_run(agent_id)` | Most recent run or `None`. |
| `await get_bulk_last_runs(agent_ids)` | Map agent_id → last run (or `None`) in one round trip. |

client.agent_validation — output validators

| Method | Purpose |
| --- | --- |
| `await list_validators(limit=100, offset=0)` | List JSON Schema validators. |
| `await get_validator(validator_id)` | Fetch one validator. |
| `await create_validator(name, json_schema)` | Create a validator. |
| `await upsert_validators(validators)` | Batch upsert by name (dicts and/or Pydantic model classes). |

See Output validators.
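
Creating a validator from a plain JSON Schema might look like this; the schema content is illustrative:

```python
await client.agent_validation.create_validator(
    name="order-summary",
    json_schema={
        "type": "object",
        "properties": {"total": {"type": "number"}},
        "required": ["total"],
    },
)
```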

client.agent_traffic_split

| Method | Purpose |
| --- | --- |
| `await get_current(agent_id)` | Current traffic split (`AgentTrafficSplitResponse`). |
| `await get_performance_by_version(agent_id, days=30)` | Metrics by workflow version. |
| `await get_traffic_with_performance(agent_id, days=30)` | Traffic and performance in one structure. |
| `await update(agent_id, traffic_split)` | PUT the split (`AgentTrafficSplitUpdate`; distributions sum to 100). |

client.task_categorization

| Method | Purpose |
| --- | --- |
| `await get_inputs(task_ids=None, agent_id=None, task_id_start=None)` | Fetch run inputs for categorization (see the client docstring for query shapes). |
| `await publish_categories(agent_id, items)` | Publish category labels (and optional fields) for tasks. |
| `await poll_categorization_runs(...)` | Client-side poll helper until runs reach a terminal status (takes a bulk `get_run_results_bulk` callable). |

client.staged_agents

| Method | Purpose |
| --- | --- |
| `await stage_from_payload(original_workflow_id, workflow_payload, staging_reason="agent_diff")` | Create a staged workflow from a JSON payload. |
| `await get_staged_workflow_status(workflow_id, reason=None, include_workflow=False)` | Staging metadata (and optionally the full workflow). |
| `await get_agent_diff_staged_workflow_id(workflow_id)` | Staged id for `agent_diff`, or `None`. |
| `await get_staged_workflow(workflow_id, reason="agent_diff")` | Full staged payload for editor-style loads. |
| `await replay_prior_runs(staged_workflow_id, num_runs=10)` | Replay history on the staged graph. |
| `await compare_workflows(original_workflow_id, staged_workflow_id, days=90)` | Compare performance (after replay). |
| `await rollforward_staged_workflow(staged_workflow_id)` | Promote the staged graph to a new live version. |
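
As a rollout sketch: stage, replay, compare, then roll forward. stage_from_payload's return value (treated here as the staged workflow id) is an assumption:

```python
# original_workflow_id and workflow_payload come from your own source of truth.
staged_id = await client.staged_agents.stage_from_payload(
    original_workflow_id, workflow_payload, staging_reason="agent_diff",
)  # assumed to return the staged workflow id

await client.staged_agents.replay_prior_runs(staged_id, num_runs=10)
report = await client.staged_agents.compare_workflows(original_workflow_id, staged_id, days=90)

# Promote only once the comparison looks good.
await client.staged_agents.rollforward_staged_workflow(staged_id)
```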

Exceptions and error codes

Import from agentsapi: AgentServiceAPIException, AgentServiceAPITimeoutError, AgentServiceAPIConnectionError, AgentServiceAPIHTTPError, and constants such as ERROR_CODE_TOOL_RESPONSES_ALREADY_SUBMITTED. See Timeouts and errors.
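
A typical guard around a run, assuming AgentServiceAPIException is the common base class (the handling bodies are sketches):

```python
from agentsapi import (
    AgentServiceAPIConnectionError,
    AgentServiceAPIException,
    AgentServiceAPITimeoutError,
)

try:
    result = await client.run_agent(agent_id="YOUR_AGENT_UUID",
                                    string_inputs={"1": "hello"})
except AgentServiceAPITimeoutError:
    ...  # the client stopped waiting; the run may still finish server-side
except AgentServiceAPIConnectionError:
    ...  # network-level failure; usually safe to retry with backoff
except AgentServiceAPIException:
    ...  # catch-all for other SDK errors
```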

Add a tool

Tools have two parts: a schema on the server (so the model knows the name and arguments) and, for interactive runs, a local async handler in your process. Register schemas once, when your application starts or when a deploy changes your tools, not inside the hot path that calls execute_agent for each task. After a tool is registered, it stays on the platform until you explicitly delete it. If you later change its name, description, or parameters, register again with the revised schema so Sudoiq stays in sync with what the model and your handler implement.

1. Register the schema

```python
# ParameterProperty is used when the tool takes arguments; this tool takes none.
from agentsapi.models.tools import ToolDefinition, ParametersSchema, ParameterProperty

await client.tools.register_tools([
    ToolDefinition(
        name="get_current_time",
        namespace="main",
        description="Return the current UTC time as an ISO 8601 string.",
        schema_data=ParametersSchema(
            properties={},
            required=[],
        ),
    ),
])
```

2. Attach a handler and run

Handlers are usually async def with signature (args: dict, agent_id: str, task_id: int) -> str. The client wraps each tool call so it can record errors, traces, and usage metrics alongside normal execution. When the model invokes a tool, args are exactly what the model supplied; agent_id and task_id identify the run that asked for it. That layout lets you register one handler and reuse it across many agents, while still branching on agent_id (or on values in args) when different workflows need different behavior.

```python
from datetime import datetime, timezone

# Local handler with the expected signature (args, agent_id, task_id) -> str.
async def get_current_time(args: dict, agent_id: str, task_id: int) -> str:
    return datetime.now(timezone.utc).isoformat()

client.set_tools({"get_current_time": (None, get_current_time)})

result = await client.run_agent(
    agent_id="YOUR_AGENT_UUID",
    string_inputs={"1": "What time is it?"},
    environment="playground",
)
```

The tool name must match what the agent graph expects. See Register server tools and Interactive runs and local tools.

Tests and feedback

Store test examples and record run feedback to drive quality and workflow iteration. See Evals, tests, and feedback.

Integration patterns

For long-running or fully asynchronous backends, you can start runs and consume updates without holding a single long-lived connection; see Running agents when you need that split.