Message Bus
The Message Bus enables agent-to-agent communication. Agents can send direct messages, broadcast to all agents, or have request-response conversations.
Message types
Every message has a type that describes its purpose:
| Type | Description |
|---|---|
REQUEST | Ask another agent to do something |
RESPONSE | Reply to a request |
ERROR | Report an error |
BROADCAST | Send to all agents |
TASK_ASSIGN | Assign a task to an agent |
TASK_RESULT | Return task results |
FIX_REQUEST | Ask an agent to fix something |
FEEDBACK | Provide feedback on work |
BusMessage
Every message on the bus is a BusMessage with these fields:
python
BusMessage(
id: str, # Auto-generated UUID
from_agent: str, # Sender name
to_agent: str, # Receiver name ("*" for broadcast)
type: MessageType, # One of the types above
content: str, # Message text
data: Optional[dict], # Extra structured data
reply_to: Optional[str], # Message ID being replied to
timestamp: float, # Auto-set
)Subscribing to messages
Agents subscribe to the bus to receive messages. The handler is called whenever a message is sent to that agent.
subscribe.py
from veska import MessageBus, BusMessage, MessageType
bus = MessageBus()
# Subscribe an agent
async def researcher_handler(message: BusMessage):
print(f"Researcher received: {message.content}")
if message.type == MessageType.REQUEST:
# Send a response back
bus.respond(message, "researcher", "Here are my findings...")
bus.subscribe("researcher", researcher_handler)
bus.subscribe("writer", writer_handler)Sending messages
sending.py
# Direct request to a specific agent
bus.request("orchestrator", "researcher", "Research AI agents")
# Broadcast to all agents
bus.broadcast("orchestrator", "Project starting now")
# Respond to a message
bus.respond(original_message, "researcher", "Done!", data={"sources": 12})Watchers
Watchers see all messages on the bus, regardless of recipient. The Orchestrator uses this to monitor agent communication.
watcher.py
async def monitor(message: BusMessage):
print(f"[{message.from_agent} → {message.to_agent}] {message.content}")
bus.add_watcher(monitor)Query history
history.py
# Get all messages from an agent
history = bus.get_history(agent="researcher", limit=20)
# Get conversation between two agents
convo = bus.get_conversation("researcher", "writer")
# Bus statistics
print(bus.stats)
# {"sent": 45, "delivered": 42, "pending": 3, "subscribers": 4, ...}