Events
The Event system lets your application react to what's happening inside the framework. Subscribe to events like progress updates, agent status changes, errors, and checkpoints.
Event types
Events are organized by category:
Lifecycle: STARTED, COMPLETED, FAILED, PAUSED, RESUMED, CANCELLED
Progress: PROGRESS, PHASE_STARTED, PHASE_COMPLETED
Agent: AGENT_STARTED, AGENT_COMPLETED, AGENT_FAILED, AGENT_MESSAGE
Task: TASK_STARTED, TASK_COMPLETED, TASK_FAILED
Checkpoint: CHECKPOINT, CHECKPOINT_APPROVED, CHECKPOINT_REJECTED
Error & Recovery: ERROR, BUG_DETECTED, DISCUSSION_STARTED, FIX_STARTED
User Interaction: NEED_INPUT, USER_FEEDBACK
Thinking: THINKING
Event model
python
Event(
    type: EventType,        # One of the types above
    source: str = "",       # Agent name or "framework"
    message: str = "",      # Human-readable description
    data: Optional[dict],   # Extra structured data
    timestamp: float,       # Auto-set
)
Subscribing to events
events.py
from veska import EventEmitter, EventType

# Handlers can be sync or async. Define them before registering them.
async def on_progress(event):
    data = event.data
    print(f"Progress: {data['completed']}/{data['total']} - {data.get('current_task', '')}")

def on_agent_start(event):
    print(f"Agent {event.source} started working")

def on_error(event):
    print(f"ERROR from {event.source}: {event.message}")

def log_everything(event):
    print(f"[{event.type}] {event.source}: {event.message}")

# The orchestrator has a built-in event emitter
# (`orchestrator` is assumed to be an existing orchestrator instance)
orchestrator.events.on(EventType.PROGRESS, on_progress)
orchestrator.events.on(EventType.AGENT_STARTED, on_agent_start)
orchestrator.events.on(EventType.ERROR, on_error)

# Catch all events
orchestrator.events.on_any(log_everything)
Emitting events
The framework emits events automatically, but you can also emit custom events:
emit.py
from veska import Event, EventType

# `emitter` is an event emitter such as orchestrator.events.
# The awaits below must run inside an async function.

# Emit a custom event
await emitter.emit(Event(
    type=EventType.PROGRESS,
    source="my-agent",
    message="Processing item 5 of 10",
    data={"completed": 5, "total": 10},
))

# Convenience methods
await emitter.emit_progress("my-agent", completed=5, total=10)
await emitter.emit_error("my-agent", "Connection timeout")
await emitter.emit_agent_status("researcher", EventType.AGENT_COMPLETED, "Finished research")
Checkpoints
Checkpoints pause execution and wait for user approval before continuing. This is useful for review steps or critical decisions.
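To make the pause-and-wait behaviour concrete, here is a minimal, self-contained sketch of one way such a gate could work, using one `asyncio.Future` per pending checkpoint. `CheckpointGate`, its method names, and the shape of the returned dictionary are hypothetical and chosen for illustration only; this is not veska's implementation and it does not call the veska API.

```python
import asyncio


class CheckpointGate:
    """Hypothetical stand-in: one asyncio.Future per pending checkpoint."""

    def __init__(self):
        self._pending = {}  # checkpoint_id -> Future awaiting a decision

    async def checkpoint(self, checkpoint_id):
        # Park the caller until approve() or reject() resolves the future.
        future = asyncio.get_running_loop().create_future()
        self._pending[checkpoint_id] = future
        try:
            return await future
        finally:
            self._pending.pop(checkpoint_id, None)

    def approve(self, checkpoint_id, feedback=""):
        # Assumed response shape; the real framework may return something else.
        self._pending[checkpoint_id].set_result({"approved": True, "feedback": feedback})

    def reject(self, checkpoint_id, feedback=""):
        self._pending[checkpoint_id].set_result({"approved": False, "feedback": feedback})


async def demo():
    gate = CheckpointGate()
    # The workflow side blocks on the checkpoint...
    waiter = asyncio.create_task(gate.checkpoint("plan-review"))
    await asyncio.sleep(0)  # give the workflow a chance to reach the checkpoint
    pending_before = len(gate._pending)
    # ...until the application side answers.
    gate.approve("plan-review", feedback="Looks good")
    response = await waiter
    return pending_before, response


pending_before, response = asyncio.run(demo())
print(pending_before, response)
```

The point of the sketch is the split between the two sides: the workflow awaits a decision, while the application resolves it from elsewhere by checkpoint ID.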
checkpoint.py
# In the orchestrator workflow: pauses and waits for a decision
response = await orchestrator.events.checkpoint(
    checkpoint_id="plan-review",
    title="Review execution plan",
    description="The orchestrator has created a plan with 5 tasks.",
    details={"tasks": plan_summary},
)

# In your application: approve or reject
if user_approves:
    orchestrator.events.approve_checkpoint("plan-review", feedback="Looks good")
else:
    orchestrator.events.reject_checkpoint("plan-review", feedback="Add more research")
User input
input.py
# Request input from the user (from inside the framework)
await emitter.request_input("What database should we use?", source="orchestrator")

# Send the user's response (from your app)
await emitter.send_feedback("Use PostgreSQL")
Control flow
control.py
# Pause execution
await orchestrator.events.pause()
# Resume
await orchestrator.events.resume()
# Cancel everything
await orchestrator.events.cancel()
# Query event history
history = orchestrator.events.get_history(
    event_type=EventType.ERROR,
    source="researcher",
    limit=10,
)
# Stats
print(orchestrator.events.stats)
# {"total_events": 42, "handlers_registered": 5, "pending_checkpoints": 0}
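As a rough mental model of how subscription, mixed sync/async handlers, and history filtering fit together, the sketch below builds a small stand-in emitter. `MiniEmitter`, the trimmed `EventType` enum, and the `Event` dataclass are hypothetical and only mirror the names used on this page. How the real emitter dispatches handlers, and whether `limit` keeps the oldest or the newest matches, are assumptions.

```python
import asyncio
import inspect
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional


class EventType(Enum):
    # Trimmed stand-in; the real enum has many more members.
    PROGRESS = "progress"
    ERROR = "error"


@dataclass
class Event:
    type: EventType
    source: str = ""
    message: str = ""
    data: Optional[dict] = None
    timestamp: float = field(default_factory=time.time)  # set automatically


class MiniEmitter:
    """Hypothetical stand-in, not veska's EventEmitter."""

    def __init__(self):
        self._handlers = {}      # EventType -> list of callbacks
        self._any_handlers = []  # callbacks that receive every event
        self._history = []

    def on(self, event_type, handler):
        self._handlers.setdefault(event_type, []).append(handler)

    def on_any(self, handler):
        self._any_handlers.append(handler)

    async def emit(self, event):
        self._history.append(event)
        for handler in self._handlers.get(event.type, []) + self._any_handlers:
            result = handler(event)
            # Async handlers return an awaitable; sync ones return a plain value.
            if inspect.isawaitable(result):
                await result

    def get_history(self, event_type=None, source=None, limit=None):
        matches = [
            e for e in self._history
            if (event_type is None or e.type == event_type)
            and (source is None or e.source == source)
        ]
        # Assumption: `limit` keeps the most recent matches.
        return matches if limit is None else matches[max(0, len(matches) - limit):]


seen = []


async def on_error(event):  # async handler
    seen.append(("error", event.message))


def log_everything(event):  # sync handler
    seen.append(("any", event.type.name))


async def demo():
    emitter = MiniEmitter()
    emitter.on(EventType.ERROR, on_error)
    emitter.on_any(log_everything)
    await emitter.emit(Event(EventType.PROGRESS, source="researcher",
                             data={"completed": 1, "total": 2}))
    await emitter.emit(Event(EventType.ERROR, source="researcher",
                             message="Connection timeout"))
    return emitter.get_history(event_type=EventType.ERROR, source="researcher", limit=10)


errors = asyncio.run(demo())
print(seen)
print([e.message for e in errors])
```

Checking the handler's return value with `inspect.isawaitable` lets one `emit` loop serve both plain functions and coroutines, which is one common way to support "sync or async" handlers.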