Agents API Reference#

High-level agent abstractions for building OpenIntent agents.

Preferred pattern: LLM-Powered Agents

Adding model= to @Agent or @Coordinator is the recommended way to build agents. See the LLM-Powered Agents guide for details.

Agent Decorator#

Agent #

Agent(
    agent_id: str,
    config: Optional[AgentConfig] = None,
    capabilities: Optional[list[str]] = None,
    memory: Optional[str] = None,
    tools: Optional[list] = None,
    auto_heartbeat: bool = True,
    model: Optional[str] = None,
    provider: Optional[str] = None,
    system_prompt: Optional[str] = None,
    temperature: float = 0.7,
    max_tokens: int = 4096,
    max_tool_rounds: int = 10,
    planning: bool = False,
    stream_by_default: bool = False,
    **kwargs: Any
) -> Callable[[type], type]

Class decorator to create an Agent from a class.

When model is provided, the agent becomes LLM-powered: self.think(prompt) runs an agentic loop that reasons, calls protocol tools (memory, escalation, clarification), and returns a result. self.think_stream(prompt) does the same but yields tokens.

Tools can be plain strings (resolved via RFC-0014 protocol grants) or Tool objects with rich descriptions, parameter schemas, and local callable handlers.

Example — manual agent (no model):

from openintent import Agent, on_assignment

@Agent("research-bot")
class ResearchAgent:
    @on_assignment
    async def work(self, intent):
        return {"result": "done"}

Example — LLM-powered agent with Tool objects:

from openintent import Agent, Tool, tool, on_assignment

@tool(description="Search the web.", parameters={
    "type": "object",
    "properties": {
        "query": {"type": "string", "description": "Search query."},
    },
    "required": ["query"],
})
async def web_search(query: str) -> dict:
    return {"results": [...]}

@Agent("analyst", model="gpt-5.2", tools=[web_search])
class Analyst:
    @on_assignment
    async def work(self, intent):
        return await self.think(intent.description)
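
To make the decorator mechanics concrete, the following is a minimal, self-contained sketch of how a class decorator can index methods tagged by a lifecycle decorator. It is not OpenIntent's implementation: `sketch_agent`, `sketch_on_assignment`, `dispatch`, and the `_handlers` attribute are hypothetical names used only for illustration, and the intent is modelled as a plain dict.

```python
import asyncio
from typing import Callable


def sketch_on_assignment(func: Callable) -> Callable:
    """Tag a method as the assignment handler (hypothetical stand-in)."""
    func._handler_kind = "assignment"
    return func


def sketch_agent(agent_id: str) -> Callable[[type], type]:
    """Class decorator factory: store the ID and index the tagged methods."""

    def wrap(cls: type) -> type:
        handlers: dict[str, str] = {}
        # Scan the class body for functions tagged by a lifecycle decorator.
        for name, attr in vars(cls).items():
            kind = getattr(attr, "_handler_kind", None)
            if kind is not None:
                handlers[kind] = name
        cls.agent_id = agent_id
        cls._handlers = handlers
        return cls

    return wrap


@sketch_agent("research-bot")
class ResearchAgent:
    @sketch_on_assignment
    async def work(self, intent: dict) -> dict:
        return {"result": f"done: {intent['title']}"}


async def dispatch(agent: object, kind: str, intent: dict) -> dict:
    """Look up the handler registered for `kind` and await it."""
    handler = getattr(agent, agent._handlers[kind])
    return await handler(intent)


result = asyncio.run(dispatch(ResearchAgent(), "assignment", {"title": "survey"}))
```

The decorated class stays an ordinary class; the decorator only attaches metadata that a runtime could later use to route events.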

LLM Instance Properties (requires model=)#

When model= is set on @Agent, the class gains these methods:

| Method | Description |
| --- | --- |
| self.think(prompt) | Agentic tool loop: sends the prompt to the LLM, executes tool calls, returns the final text |
| self.think_stream(prompt) | Same agentic loop, but yields tokens as they arrive for real-time streaming |
| self.reset_conversation() | Clears the LLM conversation history to start fresh |
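
The agentic loop behind `self.think` can be pictured roughly as follows. This is a sketch under stated assumptions, not the library's code: the "model" is a scripted Python callable rather than a real LLM, the reply shape (`{"tool": ...}` or `{"text": ...}`) is hypothetical, and what the real loop does when `max_tool_rounds` is exhausted is not documented here, so the fallback string is an assumption.

```python
from typing import Any, Callable

# A "model" is any callable mapping the message history to either
# {"tool": name, "args": {...}} or {"text": final_answer} (hypothetical shape).
Model = Callable[[list[dict[str, Any]]], dict[str, Any]]


def think_loop(
    prompt: str,
    model: Model,
    tools: dict[str, Callable[..., Any]],
    max_tool_rounds: int = 10,
) -> str:
    """Run the model until it returns text or the round budget is exhausted."""
    messages: list[dict[str, Any]] = [{"role": "user", "content": prompt}]
    for _ in range(max_tool_rounds):
        reply = model(messages)
        if "text" in reply:
            return reply["text"]
        # The model asked for a tool: run it and feed the result back.
        output = tools[reply["tool"]](**reply["args"])
        messages.append({"role": "tool", "name": reply["tool"], "content": output})
    return "Stopped: tool round limit reached."  # assumed fallback


def scripted_model(messages: list[dict[str, Any]]) -> dict[str, Any]:
    """Stand-in for an LLM: call `add` once, then answer with its result."""
    tool_results = [m for m in messages if m["role"] == "tool"]
    if not tool_results:
        return {"tool": "add", "args": {"a": 2, "b": 3}}
    return {"text": f"The sum is {tool_results[-1]['content']}."}


answer = think_loop("What is 2 + 3?", scripted_model, {"add": lambda a, b: a + b})
```

Bounding the loop by a round count, as `max_tool_rounds` suggests, keeps a model that never stops calling tools from running indefinitely.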

Worker#

Worker #

Worker(
    agent_id: str,
    handler: Callable[[Intent], Any],
    base_url: str = "http://localhost:5000",
    api_key: str = "",
)

Ultra-minimal worker for simple, single-purpose agents.

Example
async def process(intent):
    return {"result": do_work(intent.title)}

worker = Worker("processor", process)
worker.run()

run #

run() -> None

Run the worker.

stop #

stop() -> None

Stop the worker.

Coordinator#

Coordinator #

Coordinator(
    coordinator_id: str,
    agents: Optional[list[str]] = None,
    strategy: str = "sequential",
    guardrails: Optional[list[str]] = None,
    config: Optional[AgentConfig] = None,
    capabilities: Optional[list[str]] = None,
    memory: Optional[str] = None,
    tools: Optional[list] = None,
    auto_heartbeat: bool = True,
    model: Optional[str] = None,
    provider: Optional[str] = None,
    system_prompt: Optional[str] = None,
    temperature: float = 0.7,
    max_tokens: int = 4096,
    max_tool_rounds: int = 10,
    planning: bool = True,
    stream_by_default: bool = False,
    **kwargs: Any
) -> Callable[[type], type]

Class decorator to create a Coordinator from a class.

A Coordinator manages portfolios of intents and handles dependency tracking, multi-intent orchestration, and governance.

When model is provided, the coordinator becomes LLM-powered: self.think(prompt) reasons about delegation, planning, and governance decisions. The LLM can call coordinator-specific tools like delegate, create_plan, and record_decision.

Example — manual coordinator:

@Coordinator("orchestrator", agents=["researcher", "writer"])
class MyCoordinator:
    @on_assignment
    async def plan(self, intent):
        spec = PortfolioSpec(
            name=intent.title,
            intents=[
                IntentSpec("Research", assign="researcher"),
                IntentSpec("Write", assign="writer", depends_on=["Research"]),
            ]
        )
        return await self.execute(spec)

Example — LLM-powered coordinator:

@Coordinator(
    "lead",
    model="claude-sonnet-4-20250514",
    agents=["researcher", "writer", "reviewer"],
    memory="episodic",
)
class ProjectLead:
    @on_assignment
    async def plan(self, intent):
        return await self.think(
            f"Break down this project and delegate to your team: {intent.description}"
        )

Built-in Coordinator Guardrails#

The guardrails= parameter on @Coordinator accepts these built-in policies:

| Policy | Description |
| --- | --- |
| "require_approval" | Logs decision records before assignment |
| "budget_limit" | Rejects intents exceeding cost constraints |
| "agent_allowlist" | Rejects delegation to agents outside the managed list |

Protocol Decorators#

Plan (RFC-0012)#

Plan #

Plan(
    name: str,
    strategy: str = "sequential",
    max_concurrent: int = 5,
    failure_policy: str = "fail_fast",
) -> Callable[[type], type]

Declarative plan definition (RFC-0012). Defines task decomposition strategy.

Vault (RFC-0014)#

Vault #

Vault(
    name: str, rotate_keys: bool = False
) -> Callable[[type], type]

Declarative credential vault (RFC-0014). Defines tool access and credential policies.

Memory (RFC-0015)#

Memory #

Memory(
    namespace: str,
    tier: str = "episodic",
    ttl: Optional[int] = None,
    max_entries: int = 1000,
) -> Callable[[type], type]

Declarative memory configuration (RFC-0015). Defines memory tier and policies.

Trigger (RFC-0017)#

Trigger #

Trigger(
    name: str,
    type: str = "schedule",
    condition: Optional[str] = None,
    cron: Optional[str] = None,
    dedup: str = "skip",
) -> Callable[[type], type]

Declarative trigger definition (RFC-0017). Creates intents when conditions are met.

Lifecycle Decorators#

on_assignment#

on_assignment #

on_assignment(func: Callable) -> Callable

Decorator: Called when the agent is assigned to an intent.

The handler receives the intent and should return state updates. Return values are automatically patched to the intent's state.

Example
@on_assignment
async def handle(self, intent):
    result = await do_work(intent.title)
    return {"result": result}  # Auto-patched to state

on_complete#

on_complete #

on_complete(func: Callable) -> Callable

Decorator: Called when an intent completes.

Useful for cleanup, notification, or triggering follow-up work.

Example
@on_complete
async def cleanup(self, intent):
    await notify_completion(intent.id)

on_state_change#

on_state_change #

on_state_change(
    keys: Optional[list[str]] = None,
) -> Callable

Decorator: Called when intent state changes.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| keys | Optional[list[str]] | Optional list of state keys to watch. If None, triggers on any change. | None |

Example
@on_state_change(["progress"])
async def track_progress(self, intent, old_state, new_state):
    print(f"Progress: {new_state.get('progress')}")
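
The `keys` filter can be thought of as a set intersection between the watched keys and the keys that actually changed. The sketch below illustrates that idea with plain dicts; `changed_keys` and `should_fire` are hypothetical helpers, and how the library itself diffs state (for example, whether removed keys count as changes) is an assumption.

```python
from typing import Any, Optional

_MISSING = object()  # sentinel so that "absent" differs from a stored None


def changed_keys(old_state: dict[str, Any], new_state: dict[str, Any]) -> set[str]:
    """Keys that were added, removed, or given a different value."""
    return {
        key
        for key in old_state.keys() | new_state.keys()
        if old_state.get(key, _MISSING) != new_state.get(key, _MISSING)
    }


def should_fire(
    keys: Optional[list[str]],
    old_state: dict[str, Any],
    new_state: dict[str, Any],
) -> bool:
    """Decide whether a handler watching `keys` should be called."""
    changed = changed_keys(old_state, new_state)
    if keys is None:
        return bool(changed)  # no filter: any change triggers
    return bool(changed & set(keys))  # filter: only watched keys trigger


before = {"progress": 0.1, "note": "a"}
fires_on_progress = should_fire(["progress"], before, {"progress": 0.5, "note": "a"})
fires_on_note_only = should_fire(["progress"], before, {"progress": 0.1, "note": "b"})
```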

on_event#

on_event #

on_event(event_type: Union[str, EventType]) -> Callable

Decorator: Called when a specific event type occurs.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| event_type | Union[str, EventType] | The event type to handle. | required |

Example
@on_event(EventType.ARBITRATION_REQUESTED)
async def handle_arbitration(self, intent, event):
    await self.escalate(intent.id, "Arbitration requested")

on_lease_available#

on_lease_available #

on_lease_available(scope: str) -> Callable

Decorator: Called when a lease becomes available for a specific scope.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| scope | str | The scope to watch for lease availability. | required |

Example
@on_lease_available("research")
async def claim(self, intent, scope):
    async with self.lease(intent.id, scope):
        await do_exclusive_work()

on_all_complete#

on_all_complete #

on_all_complete(func: Callable) -> Callable

Decorator: Called when all intents in a portfolio complete.

Only applicable to Coordinator agents managing portfolios.

Example
@on_all_complete
async def finalize(self, portfolio):
    return merge_results(portfolio.intents)

on_access_requested (RFC-0011)#

on_access_requested #

on_access_requested(func: Callable) -> Callable

Decorator: Called when another principal requests access to an intent this agent administers. Return "approve", "deny", or "defer".

Enables policy-as-code for automated access decisions.

Example
@on_access_requested
async def policy(self, intent, request):
    if "ocr" in request.capabilities:
        return "approve"
    return "defer"

on_task (RFC-0012)#

on_task #

on_task(status: Optional[str] = None) -> Callable

Decorator: Called when a task lifecycle event occurs.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| status | Optional[str] | Optional task status filter (e.g., "completed", "failed"). If None, triggers on any task event. | None |

on_trigger (RFC-0017)#

on_trigger #

on_trigger(name: Optional[str] = None) -> Callable

Decorator: Called when a trigger fires and creates an intent for this agent.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | Optional[str] | Optional trigger name filter. If None, handles any trigger. | None |

on_drain (RFC-0016)#

on_drain #

on_drain(func: Callable) -> Callable

Decorator: Called when the agent receives a drain signal.

The handler should finish in-progress work and prepare for shutdown. The agent will stop accepting new assignments after this is called.

on_handoff (RFC-0013)#

Fires when an agent receives work delegated from another agent. The handler receives the intent and the delegating agent's ID.

on_handoff #

on_handoff(func: Callable) -> Callable

Decorator: Called when this agent receives work delegated from another agent.

Unlike @on_assignment, which fires for all assignments, @on_handoff fires only when the assignment includes delegation context (delegated_by is set). The handler receives the intent and the delegating agent's ID.

Example
@on_handoff
async def received(self, intent, from_agent):
    previous = await self.memory.recall(key=f"handoff:{intent.id}")
    return {"status": "continuing", "from": from_agent}

on_retry (RFC-0010)#

Fires when an intent is reassigned after a previous failure. The handler receives the intent, attempt number, and last error.

on_retry #

on_retry(func: Callable) -> Callable

Decorator: Called when an intent is reassigned after a previous failure.

The handler receives the intent and retry metadata (attempt number, previous failure reason). Allows agents to adapt behaviour on retries (e.g. use a different strategy, reduce scope, or escalate).

Example
@on_retry
async def handle_retry(self, intent, attempt, last_error):
    if attempt >= 3:
        await self.escalate(intent.id, "Too many retries")
        return
    return await self.think(f"Retry attempt {attempt}: {intent.title}")

Guardrail Decorators#

input_guardrail#

input_guardrail #

input_guardrail(func: Callable) -> Callable

Decorator: Validates or transforms intent data before assignment handlers run.

Input guardrails execute in registration order before any @on_assignment handler. If a guardrail raises GuardrailError (or returns False), the assignment is rejected and the intent can be escalated.

Example
@input_guardrail
async def check_scope(self, intent):
    if len(intent.description) > 10_000:
        raise GuardrailError("Input too long")

output_guardrail#

output_guardrail #

output_guardrail(func: Callable) -> Callable

Decorator: Validates or transforms handler results before they are committed.

Output guardrails execute in registration order after @on_assignment handlers return. The guardrail receives the intent and the result dict. If it raises GuardrailError (or returns False), the result is discarded and the intent can be escalated.

Example
@output_guardrail
async def check_pii(self, intent, result):
    for key, val in result.items():
        if contains_pii(str(val)):
            raise GuardrailError(f"PII detected in '{key}'")
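
The ordering described for input and output guardrails can be sketched as a small pipeline: input checks, then the handler, then output checks, each in registration order. This is an illustrative model only. `run_with_guardrails` and `outcome` are hypothetical names, `GuardrailError` is redefined locally so the block is self-contained, and treating a `False` return as a rejection follows the description above.

```python
import asyncio


class GuardrailError(Exception):
    """Local stand-in for the library's GuardrailError."""


async def run_with_guardrails(intent, handler, input_checks, output_checks):
    """Run input checks, the handler, then output checks, in list order."""
    for check in input_checks:
        if await check(intent) is False:
            raise GuardrailError(f"{check.__name__} returned False")
    result = await handler(intent)
    for check in output_checks:
        if await check(intent, result) is False:
            raise GuardrailError(f"{check.__name__} returned False")
    return result


async def check_scope(intent: dict) -> None:
    # Rejects by raising; returning None means "pass".
    if len(intent["description"]) > 20:
        raise GuardrailError("Input too long")


async def no_secrets(intent: dict, result: dict) -> bool:
    # Rejects by returning False.
    return "secret" not in str(result).lower()


async def handler(intent: dict) -> dict:
    return {"summary": intent["description"].upper()}


def outcome(description: str):
    """Run the pipeline; return the result, or the rejection message."""
    try:
        return asyncio.run(
            run_with_guardrails(
                {"description": description}, handler, [check_scope], [no_secrets]
            )
        )
    except GuardrailError as exc:
        return f"rejected: {exc}"
```

In this sketch the handler never runs when an input check rejects, and a rejected result is never returned to the caller.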

GuardrailError#

GuardrailError #

Bases: Exception

Raised by input/output guardrails to reject processing.

Coordinator Lifecycle Decorators#

on_conflict#

on_conflict #

on_conflict(func: Callable) -> Callable

Decorator: Called when version conflicts occur between agents (RFC-0002). Handler receives (self, intent, conflict) with conflict details.

on_escalation#

on_escalation #

on_escalation(func: Callable) -> Callable

Decorator: Called when an agent requests coordinator intervention. Handler receives (self, intent, agent_id, reason).

on_quorum#

on_quorum #

on_quorum(threshold: float = 0.5) -> Callable

Decorator: Called when multi-agent voting reaches a threshold. The threshold argument is the fraction of agents needed (0.0 to 1.0). The handler receives (self, intent, votes).
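
As a rough illustration of a fractional threshold, the sketch below checks whether approving votes make up at least `threshold` of all participating agents. The function name, the shape of `votes` (agent ID mapped to a boolean), and the inclusive comparison are all assumptions; the reference above only states that the threshold is a fraction between 0.0 and 1.0.

```python
def quorum_reached(
    votes: dict[str, bool], total_agents: int, threshold: float = 0.5
) -> bool:
    """True when approvals / total_agents meets the threshold (inclusive)."""
    if not 0.0 <= threshold <= 1.0:
        raise ValueError("threshold must be between 0.0 and 1.0")
    if total_agents <= 0:
        return False
    approvals = sum(1 for approved in votes.values() if approved)
    return approvals / total_agents >= threshold


votes = {"researcher": True, "writer": True, "reviewer": False}
half = quorum_reached(votes, total_agents=4, threshold=0.5)
three_quarters = quorum_reached(votes, total_agents=4, threshold=0.75)
```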

Tool Definitions#

ToolDef#

ToolDef(name, description, parameters, handler) — rich tool definition for LLM function calling with local execution. Pass ToolDef objects in the tools= parameter on @Agent or @Coordinator.

| Field | Type | Description |
| --- | --- | --- |
| name | str | Tool name (used in function calling) |
| description | str | What the tool does (shown to the LLM) |
| parameters | dict | JSON Schema for tool arguments |
| handler | callable | Local function called when the LLM invokes the tool |

@define_tool#

@define_tool(description=, parameters=) — decorator that turns a function into a ToolDef object.

from openintent import define_tool

@define_tool(description="Search the web.", parameters={
    "type": "object",
    "properties": {"query": {"type": "string"}},
    "required": ["query"],
})
async def web_search(query: str) -> dict:
    return {"results": await fetch_results(query)}

Backwards compatibility

Tool is an alias for ToolDef, and @tool is an alias for @define_tool. The old names are kept for backwards compatibility.

Tool Execution Priority#

  1. Protocol tools (remember, recall, clarify, escalate, update_status) — always first
  2. Local handlers (ToolDef objects) — executed in-process
  3. Remote protocol grants (string names via RFC-0014) — resolved via server proxy
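
The priority order above amounts to a three-tier lookup. The following sketch models it with plain dictionaries and a fake server call; `invoke_tool`, `fake_server`, and the handler names are hypothetical, and whether a local tool can actually share a name with a protocol tool in the library is an assumption made here only to show the precedence.

```python
import asyncio
from typing import Any, Awaitable, Callable

Handler = Callable[[dict[str, Any]], Awaitable[Any]]


async def invoke_tool(
    name: str,
    args: dict[str, Any],
    protocol_tools: dict[str, Handler],
    local_tools: dict[str, Handler],
    remote_invoke: Callable[[str, dict[str, Any]], Awaitable[Any]],
) -> tuple[str, Any]:
    """Return (source, result), trying each tier in priority order."""
    if name in protocol_tools:  # 1. protocol tools always win
        return "protocol", await protocol_tools[name](args)
    if name in local_tools:  # 2. in-process handlers
        return "local", await local_tools[name](args)
    # 3. anything else is treated as a remote grant
    return "remote", await remote_invoke(name, args)


async def remember(args: dict[str, Any]) -> str:
    return f"stored {args['key']}"


async def local_search(args: dict[str, Any]) -> str:
    return f"local results for {args['query']}"


async def shadowed_remember(args: dict[str, Any]) -> str:
    return "never reached"


async def fake_server(name: str, args: dict[str, Any]) -> str:
    return f"server ran {name}"


PROTOCOL = {"remember": remember}
LOCAL = {"web_search": local_search, "remember": shadowed_remember}


def run(name: str, args: dict[str, Any]) -> tuple[str, Any]:
    return asyncio.run(invoke_tool(name, args, PROTOCOL, LOCAL, fake_server))
```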

Proxy Classes#

_ToolsProxy#

The _ToolsProxy class provides self.tools on agents. For string tool names (RFC-0014 grants), it delegates to client.invoke_tool() for server-side invocation. For ToolDef objects, it executes the local handler directly.

# Server-side invocation (string tool name → server proxy)
result = await self.tools.invoke("web_search", {"query": "..."})

# Local invocation (ToolDef handler)
result = await self.tools.invoke(my_tooldef, {"param": "value"})

_MemoryProxy#

self.memory proxy for RFC-0015 agent memory operations.

_TasksProxy#

self.tasks proxy for RFC-0012 task creation and management.

Internal Classes#

BaseAgent#

BaseAgent #

BaseAgent(
    base_url: Optional[str] = None,
    api_key: Optional[str] = None,
    config: Optional[AgentConfig] = None,
)

Bases: ABC

Base class for OpenIntent agents.

Provides automatic subscription management, event routing, and lifecycle handling.

agent_id property #

agent_id: str

The agent's unique identifier.

async_client property #

async_client: AsyncOpenIntentClient

Get or create the asynchronous client.

client property #

client: OpenIntentClient

Get or create the synchronous client.

identity_config property #

identity_config: dict

Get identity configuration (RFC-0018).

memory property #

memory: _MemoryProxy

Access agent memory (RFC-0015). Configure via @Agent(memory="episodic").

tasks property #

tasks: _TasksProxy

Access task operations (RFC-0012).

tools property #

tools: _ToolsProxy

Access tool invocation (RFC-0014). Configure via @Agent(tools=["web_search"]).

complete_intent async #

complete_intent(
    intent_id: str,
    final_state: Optional[dict[str, Any]] = None,
) -> None

Mark an intent as completed with optional final state.

delegate async #

delegate(
    intent_id: str,
    target_agent_id: str,
    payload: Optional[dict[str, Any]] = None,
) -> None

Delegate work on an intent to another agent.

The target agent receives the assignment with intent.ctx.delegated_by set.

escalate async #

escalate(
    intent_id: str,
    reason: str,
    data: Optional[dict[str, Any]] = None,
) -> None

Escalate an intent to administrators for review.

Creates an arbitration request through the governance pipeline.

grant_access async #

grant_access(
    intent_id: str,
    principal_id: str,
    permission: str = "write",
    reason: Optional[str] = None,
) -> ACLEntry

Grant access to another principal on an intent.

lease #

lease(
    intent_id: str, scope: str, duration_seconds: int = 300
)

Acquire a lease as a context manager.

Example
async with self.lease(intent.id, "research"):
    await do_exclusive_work()
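
The exclusivity a lease provides can be illustrated with an in-process lock per `(intent_id, scope)` pair. This is only an analogy: real leases are presumably coordinated by the server and expire after `duration_seconds`, which this sketch ignores. `LeaseTable` is a hypothetical class.

```python
import asyncio
from contextlib import asynccontextmanager


class LeaseTable:
    """In-memory stand-in: one asyncio.Lock per (intent_id, scope)."""

    def __init__(self) -> None:
        self._locks: dict[tuple[str, str], asyncio.Lock] = {}

    @asynccontextmanager
    async def lease(self, intent_id: str, scope: str):
        lock = self._locks.setdefault((intent_id, scope), asyncio.Lock())
        async with lock:  # released even if the body raises
            yield


async def demo() -> list[str]:
    table = LeaseTable()
    log: list[str] = []

    async def worker(name: str) -> None:
        async with table.lease("intent-1", "research"):
            log.append(f"{name} start")
            await asyncio.sleep(0.01)  # simulated exclusive work
            log.append(f"{name} end")

    # Both workers want the same scope, so they run one after the other.
    await asyncio.gather(worker("a"), worker("b"))
    return log
```

Because both workers contend for the same scope, the second one starts only after the first has released the lease.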

log async #

log(intent_id: str, message: str, **data: Any) -> None

Log a comment event to an intent.

patch_state async #

patch_state(
    intent_id: str, updates: dict[str, Any]
) -> None

Patch intent state with updates.

Automatically handles version tracking.
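
One common way to implement version tracking is optimistic concurrency: each patch carries the version it was based on and is rejected if the record has moved on. The sketch below shows that general technique; it is an assumption about how such tracking could work, not a description of OpenIntent's internals, and `IntentRecord`, `VersionConflict`, and the standalone `patch_state` function are hypothetical.

```python
from dataclasses import dataclass, field
from typing import Any


class VersionConflict(Exception):
    """Raised when a patch was based on a stale version."""


@dataclass
class IntentRecord:
    state: dict[str, Any] = field(default_factory=dict)
    version: int = 0


def patch_state(
    record: IntentRecord, updates: dict[str, Any], expected_version: int
) -> int:
    """Merge `updates` into the state if the caller's version is current."""
    if expected_version != record.version:
        raise VersionConflict(
            f"expected version {expected_version}, found {record.version}"
        )
    record.state = {**record.state, **updates}  # shallow merge
    record.version += 1
    return record.version


record = IntentRecord(state={"progress": 0.2})
v1 = patch_state(record, {"result": "done"}, expected_version=0)
try:
    patch_state(record, {"progress": 1.0}, expected_version=0)  # stale version
    stale_rejected = False
except VersionConflict:
    stale_rejected = True
```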

revoke_access async #

revoke_access(intent_id: str, entry_id: str) -> None

Revoke access from a principal.

run #

run() -> None

Start the agent and begin processing events.

This method blocks until stop() is called.

stop #

stop() -> None

Stop the agent.

temp_access #

temp_access(
    intent_id: str,
    principal_id: str,
    permission: str = "write",
    reason: Optional[str] = None,
) -> _TempAccessContext

Context manager for temporary access grants with automatic revocation.

Example
async with self.temp_access(intent.id, "helper-agent", "write"):
    await self.client.assign_agent(intent.id, "helper-agent")
# Access automatically revoked
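
The "automatic revocation" guarantee is the usual grant-in-enter, revoke-in-exit pattern of a context manager. The sketch below demonstrates it with an in-memory access list; `AccessList` and this standalone `temp_access` function are hypothetical stand-ins, not the library's classes.

```python
import asyncio
from contextlib import asynccontextmanager


class AccessList:
    """In-memory stand-in for an intent's access control list."""

    def __init__(self) -> None:
        self.entries: dict[str, str] = {}

    async def grant(self, principal_id: str, permission: str) -> None:
        self.entries[principal_id] = permission

    async def revoke(self, principal_id: str) -> None:
        self.entries.pop(principal_id, None)


@asynccontextmanager
async def temp_access(acl: AccessList, principal_id: str, permission: str = "write"):
    await acl.grant(principal_id, permission)
    try:
        yield
    finally:
        # Runs on normal exit and when the body raises.
        await acl.revoke(principal_id)


async def demo() -> tuple[bool, bool]:
    acl = AccessList()
    during = False
    try:
        async with temp_access(acl, "helper-agent"):
            during = "helper-agent" in acl.entries
            raise RuntimeError("work failed")
    except RuntimeError:
        pass
    return during, "helper-agent" in acl.entries
```

Here `demo()` reports that access existed inside the block and was gone afterwards, even though the body raised.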

AgentConfig#

AgentConfig dataclass #

AgentConfig(
    base_url: str = "http://localhost:5000",
    api_key: str = "",
    auto_subscribe: bool = True,
    auto_complete: bool = True,
    reconnect_delay: float = 5.0,
    max_reconnects: int = 10,
    log_level: int = logging.INFO,
    capabilities: list[str] = list(),
    auto_request_access: bool = False,
    auto_heartbeat: bool = True,
    heartbeat_interval: float = 30.0,
    drain_timeout: float = 60.0,
    memory: Optional[str] = None,
    memory_namespace: Optional[str] = None,
    tools: list = list(),
    identity_key_path: Optional[str] = None,
    auto_sign: bool = False,
    auto_register_identity: bool = False,
    verify_incoming: bool = False,
)

Configuration for an Agent.
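
The `list()` defaults shown for `capabilities` and `tools` are most likely how the documentation generator renders `field(default_factory=list)`; that reading is an assumption. The sketch below uses a hypothetical `SketchConfig` with a handful of the fields above to show why a default factory matters for dataclasses: each instance gets its own list rather than sharing one.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class SketchConfig:
    """Hypothetical stand-in mirroring a few AgentConfig fields."""

    base_url: str = "http://localhost:5000"
    heartbeat_interval: float = 30.0
    memory: Optional[str] = None
    # A fresh list per instance; a bare [] default is rejected by dataclasses.
    capabilities: list[str] = field(default_factory=list)


a = SketchConfig()
b = SketchConfig(heartbeat_interval=10.0, memory="episodic")
a.capabilities.append("ocr")  # does not affect b
```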