Agent Abstractions#
The openintent.agents module provides high-level, decorator-first abstractions for building agents with minimal boilerplate. Decorators and class parameters express protocol semantics so the framework handles the heavy lifting.
Three Levels of Abstraction#
| Level | Class | Best for |
|---|---|---|
| Simple | `Worker` | Single-purpose agents with one handler |
| Standard | `@Agent` | Full-featured agents with event routing, memory, tools |
| Complex | `@Coordinator` | Multi-agent orchestration with governance |
```mermaid
graph TD
    W[Worker] -->|"Single handler"| R[Run]
    A["@Agent"] -->|"Lifecycle hooks<br>Memory, Tasks, Tools"| R
    C["@Coordinator"] -->|"Delegation<br>Governance, Decisions"| R
    style W fill:#f6f8fa,stroke:#e3e8ee,color:#0a2540
    style A fill:#635bff,stroke:#4b44d1,color:#fff
    style C fill:#00d4aa,stroke:#00b894,color:#0a2540
    style R fill:#f6f8fa,stroke:#e3e8ee,color:#0a2540
```
Worker (Simplest)#
For single-purpose agents that do one thing well:
```python
from openintent import Worker

async def process(intent):
    return {"result": do_work(intent.title)}

worker = Worker("processor", process)
worker.run()
```
When to use Worker
Use Worker when your agent has a single responsibility and doesn't need lifecycle hooks, memory, or tools. It's the fastest path from zero to a running agent.
@Agent Decorator (Recommended)#
Zero-boilerplate agent classes with auto-subscription, state auto-patching, and protocol-managed lifecycle:
```python
from openintent.agents import Agent, on_assignment, on_complete, on_state_change

@Agent(
    "research-agent",              # Agent ID (first positional argument)
    model: str = None,             # LLM model — enables self.think() (see LLM Agents guide)
    memory: str = "episodic",      # Memory tier: "working", "episodic", or "semantic"
    tools: list = None,            # ToolDef objects or RFC-0014 grant names
    capabilities: list = None,     # Declared capabilities for discovery & routing
    auto_heartbeat: bool = True,   # Automatic heartbeat registration (RFC-0016)
    heartbeat_interval: int = 30,  # Seconds between heartbeats
    drain_timeout: int = 60,       # Seconds to wait during graceful shutdown
    config: AgentConfig = None,    # Full config object (overrides individual args)
)
```
Preferred pattern: LLM-Powered Agents
Adding model= to @Agent is the recommended way to build agents. It gives the agent an LLM brain with an agentic tool loop (self.think()), streaming (self.think_stream()), and protocol-native tools — all with zero extra wiring. See the LLM-Powered Agents guide for full details.
@Agent("research-agent")
class ResearchAgent:
@on_assignment
async def handle_new_intent(self, intent):
"""Called when assigned to a new intent."""
return {"status": "researching"} # Auto-patches state
@on_state_change(keys=["data"])
async def on_data_ready(self, intent, old_state, new_state):
"""Called when 'data' key changes in state."""
analysis = analyze(new_state["data"])
return {"analysis": analysis}
@on_complete
async def handle_completion(self, intent):
"""Called when intent is completed."""
print(f"Intent {intent.id} completed!")
if __name__ == "__main__":
ResearchAgent.run()
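For comparison, here is a minimal sketch of the model-enabled variant described in the admonition above. The model string is the one used in the coordinator example later on this page, and the prompt and handler are illustrative only; see the LLM-Powered Agents guide for the canonical examples.

```python
from openintent.agents import Agent, on_assignment

@Agent("summarizer", model="claude-sonnet-4-20250514")
class Summarizer:
    @on_assignment
    async def summarize(self, intent):
        # self.think() runs the agentic tool loop and returns the final text
        summary = await self.think(f"Summarize this intent: {intent.description}")
        return {"summary": summary, "status": "summarized"}
```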
Agent with Memory and Tools#
The @Agent decorator accepts configuration that the framework manages automatically:
```python
from openintent.agents import Agent, on_assignment, on_task

@Agent("analyst",
    memory="episodic",             # RFC-0015: auto-configured memory tier
    tools=["web_search", "sql"],   # RFC-0014: scoped tool access
    capabilities=["nlp", "sql"],   # RFC-0016: registered capabilities
    auto_heartbeat=True,           # RFC-0016: automatic health pings
)
class AnalystAgent:
    @on_assignment
    async def research(self, intent):
        # Recall past findings from episodic memory
        past = await self.memory.recall(tags=["research"])
        findings = await do_research(intent.description, context=past)

        # Store findings for future recall
        await self.memory.store(
            key=f"research-{intent.id}",
            value=findings,
            tags=["research", intent.title],
        )
        return {"findings": findings, "status": "analyzed"}

    @on_task(status="completed")
    async def on_subtask_done(self, intent, task):
        """Called when a subtask completes."""
        return {"last_completed_task": task.title}
```
Lifecycle Decorators#
| Decorator | Trigger | RFC |
|---|---|---|
| `@on_assignment` | Agent assigned to intent | Core |
| `@on_complete` | Intent completed | Core |
| `@on_state_change(keys)` | State keys changed | Core |
| `@on_event(event_type)` | Specific event type | Core |
| `@on_lease_available(scope)` | Lease becomes available | 0003 |
| `@on_access_requested` | Access request received | 0011 |
| `@on_task(status)` | Task lifecycle event | 0012 |
| `@on_trigger(name)` | Trigger fires | 0017 |
| `@on_drain` | Graceful shutdown signal | 0016 |
| `@on_handoff` | Agent receives work delegated from another agent | 0013 |
| `@on_retry` | Intent reassigned after a previous failure | 0010 |
| `@on_all_complete` | All portfolio intents complete | 0007 |
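Most of these hooks follow the same pattern as `@on_assignment`. Here is a hedged sketch combining a few of them; the event type and the handler signatures for `@on_event` and `@on_drain` are assumptions, so check the API reference for the authoritative forms.

```python
from openintent.agents import Agent, on_assignment, on_event, on_drain

@Agent("lifecycle-demo")
class LifecycleDemo:
    @on_assignment
    async def start(self, intent):
        return {"status": "started"}

    # Hypothetical event type and handler signature
    @on_event("comment.created")
    async def on_comment(self, intent, event):
        return {"last_event": "comment.created"}

    # Assumed to be called once when a drain (graceful shutdown) is signalled;
    # finish or checkpoint in-flight work here (drain_timeout applies)
    @on_drain
    async def wind_down(self, intent):
        return {"status": "draining"}
```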
Guardrail Decorators#
| Decorator | Trigger |
|---|---|
| `@input_guardrail` | Validate intent data before assignment handlers run |
| `@output_guardrail` | Validate handler results before they are committed |
Raise GuardrailError to reject:
```python
from openintent.agents import Agent, on_assignment, input_guardrail, GuardrailError

@Agent("safe-agent")
class SafeAgent:
    @input_guardrail
    async def validate(self, intent):
        if intent.state.get("risk_score", 0) > 0.9:
            raise GuardrailError("Risk score too high")

    @on_assignment
    async def work(self, intent):
        return {"status": "processed"}
```
```mermaid
stateDiagram-v2
    [*] --> Assigned: on_assignment
    Assigned --> Working: Process intent
    Working --> StateChanged: on_state_change
    StateChanged --> Working: Continue
    Working --> TaskDone: on_task("completed")
    TaskDone --> Working: More tasks
    Working --> Draining: on_drain
    Working --> Completed: on_complete
    Draining --> Completed: Finish work
    Completed --> [*]
```
Memory Access (RFC-0015)#
Agents configured with memory= get a natural self.memory proxy:
@Agent("note-taker", memory="episodic")
class NoteTaker:
@on_assignment
async def work(self, intent):
# Store structured data with tags
await self.memory.store("key", {"data": "value"}, tags=["notes"])
# Recall by tags
results = await self.memory.recall(tags=["notes"])
# Pin important memories to prevent LRU eviction
await self.memory.pin("key")
Three memory tiers
- Working — task-scoped, auto-archived on completion
- Episodic — agent-scoped, LRU eviction, supports pinning
- Semantic — shared across agents, namespace-level permissions
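As a sketch of the semantic tier, two agents can share findings through the same `store`/`recall` calls shown above; whether one agent can actually read another's entries is governed by namespace-level permissions, which are configured outside these classes.

```python
from openintent.agents import Agent, on_assignment

@Agent("glossary-writer", memory="semantic")
class GlossaryWriter:
    @on_assignment
    async def work(self, intent):
        # Entries in the semantic tier are visible to other agents in the namespace
        await self.memory.store(
            key="term-definitions",
            value={"intent": "a shared unit of work"},
            tags=["glossary"],
        )
        return {"status": "stored"}

@Agent("glossary-reader", memory="semantic")
class GlossaryReader:
    @on_assignment
    async def work(self, intent):
        definitions = await self.memory.recall(tags=["glossary"])
        return {"definitions": definitions}
```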
Task Decomposition (RFC-0012)#
Create and manage subtasks from within agent handlers:
@Agent("planner", memory="working")
class PlannerAgent:
@on_assignment
async def plan(self, intent):
await self.tasks.create(
title="Research phase",
parent_intent_id=intent.id,
assign_to="researcher"
)
await self.tasks.create(
title="Analysis phase",
parent_intent_id=intent.id,
depends_on=["research-phase"],
assign_to="analyst"
)
return {"status": "planning", "tasks_created": 2}
Tool Access (RFC-0014)#
Agents configured with tools= get scoped tool access via self.tools:
@Agent("data-agent", tools=["web_search", "sql_query"])
class DataAgent:
@on_assignment
async def work(self, intent):
results = await self.tools.invoke("web_search", query=intent.description)
return {"search_results": results}
Tool scoping
Tool grants are scoped per-agent. An agent can only invoke tools it has been explicitly granted access to. Grants support expiry, rate limits, and cascading revocation.
Protocol Decorators#
First-class declarative configuration for protocol features, imported from openintent.agents. These decorators let you:

- Declare a task decomposition strategy
- Declare credential vault requirements
- Declare memory tier configuration
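The concrete decorator names and arguments are documented in the API reference. The sketch below only illustrates where such declarations sit on an agent class; the names in the comments are hypothetical placeholders, not the real API.

```python
from openintent.agents import Agent, on_assignment

@Agent("configured-agent")
# Hypothetical placeholders for the protocol decorators described above:
# @decompose(strategy="sequential")             # task decomposition strategy (RFC-0012)
# @requires_vault(credentials=["api_key"])      # credential vault requirements
# @memory_policy(tier="episodic")               # memory tier configuration (RFC-0015)
class ConfiguredAgent:
    @on_assignment
    async def work(self, intent):
        return {"status": "configured"}
```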
@Coordinator Decorator#
Multi-agent orchestration with governance features:
```python
@Coordinator(
    coordinator_id: str,           # Unique identifier for this coordinator
    model: str = None,             # LLM model — enables self.think() for planning & delegation
    agents: list = None,           # Agent IDs managed by this coordinator
    strategy: str = "sequential",  # "sequential", "parallel", "adaptive"
    guardrails: list = None,       # Guardrail rules applied to all delegated work
    memory: str = "episodic",      # Memory tier: "working", "episodic", or "semantic"
    tools: list = None,            # ToolDef objects or RFC-0014 grant names
    capabilities: list = None,     # Declared capabilities for discovery
    auto_heartbeat: bool = True,   # Automatic heartbeat registration
    config: AgentConfig = None,    # Full config object (overrides individual args)
)
```
Built-in Coordinator Guardrails#
The guardrails= parameter on @Coordinator accepts these built-in policies:
| Policy | Description |
|---|---|
"require_approval" |
Logs decision records before assignment |
"budget_limit" |
Rejects intents exceeding cost constraints |
"agent_allowlist" |
Rejects delegation to agents outside the managed list |
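A minimal sketch combining the three built-in policies; the agent IDs and handler logic are placeholders.

```python
from openintent.agents import Coordinator, on_assignment

@Coordinator(
    "governed-lead",
    agents=["agent-a", "agent-b"],
    guardrails=["require_approval", "budget_limit", "agent_allowlist"],
)
class GovernedLead:
    @on_assignment
    async def plan(self, intent):
        # "agent_allowlist" rejects delegation outside agents=[...], and
        # "require_approval" records a decision before the assignment lands
        await self.delegate(intent.title, agents=["agent-a"])
        return {"status": "delegated"}
```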
LLM-Powered Coordinator#
Add model= and the coordinator can autonomously plan, delegate, and make decisions:
```python
from openintent import Coordinator, on_assignment

@Coordinator(
    "project-lead",
    model="claude-sonnet-4-20250514",
    agents=["researcher", "writer", "reviewer"],
    memory="episodic",
    guardrails=["require_approval"],
)
class ProjectLead:
    @on_assignment
    async def plan(self, intent):
        return await self.think(
            f"Break down this project and delegate to your team: "
            f"{intent.description}"
        )
```
See LLM-Powered Agents for full coordinator LLM details.
Manual Coordinator Example#
```python
from openintent.agents import (
    Coordinator, on_conflict, on_escalation, on_quorum
)

@Coordinator("team-lead",
    agents=["agent-a", "agent-b"],
    strategy="parallel",
    guardrails=["require_approval"],
)
class TeamCoordinator:
    @on_conflict
    async def handle_conflict(self, intent, conflict):
        """Called on version conflicts."""
        await self.record_decision(
            decision_type="conflict_resolution",
            summary=f"Resolved conflict on {intent.id}",
            rationale="Latest write wins",
        )

    @on_escalation
    async def handle_escalation(self, intent, source_agent):
        """Called when an agent escalates."""
        await self.delegate(intent.title, agents=["senior-agent"])

    @on_quorum(threshold=0.6)
    async def on_vote_reached(self, intent, votes):
        """Called when 60% of agents agree."""
        await self.record_decision(
            decision_type="quorum",
            summary="Consensus reached",
            rationale=f"{len(votes)} votes in favor",
        )
```
Coordinator Lifecycle Decorators#
| Decorator | Trigger |
|---|---|
| `@on_conflict` | Version conflict detected |
| `@on_escalation` | Agent escalation received |
| `@on_quorum(threshold)` | Voting threshold met |
Coordinator Methods#
| Method | Description |
|---|---|
| `self.think(prompt)` | Agentic tool loop — sends prompt to LLM, executes tool calls, returns final text (requires `model=`) |
| `self.think_stream(prompt)` | Same agentic loop but yields tokens as they arrive (requires `model=`) |
| `self.delegate(title, agents)` | Delegate work to agents |
| `self.record_decision(...)` | Record governance decision |
| `self.decisions` | Access decision audit log |
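As a sketch of streaming: `self.think_stream()` is documented as yielding tokens as they arrive, so it can be consumed with `async for`. The model string, prompt, and token handling below are illustrative only.

```python
from openintent.agents import Coordinator, on_assignment

@Coordinator("streaming-lead", model="claude-sonnet-4-20250514")
class StreamingLead:
    @on_assignment
    async def plan(self, intent):
        chunks = []
        async for token in self.think_stream(f"Plan the work for: {intent.title}"):
            chunks.append(token)  # e.g. forward each token to a UI as it arrives
        return {"plan": "".join(chunks)}
```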