Agents API Reference#

High-level agent abstractions for building OpenIntent agents.

Preferred pattern: LLM-Powered Agents

Adding model= to @Agent or @Coordinator is the recommended way to build agents. See the LLM-Powered Agents guide for details.

Agent Decorator#

Agent #

Agent(agent_id: str, config: Optional[AgentConfig] = None, capabilities: Optional[list[str]] = None, memory: Optional[str] = None, tools: Optional[list] = None, auto_heartbeat: bool = True, governance_policy: Optional[dict[str, Any]] = None, model: Optional[str] = None, provider: Optional[str] = None, system_prompt: Optional[str] = None, temperature: float = 0.7, max_tokens: int = 4096, max_tool_rounds: int = 10, planning: bool = False, stream_by_default: bool = False, federation_visibility: Optional[str] = None, **kwargs: Any) -> Callable[[type], type]

Class decorator to create an Agent from a class.

When model is provided, the agent becomes LLM-powered: self.think(prompt) runs an agentic loop that reasons, calls protocol tools (memory, escalation, clarification), and returns a result. self.think_stream(prompt) does the same but yields tokens.

Tools can be plain strings (resolved via RFC-0014 protocol grants) or Tool objects with rich descriptions, parameter schemas, and local callable handlers.

Example — manual agent (no model):

@Agent("research-bot")
class ResearchAgent:
    @on_assignment
    async def work(self, intent):
        return {"result": "done"}

Example — LLM-powered agent with Tool objects:

from openintent import Agent, Tool, tool, on_assignment

@tool(description="Search the web.", parameters={
    "type": "object",
    "properties": {
        "query": {"type": "string", "description": "Search query."},
    },
    "required": ["query"],
})
async def web_search(query: str) -> dict:
    return {"results": [...]}

@Agent("analyst", model="gpt-5.2", tools=[web_search])
class Analyst:
    @on_assignment
    async def work(self, intent):
        return await self.think(intent.description)

LLM Instance Properties (requires model=)#

When model= is set on @Agent, the class gains these methods:

Method Description
self.think(prompt) Agentic tool loop — sends prompt to LLM, executes tool calls, returns final text
self.think_stream(prompt) Same agentic loop but yields tokens as they arrive for real-time streaming
self.reset_conversation() Clear the LLM conversation history to start fresh
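For instance, a minimal streaming sketch (the agent id, model name, and print-as-you-go loop are illustrative, not prescribed by the API):

```python
from openintent import Agent, on_assignment

@Agent("summarizer", model="gpt-5.2")
class Summarizer:
    @on_assignment
    async def work(self, intent):
        # Accumulate tokens while streaming them to the console in real time.
        chunks = []
        async for token in self.think_stream(intent.description):
            print(token, end="", flush=True)
            chunks.append(token)
        return {"summary": "".join(chunks)}
```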

Worker#

Worker #

Worker(agent_id: str, handler: Callable[[Intent], Any], base_url: str = 'http://localhost:5000', api_key: str = '')

Ultra-minimal worker for simple, single-purpose agents.

Example
async def process(intent):
    return {"result": do_work(intent.title)}

worker = Worker("processor", process)
worker.run()

run #

run() -> None

Run the worker.

stop #

stop() -> None

Stop the worker.

Coordinator#

Coordinator #

Coordinator(coordinator_id: str, agents: Optional[list[str]] = None, strategy: str = 'sequential', guardrails: Optional[list[str]] = None, config: Optional[AgentConfig] = None, capabilities: Optional[list[str]] = None, memory: Optional[str] = None, tools: Optional[list] = None, auto_heartbeat: bool = True, governance_policy: Optional[dict[str, Any]] = None, model: Optional[str] = None, provider: Optional[str] = None, system_prompt: Optional[str] = None, temperature: float = 0.7, max_tokens: int = 4096, max_tool_rounds: int = 10, planning: bool = True, stream_by_default: bool = False, federation_visibility: Optional[str] = None, federation_policy: Optional[dict[str, Any]] = None, **kwargs: Any) -> Callable[[type], type]

Class decorator to create a Coordinator from a class.

A Coordinator manages portfolios of intents and handles dependency tracking, multi-intent orchestration, and governance.

When model is provided, the coordinator becomes LLM-powered: self.think(prompt) reasons about delegation, planning, and governance decisions. The LLM can call coordinator-specific tools like delegate, create_plan, and record_decision.

Example — manual coordinator:

@Coordinator("orchestrator", agents=["researcher", "writer"])
class MyCoordinator:
    @on_assignment
    async def plan(self, intent):
        spec = PortfolioSpec(
            name=intent.title,
            intents=[
                IntentSpec("Research", assign="researcher"),
                IntentSpec("Write", assign="writer", depends_on=["Research"]),
            ]
        )
        return await self.execute(spec)

Example — LLM-powered coordinator:

@Coordinator(
    "lead",
    model="claude-sonnet-4-20250514",
    agents=["researcher", "writer", "reviewer"],
    memory="episodic",
)
class ProjectLead:
    @on_assignment
    async def plan(self, intent):
        return await self.think(
            f"Break down this project and delegate to your team: {intent.description}"
        )

Built-in Coordinator Guardrails#

The guardrails= parameter on @Coordinator accepts these built-in policies:

Policy Description
"require_approval" Logs decision records before assignment
"budget_limit" Rejects intents exceeding cost constraints
"agent_allowlist" Rejects delegation to agents outside the managed list
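A sketch combining all three built-in policies (the coordinator id and agent names are illustrative):

```python
from openintent import Coordinator, on_assignment

@Coordinator(
    "guarded-lead",
    agents=["researcher", "writer"],
    guardrails=["require_approval", "budget_limit", "agent_allowlist"],
)
class GuardedLead:
    @on_assignment
    async def plan(self, intent):
        # agent_allowlist rejects delegation to any agent outside the
        # managed list above; require_approval logs a decision record
        # before each assignment.
        await self.delegate(intent.id, "researcher")
```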

Protocol Decorators#

Plan (RFC-0012)#

Plan #

Plan(name: str, strategy: str = 'sequential', max_concurrent: int = 5, failure_policy: str = 'fail_fast') -> Callable[[type], type]

Declarative plan definition (RFC-0012). Defines task decomposition strategy.

Vault (RFC-0014)#

Vault #

Vault(name: str, rotate_keys: bool = False) -> Callable[[type], type]

Declarative credential vault (RFC-0014). Defines tool access and credential policies.

Memory (RFC-0015)#

Memory #

Memory(namespace: str, tier: str = 'episodic', ttl: Optional[int] = None, max_entries: int = 1000) -> Callable[[type], type]

Declarative memory configuration (RFC-0015). Defines memory tier and policies.

Trigger (RFC-0017)#

Trigger #

Trigger(name: str, type: str = 'schedule', condition: Optional[str] = None, cron: Optional[str] = None, dedup: str = 'skip') -> Callable[[type], type]

Declarative trigger definition (RFC-0017). Creates intents when conditions are met.
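These declarative decorators can in principle be stacked on an agent class; a sketch assuming they compose with @Agent (the stacking order and the trigger/memory names are assumptions):

```python
from openintent import Agent, Memory, Trigger, on_assignment

@Trigger("nightly-report", type="schedule", cron="0 2 * * *")
@Memory("reports", tier="episodic", ttl=86400)
@Agent("reporter")
class Reporter:
    @on_assignment
    async def work(self, intent):
        # Runs whenever the nightly trigger creates an intent for this agent.
        return {"report": "generated"}
```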

Lifecycle Decorators#

on_assignment#

on_assignment #

on_assignment(func: Callable) -> Callable

Decorator: Called when the agent is assigned to an intent.

The handler receives the intent and should return state updates. Return values are automatically patched to the intent's state.

Example
@on_assignment
async def handle(self, intent):
    result = await do_work(intent.title)
    return {"result": result}  # Auto-patched to state

on_complete#

on_complete #

on_complete(func: Callable) -> Callable

Decorator: Called when an intent completes.

Useful for cleanup, notification, or triggering follow-up work.

Example
@on_complete
async def cleanup(self, intent):
    await notify_completion(intent.id)

on_state_change#

on_state_change #

on_state_change(keys: Optional[list[str]] = None) -> Callable

Decorator: Called when intent state changes.

Parameters:

Name Type Description Default
keys Optional[list[str]] Optional list of state keys to watch; if None, triggers on any change. None
Example
@on_state_change(["progress"])
async def track_progress(self, intent, old_state, new_state):
    print(f"Progress: {new_state.get('progress')}")

on_event#

on_event #

on_event(event_type: Union[str, EventType]) -> Callable

Decorator: Called when a specific event type occurs.

Parameters:

Name Type Description Default
event_type Union[str, EventType] The event type to handle. required
Example
@on_event(EventType.ARBITRATION_REQUESTED)
async def handle_arbitration(self, intent, event):
    await escalate(intent.id)

on_lease_available#

on_lease_available #

on_lease_available(scope: str) -> Callable

Decorator: Called when a lease becomes available for a specific scope.

Parameters:

Name Type Description Default
scope str The scope to watch for lease availability. required
Example
@on_lease_available("research")
async def claim(self, intent, scope):
    async with self.lease(intent.id, scope):
        await do_exclusive_work()

on_all_complete#

on_all_complete #

on_all_complete(func: Callable) -> Callable

Decorator: Called when all intents in a portfolio complete.

Only applicable for Coordinator agents managing portfolios.

Example
@on_all_complete
async def finalize(self, portfolio):
    return merge_results(portfolio.intents)

on_access_requested (RFC-0011)#

on_access_requested #

on_access_requested(func: Callable) -> Callable

Decorator: Called when another principal requests access to an intent this agent administers. Return "approve", "deny", or "defer".

Enables policy-as-code for automated access decisions.

Example
@on_access_requested
async def policy(self, intent, request):
    if "ocr" in request.capabilities:
        return "approve"
    return "defer"

on_task (RFC-0012)#

on_task #

on_task(status: Optional[str] = None) -> Callable

Decorator: Called when a task lifecycle event occurs.

Parameters:

Name Type Description Default
status Optional[str] Optional task status filter (e.g., "completed", "failed"); if None, triggers on any task event. None
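No example is given above; a sketch assuming the handler receives the intent and a task payload (the handler signature is an assumption):

```python
@on_task(status="failed")
async def handle_failed_task(self, intent, task):
    # `task` is assumed to carry the failed task's details.
    await self.log(intent.id, "task_failed", task=task)
```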

on_trigger (RFC-0017)#

on_trigger #

on_trigger(name: Optional[str] = None) -> Callable

Decorator: Called when a trigger fires and creates an intent for this agent.

Parameters:

Name Type Description Default
name Optional[str] Optional trigger name filter; if None, handles any trigger. None
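A sketch filtering on a single trigger (the trigger name and handler signature are assumptions):

```python
@on_trigger(name="nightly-report")
async def on_nightly(self, intent):
    # Handle only intents created by the "nightly-report" trigger.
    return {"triggered_by": "nightly-report"}
```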

on_drain (RFC-0016)#

on_drain #

on_drain(func: Callable) -> Callable

Decorator: Called when the agent receives a drain signal.

The handler should finish in-progress work and prepare for shutdown. The agent will stop accepting new assignments after this is called.
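A sketch of a drain handler (the signature and flush_pending_results helper are hypothetical):

```python
@on_drain
async def shutdown(self):
    # Finish in-flight work and persist anything pending; the agent
    # stops accepting new assignments once this handler is invoked.
    await flush_pending_results()  # hypothetical application helper
```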

on_handoff (RFC-0013)#

on_handoff #

on_handoff(func: Callable) -> Callable

Decorator: Called when this agent receives work delegated from another agent.

Unlike @on_assignment which fires for all assignments, @on_handoff fires only when the assignment includes delegation context (delegated_by is set). The handler receives the intent and the delegating agent's ID.

Example
@on_handoff
async def received(self, intent, from_agent):
    previous = await self.memory.recall(key=f"handoff:{intent.id}")
    return {"status": "continuing", "from": from_agent}

on_retry (RFC-0010)#

on_retry #

on_retry(func: Callable) -> Callable

Decorator: Called when an intent is reassigned after a previous failure.

The handler receives the intent and retry metadata (attempt number, previous failure reason). Allows agents to adapt behaviour on retries (e.g. use a different strategy, reduce scope, or escalate).

Example
@on_retry
async def handle_retry(self, intent, attempt, last_error):
    if attempt >= 3:
        await self.escalate(intent.id, "Too many retries")
        return
    return await self.think(f"Retry attempt {attempt}: {intent.title}")

Guardrail Decorators#

input_guardrail#

input_guardrail #

input_guardrail(func: Callable) -> Callable

Decorator: Validates or transforms intent data before assignment handlers run.

Input guardrails execute in registration order before any @on_assignment handler. If a guardrail raises GuardrailError (or returns False), the assignment is rejected and the intent can be escalated.

Example
@input_guardrail
async def check_scope(self, intent):
    if len(intent.description) > 10_000:
        raise GuardrailError("Input too long")

output_guardrail#

output_guardrail #

output_guardrail(func: Callable) -> Callable

Decorator: Validates or transforms handler results before they are committed.

Output guardrails execute in registration order after @on_assignment handlers return. The guardrail receives the intent and the result dict. If it raises GuardrailError (or returns False), the result is discarded and the intent can be escalated.

Example
@output_guardrail
async def check_pii(self, intent, result):
    for key, val in result.items():
        if contains_pii(str(val)):
            raise GuardrailError(f"PII detected in '{key}'")

GuardrailError#

GuardrailError #

Bases: Exception

Raised by input/output guardrails to reject processing.

Coordinator Lifecycle Decorators#

on_conflict#

on_conflict #

on_conflict(func: Callable) -> Callable

Decorator: Called when version conflicts occur between agents (RFC-0002). Handler receives (self, intent, conflict) with conflict details.

on_escalation#

on_escalation #

on_escalation(func: Callable) -> Callable

Decorator: Called when an agent requests coordinator intervention. Handler receives (self, intent, agent_id, reason).

on_quorum#

on_quorum #

on_quorum(threshold: float = 0.5) -> Callable

Decorator: Called when multi-agent voting reaches a threshold. threshold is the fraction of agents required (0.0 to 1.0). Handler receives (self, intent, votes).
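The coordinator lifecycle handlers above can be combined on one class; a sketch using the documented handler signatures (the ids and the 0.66 threshold are illustrative):

```python
from openintent import Coordinator, on_escalation, on_quorum

@Coordinator("lead", agents=["a", "b", "c"])
class Lead:
    @on_escalation
    async def handle_escalation(self, intent, agent_id, reason):
        # An agent asked for coordinator intervention.
        await self.log(intent.id, f"{agent_id} escalated: {reason}")

    @on_quorum(threshold=0.66)
    async def on_vote(self, intent, votes):
        # Two thirds of the managed agents have voted.
        await self.complete_intent(intent.id, {"decision": "approved"})
```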

Tool Definitions#

ToolDef#

ToolDef(name, description, parameters, handler) — rich tool definition for LLM function calling with local execution. Pass ToolDef objects in the tools= parameter on @Agent or @Coordinator.

Field Type Description
name str Tool name (used in function calling)
description str What the tool does (shown to the LLM)
parameters dict JSON Schema for tool arguments
handler callable Local function called when the LLM invokes the tool
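A sketch constructing a ToolDef directly, assuming the fields above are keyword arguments (the tool itself and fetch_price are hypothetical):

```python
from openintent import ToolDef

async def lookup_price(symbol: str) -> dict:
    # Hypothetical local handler; runs in-process when the LLM calls the tool.
    return {"symbol": symbol, "price": await fetch_price(symbol)}

price_tool = ToolDef(
    name="lookup_price",
    description="Look up the latest price for a ticker symbol.",
    parameters={
        "type": "object",
        "properties": {"symbol": {"type": "string"}},
        "required": ["symbol"],
    },
    handler=lookup_price,
)
```

price_tool can then be passed in the tools= list on @Agent or @Coordinator.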

@define_tool#

@define_tool(description=, parameters=) — decorator that turns a function into a ToolDef object.

from openintent import define_tool

@define_tool(description="Search the web.", parameters={
    "type": "object",
    "properties": {"query": {"type": "string"}},
    "required": ["query"],
})
async def web_search(query: str) -> dict:
    return {"results": await fetch_results(query)}

Backwards compatibility

Tool = ToolDef, @tool = @define_tool. The old names are kept as aliases.

Tool Execution Priority#

  1. Protocol tools (remember, recall, clarify, escalate, update_status) — always first
  2. Local handlers (ToolDef objects) — executed in-process
  3. Remote protocol grants (string names via RFC-0014) — resolved via server proxy
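An agent can mix all three kinds in one tools= list; a sketch (the string grant name is illustrative, and price_tool stands for some previously constructed ToolDef):

```python
@Agent(
    "analyst",
    model="gpt-5.2",
    tools=[
        "web_search",  # string name: resolved as a remote RFC-0014 grant
        price_tool,    # ToolDef: executed by its local in-process handler
    ],
)
class Analyst:
    @on_assignment
    async def work(self, intent):
        # Protocol tools (remember, recall, clarify, escalate,
        # update_status) always take priority over both.
        return await self.think(intent.description)
```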

Proxy Classes#

_ToolsProxy#

The _ToolsProxy class provides self.tools on agents. For string tool names (RFC-0014 grants), it delegates to client.invoke_tool() for server-side invocation. For ToolDef objects, it executes the local handler directly.

# Server-side invocation (string tool name → server proxy)
result = await self.tools.invoke("web_search", {"query": "..."})

# Local invocation (ToolDef handler)
result = await self.tools.invoke(my_tooldef, {"param": "value"})

_MemoryProxy#

self.memory proxy for RFC-0015 agent memory operations.

_TasksProxy#

self.tasks proxy for RFC-0012 task creation and management.

Internal Classes#

BaseAgent#

BaseAgent #

BaseAgent(base_url: Optional[str] = None, api_key: Optional[str] = None, config: Optional[AgentConfig] = None)

Bases: ABC

Base class for OpenIntent agents.

Provides automatic subscription management, event routing, and lifecycle handling.

agent_id property #

agent_id: str

The agent's unique identifier.

async_client property #

async_client: AsyncOpenIntentClient

Get or create the asynchronous client.

channels property #

channels: _ChannelsProxy

Access channel messaging operations (RFC-0021).

client property #

client: OpenIntentClient

Get or create the synchronous client.

default_human_retry_policy class-attribute instance-attribute #

default_human_retry_policy: Optional[HumanRetryPolicy] = None

Class-level or instance-level default re-notification policy (RFC-0026).

When set, this policy is applied to all request_input() calls that do not supply an explicit retry_policy argument. Subclasses may override at class level:

class MyAgent(BaseAgent):
    default_human_retry_policy = HumanRetryPolicy(
        max_attempts=3,
        interval_seconds=900,
        strategy="linear",
        final_fallback_policy="complete_with_fallback",
    )

governance property #

governance: _GovernanceProxy

Access governance policy operations (RFC-0013 Enforcement).

Provides methods to set/get/remove governance policies, request approval, and approve/deny approval requests.

identity_config property #

identity_config: dict

Get identity configuration (RFC-0018).

memory property #

memory: _MemoryProxy

Access agent memory (RFC-0015). Configure via @Agent(memory="episodic").

tasks property #

tasks: _TasksProxy

Access task operations (RFC-0012).

tools property #

tools: _ToolsProxy

Access tool invocation (RFC-0014). Configure via @Agent(tools=["web_search"]).

complete_intent async #

complete_intent(intent_id: str, final_state: Optional[dict[str, Any]] = None) -> None

Mark an intent as completed with optional final state.

delegate async #

delegate(intent_id: str, target_agent_id: str, payload: Optional[dict[str, Any]] = None) -> None

Delegate work on an intent to another agent.

The target agent receives the assignment with intent.ctx.delegated_by set.

escalate async #

escalate(intent_id: str, reason: str, data: Optional[dict[str, Any]] = None, priority: str = 'medium', urgency: str = 'medium') -> Escalation

Escalate an intent to a human for review.

Creates a human escalation request through the governance pipeline (RFC-0013).

grant_access async #

grant_access(intent_id: str, principal_id: str, permission: str = 'write', reason: Optional[str] = None) -> ACLEntry

Grant access to another principal on an intent.

lease #

lease(intent_id: str, scope: str, duration_seconds: int = 300)

Acquire a lease as a context manager.

Example
async with self.lease(intent.id, "research"):
    await do_exclusive_work()

log async #

log(intent_id: str, message: str, **data: Any) -> None

Log a comment event to an intent.

patch_state async #

patch_state(intent_id: str, updates: dict[str, Any]) -> None

Patch intent state with updates.

Automatically handles version tracking.
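Taken together in an assignment handler, a sketch (do_work is a hypothetical application function; whether an explicit complete_intent call is needed depends on AgentConfig.auto_complete):

```python
@on_assignment
async def work(self, intent):
    await self.log(intent.id, "starting")
    await self.patch_state(intent.id, {"progress": 0.5})
    result = await do_work(intent)  # hypothetical application function
    await self.complete_intent(intent.id, {"result": result})
```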

request_input async #

request_input(intent_id: str, question: str, response_type: str = 'choice', choices: Optional[list[Union[dict[str, Any], SuspensionChoice]]] = None, context: Optional[dict[str, Any]] = None, channel_hint: Optional[str] = None, timeout_seconds: Optional[int] = None, fallback_policy: str = 'fail', fallback_value: Optional[Any] = None, confidence: Optional[float] = None, retry_policy: Optional[HumanRetryPolicy] = None) -> Any

Suspend the intent and request operator input (RFC-0025 / RFC-0026).

Transitions the intent to suspended_awaiting_input, fires @on_input_requested hooks, and polls for a response. When the operator responds (via POST /intents/{id}/suspend/respond), the suspension is resolved and the operator's response value is returned.

When retry_policy is supplied (RFC-0026), the agent re-notifies the operator up to retry_policy.max_attempts times, waiting retry_policy.interval_seconds between each attempt, firing @on_input_requested hooks and emitting intent.suspension_renotified events on each re-attempt. Escalation steps in retry_policy.escalation_ladder cause an intent.suspension_escalated event to be emitted. After all attempts are exhausted, retry_policy.final_fallback_policy is applied.

If retry_policy is omitted, single-attempt behaviour is preserved (original RFC-0025 semantics).

The class attribute default_human_retry_policy can be set on a BaseAgent subclass or instance to apply a retry policy to all request_input() calls that do not supply an explicit one.

Parameters:

Name Type Description Default
intent_id str The intent to suspend. required
question str The question or prompt for the operator. required
response_type str Expected response type: "choice" (default), "confirm", "text", or "form". 'choice'
choices Optional[list[Union[dict[str, Any], SuspensionChoice]]] List of SuspensionChoice objects or plain dicts with value and label keys. For response_type="confirm" with no explicit choices, defaults to yes/no; for "choice" the operator must select one of these values. None
context Optional[dict[str, Any]] Structured context to help the operator decide. None
channel_hint Optional[str] Preferred delivery channel (e.g. "slack"). None
timeout_seconds Optional[int] Seconds before the suspension expires. None
fallback_policy str What to do on timeout: "fail", "complete_with_fallback", or "use_default_and_continue". 'fail'
fallback_value Optional[Any] Value to use for the "complete_with_fallback" policy. None
confidence Optional[float] Agent confidence score at suspension time (0.0–1.0). None
retry_policy Optional[HumanRetryPolicy] Optional re-notification / escalation policy (RFC-0026). When omitted, falls back to self.default_human_retry_policy (if set), then single-attempt. None

Returns:

Type Description
Any The operator's response value.

Raises:

Type Description
InputTimeoutError If the suspension expires without a response and fallback_policy is "fail".
InputCancelledError If the suspension is cancelled.
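A sketch combining a suspension with an RFC-0026 retry policy (the question and numeric values are illustrative; the HumanRetryPolicy import path is assumed):

```python
from openintent import HumanRetryPolicy

answer = await self.request_input(
    intent.id,
    question="Publish this draft?",
    response_type="confirm",
    timeout_seconds=3600,
    retry_policy=HumanRetryPolicy(
        max_attempts=3,          # re-notify the operator up to 3 times
        interval_seconds=900,    # wait 15 minutes between attempts
        strategy="linear",
        final_fallback_policy="complete_with_fallback",
    ),
)
```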

revoke_access async #

revoke_access(intent_id: str, entry_id: str) -> None

Revoke access from a principal.

run #

run() -> None

Start the agent and begin processing events.

This method blocks until stop() is called.

should_request_input async #

should_request_input(intent_id: str, signals: Optional[EngagementSignals] = None, confidence: Optional[float] = None, risk: Optional[float] = None, reversibility: Optional[float] = None) -> EngagementDecision

Decide whether to ask for human input or act autonomously (RFC-0025).

Implements the default rule-based engagement logic:

- autonomous: high confidence, low risk, reversible — act now.
- request_input: moderate uncertainty — ask but don't block.
- require_input: high risk or low confidence — must ask.
- defer: risk too high to act, escalate to coordinator.

The decision is emitted as an engagement.decision event and @on_engagement_decision handlers are fired before returning.

Parameters:

Name Type Description Default
intent_id str The intent context. required
signals Optional[EngagementSignals] Pre-built EngagementSignals object (takes priority). None
confidence Optional[float] Agent confidence in autonomous action (0.0–1.0). None
risk Optional[float] Estimated risk of acting autonomously (0.0–1.0). None
reversibility Optional[float] How reversible the action is (0.0–1.0). None

Returns:

Type Description
EngagementDecision An EngagementDecision indicating the recommended mode.
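A sketch pairing the engagement decision with a suspension (the numeric signals are illustrative, and the decision.mode attribute name is an assumption about EngagementDecision):

```python
decision = await self.should_request_input(
    intent.id,
    confidence=0.4,
    risk=0.7,
    reversibility=0.2,
)
if decision.mode in ("request_input", "require_input"):
    # Low confidence and high risk: ask the operator before acting.
    answer = await self.request_input(intent.id, "Proceed with the change?")
```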

stop #

stop() -> None

Stop the agent and disconnect MCP servers.

temp_access #

temp_access(intent_id: str, principal_id: str, permission: str = 'write', reason: Optional[str] = None) -> _TempAccessContext

Context manager for temporary access grants with automatic revocation.

Example
async with self.temp_access(intent.id, "helper-agent", "write"):
    await self.client.assign_agent(intent.id, "helper-agent")
# Access automatically revoked

AgentConfig#

AgentConfig dataclass #

AgentConfig(base_url: str = 'http://localhost:5000', api_key: str = '', auto_subscribe: bool = True, auto_complete: bool = True, reconnect_delay: float = 5.0, max_reconnects: int = 10, log_level: int = logging.INFO, capabilities: list[str] = list(), auto_request_access: bool = False, auto_heartbeat: bool = True, heartbeat_interval: float = 30.0, drain_timeout: float = 60.0, memory: Optional[str] = None, memory_namespace: Optional[str] = None, tools: list = list(), identity_key_path: Optional[str] = None, auto_sign: bool = False, auto_register_identity: bool = False, verify_incoming: bool = False)

Configuration for an Agent.
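All fields have defaults, so only overrides need to be passed; a sketch (the URL and environment variable name are illustrative):

```python
import os
from openintent import Agent, AgentConfig

config = AgentConfig(
    base_url="https://intents.example.com",
    api_key=os.environ["OPENINTENT_API_KEY"],
    heartbeat_interval=15.0,
    memory="episodic",
    auto_complete=False,  # handlers call complete_intent() explicitly
)

@Agent("prod-bot", config=config)
class ProdBot:
    ...
```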