# Triggers & Reactive Scheduling
Triggers are standing declarations that automatically create intents when their conditions are met. There are three trigger types: schedule (cron), event (protocol-reactive), and webhook (external HTTP).
## Schedule Trigger (Cron)
from openintent import OpenIntentClient

# Client through which the trigger is registered.
client = OpenIntentClient(
    base_url="http://localhost:8000",
    agent_id="scheduler",
)

# Register a schedule-type trigger driven by a cron expression.
client.triggers.create(
    name="daily_report",
    type="schedule",
    schedule="0 9 * * *",  # cron expression: every day at 9 AM
    intent_template={
        "title": "Daily Status Report",
        "assign": "reporter",
        "initial_state": {"report_type": "daily"},
    },
)
## Event Trigger (Protocol-Reactive)
React to protocol events and create new intents automatically:
# Event-type trigger registered for the "intent_completed" event, with a
# namespace filter; the template fields use {{ ... }} placeholders.
client.triggers.create(
    name="post_completion_review",
    type="event",
    event_type="intent_completed",
    filter={"namespace": "production"},
    intent_template={
        "title": "Review: {{ source_intent.title }}",
        "assign": "reviewer",
        "initial_state": {
            "source_intent_id": "{{ source_intent.id }}",
            "completed_at": "{{ event.timestamp }}",
        },
    },
)
# Event-type trigger registered for agent status changes, filtered on the
# new status being "dead"; the resulting alert intent is assigned to "ops-agent".
client.triggers.create(
    name="agent_failure_alert",
    type="event",
    event_type="agent_status_changed",
    filter={"new_status": "dead"},
    intent_template={
        "title": "Alert: Agent {{ agent.id }} is dead",
        "assign": "ops-agent",
        "initial_state": {
            "failed_agent": "{{ agent.id }}",
            "last_heartbeat": "{{ agent.last_heartbeat }}",
        },
    },
)
## Webhook Trigger (External HTTP)
Accept external events via HTTP and create intents:
# Register a webhook-type trigger; payload_transform describes the intent
# using {{ payload.* }} placeholders.
webhook = client.triggers.create(
    name="github_push",
    type="webhook",
    payload_transform={
        "title": "Build: {{ payload.repository.name }}",
        "assign": "ci-agent",
        "initial_state": {
            "repo": "{{ payload.repository.full_name }}",
            "branch": "{{ payload.ref }}",
            "commit": "{{ payload.head_commit.id }}",
        },
    },
)

# Show the trigger's URL so it can be entered in the GitHub webhook settings.
print(f"Webhook URL: {webhook.url}")
## Declarative Triggers with @Trigger
from openintent.agents import Agent, on_assignment, Trigger
# Declarative schedule trigger: the class attributes carry the same fields
# that client.triggers.create() takes as keyword arguments.
@Trigger
class DailyCleanup:
    type = "schedule"
    schedule = "0 2 * * *"  # cron expression: 2 AM daily
    intent_template = {
        "title": "Nightly Cleanup",
        "assign": "janitor",
    }
# Declarative event trigger for "intent_failed": the template describes a
# retry intent whose title and assignee use {{ source_intent.* }} placeholders.
@Trigger
class OnFailure:
    type = "event"
    event_type = "intent_failed"
    intent_template = {
        "title": "Retry: {{ source_intent.title }}",
        "assign": "{{ source_intent.assign }}",
    }
    deduplication = "skip"  # Don't create duplicate retry intents
    # NOTE(review): the YAML `failure_retry` trigger for this same event also
    # sets `max_cascade_depth: 2`; consider adding the equivalent here to
    # bound retry loops -- confirm that @Trigger accepts that attribute.
# Agent registered as "janitor", the assignee named by the "Nightly Cleanup"
# intent template.
@Agent("janitor", auto_heartbeat=True)
class JanitorAgent:
    @on_assignment
    async def handle(self, intent):
        # NOTE(review): cleanup_old_data() is not defined in this example;
        # presumably supplied by the application -- confirm before reuse.
        await self.cleanup_old_data()
        return {"status": "cleaned"}
## Deduplication
Control how triggers handle repeated firings:
# Event-type trigger for "budget_exceeded" with an explicit deduplication mode.
client.triggers.create(
    name="rate_limited_alert",
    type="event",
    event_type="budget_exceeded",
    deduplication="skip",  # one of "allow", "skip", or "queue"
    intent_template={
        "title": "Budget Alert",
        "assign": "finance-agent",
    },
)
## Trigger Chains and Depth Limits
Triggers can cascade (trigger A creates an intent that fires trigger B), with depth limits to prevent infinite loops:
# The intent created by this trigger might itself fire another trigger, so
# the cascade depth is capped.
client.triggers.create(
    name="cascading_review",
    type="event",
    event_type="intent_completed",
    max_cascade_depth=3,  # depth limit that prevents infinite loops
    intent_template={
        "title": "Review: {{ source_intent.title }}",
        "assign": "reviewer",
    },
)
## YAML Workflow with Triggers
openintent: "1.0"

info:
  name: "Event-Driven Pipeline"

# One entry per trigger type: webhook, schedule, and event.
triggers:
  - name: new_data_trigger
    type: webhook
    payload_transform:
      title: "Process: {{ payload.dataset_name }}"
      assign: processor
      initial_state:
        dataset_id: "{{ payload.dataset_id }}"

  - name: nightly_report
    type: schedule
    schedule: "0 22 * * *"
    intent_template:
      title: "Nightly Summary"
      assign: reporter

  - name: failure_retry
    type: event
    event_type: intent_failed
    deduplication: skip
    max_cascade_depth: 2
    intent_template:
      title: "Retry: {{ source_intent.title }}"
      assign: "{{ source_intent.assign }}"

workflow:
  process:
    title: "Process Dataset"
    assign: processor

  summarize:
    title: "Generate Summary"
    assign: reporter
    depends_on: [process]