Skip to content

SSE Subscriptions & Real-Time Streaming#

Subscribe to protocol events in real time via Server-Sent Events.

Subscribe to Intent Events#

from openintent import OpenIntentClient

client = OpenIntentClient(
    base_url="http://localhost:8000",
    agent_id="watcher"
)

# SSE stream — blocks and yields events as they arrive
for event in client.subscribe(intent_id):
    print(f"[{event.timestamp}] {event.event_type}")
    print(f"  Actor: {event.actor}")
    print(f"  Payload: {event.payload}")

    if event.event_type == "status_changed":
        new_status = event.payload.get("new")
        if new_status == "completed":
            break

Subscribe to Agent Events#

Monitor all activity for a specific agent:

for event in client.subscribe_agent("research-agent"):
    if event.event_type == "intent_assigned":
        print(f"New task: {event.payload.get('title')}")
    elif event.event_type == "lease_acquired":
        print(f"Lease acquired on {event.payload.get('intent_id')}")

Subscribe to Portfolio Events#

for event in client.subscribe_portfolio(portfolio_id):
    intent_id = event.intent_id
    print(f"[{event.event_type}] Intent {intent_id}")

    if event.event_type == "all_intents_completed":
        print("Portfolio finished!")
        break

Async Streaming#

from openintent import AsyncOpenIntentClient

async def watch_portfolio(portfolio_id):
    async with AsyncOpenIntentClient(
        base_url="http://localhost:8000",
        agent_id="async-watcher"
    ) as client:
        async for event in client.subscribe_portfolio(portfolio_id):
            print(f"Event: {event.event_type}")

            if event.event_type == "all_intents_completed":
                break

Event Filtering#

from openintent.models import EventType

for event in client.subscribe(intent_id):
    match event.event_type:
        case EventType.STATE_PATCHED:
            handle_state_change(event)
        case EventType.LLM_REQUEST_COMPLETED:
            track_cost(event)
        case EventType.LEASE_EXPIRED:
            handle_lease_expiry(event)
        case EventType.ATTACHMENT_ADDED:
            process_attachment(event)

LLM Streaming with Protocol Logging#

Stream LLM responses while automatically logging to the intent event log:

from openai import OpenAI
from openintent.adapters import OpenAIAdapter, AdapterConfig

config = AdapterConfig(
    on_stream_start=lambda: print("Generating..."),
    on_token=lambda t: print(t, end="", flush=True),
    on_stream_end=lambda u: print(f"\n[{u['total_tokens']} tokens]"),
    on_stream_error=lambda e: print(f"\nError: {e}")
)

adapter = OpenAIAdapter(OpenAI(), client, intent.id, config=config)

for chunk in adapter.chat_complete_stream(
    model="gpt-4",
    messages=[{"role": "user", "content": "Explain distributed systems"}]
):
    pass  # Hooks handle output

# Event log now contains the full LLM interaction with cost data

Building a Dashboard#

import asyncio
from openintent import AsyncOpenIntentClient

async def dashboard(portfolio_ids):
    async with AsyncOpenIntentClient(
        base_url="http://localhost:8000",
        agent_id="dashboard"
    ) as client:
        tasks = [
            watch_portfolio(client, pid)
            for pid in portfolio_ids
        ]
        await asyncio.gather(*tasks)

async def watch_portfolio(client, portfolio_id):
    async for event in client.subscribe_portfolio(portfolio_id):
        print(f"[{portfolio_id[:8]}] {event.event_type}: {event.payload}")

asyncio.run(dashboard(["portfolio-1", "portfolio-2"]))