Events

The Event system lets your application react to what's happening inside the framework. Subscribe to events like progress updates, agent status changes, errors, and checkpoints.

Event types

Events are organized by category:

Lifecycle

STARTEDCOMPLETEDFAILEDPAUSEDRESUMEDCANCELLED

Progress

PROGRESSPHASE_STARTEDPHASE_COMPLETED

Agent

AGENT_STARTEDAGENT_COMPLETEDAGENT_FAILEDAGENT_MESSAGE

Task

TASK_STARTEDTASK_COMPLETEDTASK_FAILED

Checkpoint

CHECKPOINTCHECKPOINT_APPROVEDCHECKPOINT_REJECTED

Error & Recovery

ERRORBUG_DETECTEDDISCUSSION_STARTEDFIX_STARTED

User Interaction

NEED_INPUTUSER_FEEDBACK

Thinking

THINKING

Event model

python
Event(
    type: EventType,          # One of the types above
    source: str = "",         # Agent name or "framework"
    message: str = "",        # Human-readable description
    data: Optional[dict],     # Extra structured data
    timestamp: float,         # Auto-set
)

Subscribing to events

events.py
from veska import EventEmitter, EventType

# The orchestrator has a built-in event emitter
orchestrator.events.on(EventType.PROGRESS, on_progress)
orchestrator.events.on(EventType.AGENT_STARTED, on_agent_start)
orchestrator.events.on(EventType.ERROR, on_error)

# Catch all events
orchestrator.events.on_any(log_everything)

# Handlers can be sync or async
async def on_progress(event):
    data = event.data
    print(f"Progress: {data['completed']}/{data['total']} — {data.get('current_task', '')}")

def on_agent_start(event):
    print(f"Agent {event.source} started working")

def on_error(event):
    print(f"ERROR from {event.source}: {event.message}")

Emitting events

The framework emits events automatically, but you can also emit custom events:

emit.py
from veska import Event, EventType

# Emit a custom event
await emitter.emit(Event(
    type=EventType.PROGRESS,
    source="my-agent",
    message="Processing item 5 of 10",
    data={"completed": 5, "total": 10},
))

# Convenience methods
await emitter.emit_progress("my-agent", completed=5, total=10)
await emitter.emit_error("my-agent", "Connection timeout")
await emitter.emit_agent_status("researcher", EventType.AGENT_COMPLETED, "Finished research")

Checkpoints

Checkpoints pause execution and wait for user approval before continuing. This is useful for review steps or critical decisions.

checkpoint.py
# In the orchestrator workflow — pauses and waits
response = await orchestrator.events.checkpoint(
    checkpoint_id="plan-review",
    title="Review execution plan",
    description="The orchestrator has created a plan with 5 tasks.",
    details={"tasks": plan_summary},
)

# In your application — approve or reject
if user_approves:
    orchestrator.events.approve_checkpoint("plan-review", feedback="Looks good")
else:
    orchestrator.events.reject_checkpoint("plan-review", feedback="Add more research")

User input

input.py
# Request input from user (from inside framework)
await emitter.request_input("What database should we use?", source="orchestrator")

# Send user's response (from your app)
await emitter.send_feedback("Use PostgreSQL")

Control flow

control.py
# Pause execution
await orchestrator.events.pause()

# Resume
await orchestrator.events.resume()

# Cancel everything
await orchestrator.events.cancel()

# Query event history
history = orchestrator.events.get_history(
    event_type=EventType.ERROR,
    source="researcher",
    limit=10,
)

# Stats
print(orchestrator.events.stats)
# {"total_events": 42, "handlers_registered": 5, "pending_checkpoints": 0}