Message Bus

The Message Bus enables agent-to-agent communication. Agents can send direct messages, broadcast to all agents, or have request-response conversations.

Message types

Every message has a type that describes its purpose:

Type          Description
REQUEST       Ask another agent to do something
RESPONSE      Reply to a request
ERROR         Report an error
BROADCAST     Send to all agents
TASK_ASSIGN   Assign a task to an agent
TASK_RESULT   Return task results
FIX_REQUEST   Ask an agent to fix something
FEEDBACK      Provide feedback on work

BusMessage

Every message on the bus is a BusMessage with these fields:

python
BusMessage(
    id: str,                    # Auto-generated UUID
    from_agent: str,            # Sender name
    to_agent: str,              # Receiver name ("*" for broadcast)
    type: MessageType,          # One of the types above
    content: str,               # Message text
    data: Optional[dict],       # Extra structured data
    reply_to: Optional[str],    # Message ID being replied to
    timestamp: float,           # Auto-set
)
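
To make the auto-set fields concrete, here is a minimal sketch of a message record with the same fields. `SketchMessage` is a hypothetical stand-in, not veska's BusMessage, and the enum's string values are assumed; only the member names come from the list of types above.

```python
import time
import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional


class MessageType(Enum):
    # Member names follow the documented types; the string values are assumed.
    REQUEST = "request"
    RESPONSE = "response"
    ERROR = "error"
    BROADCAST = "broadcast"
    TASK_ASSIGN = "task_assign"
    TASK_RESULT = "task_result"
    FIX_REQUEST = "fix_request"
    FEEDBACK = "feedback"


@dataclass
class SketchMessage:
    """Hypothetical stand-in for BusMessage, not veska's actual class."""
    from_agent: str
    to_agent: str                      # "*" would mean broadcast
    type: MessageType
    content: str
    data: Optional[dict] = None
    reply_to: Optional[str] = None
    # Generated per instance, so callers never pass these two fields.
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: float = field(default_factory=time.time)


request = SketchMessage(
    from_agent="orchestrator",
    to_agent="researcher",
    type=MessageType.REQUEST,
    content="Research AI agents",
)
```

Because `id` and `timestamp` use default factories, a sender supplies only the routing fields and the content.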

Subscribing to messages

Each agent subscribes to the bus with a handler function. The bus calls that handler whenever a message is sent to the agent.

subscribe.py
from veska import MessageBus, BusMessage, MessageType

bus = MessageBus()

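# Handler for the "researcher" agent
async def researcher_handler(message: BusMessage):
    print(f"Researcher received: {message.content}")
    if message.type == MessageType.REQUEST:
        # Send a response back
        bus.respond(message, "researcher", "Here are my findings...")

# Handler for the "writer" agent
async def writer_handler(message: BusMessage):
    print(f"Writer received: {message.content}")

# Register each handler under its agent's name
bus.subscribe("researcher", researcher_handler)
bus.subscribe("writer", writer_handler)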
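
How a bus might route a message to its subscribers can be sketched without the library. The `SketchBus` class below is hypothetical and uses plain dicts for messages. In particular, skipping the sender on a "*" broadcast is an assumption, not documented veska behavior.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List

Handler = Callable[[dict], Awaitable[None]]


class SketchBus:
    """Hypothetical in-memory bus, for illustration only."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Handler]] = {}

    def subscribe(self, agent: str, handler: Handler) -> None:
        self._handlers.setdefault(agent, []).append(handler)

    async def send(self, message: dict) -> int:
        """Call the recipient's handlers and return how many were called."""
        if message["to_agent"] == "*":
            # Assumed: a broadcast reaches every subscriber except the sender.
            targets = [a for a in self._handlers if a != message["from_agent"]]
        elif message["to_agent"] in self._handlers:
            targets = [message["to_agent"]]
        else:
            targets = []  # nobody subscribed under that name
        delivered = 0
        for agent in targets:
            for handler in self._handlers[agent]:
                await handler(message)
                delivered += 1
        return delivered


async def demo() -> list:
    bus = SketchBus()
    received = []

    async def researcher(message: dict) -> None:
        received.append(("researcher", message["content"]))

    async def writer(message: dict) -> None:
        received.append(("writer", message["content"]))

    bus.subscribe("researcher", researcher)
    bus.subscribe("writer", writer)
    await bus.send({"from_agent": "orchestrator", "to_agent": "researcher",
                    "content": "Research AI agents"})
    await bus.send({"from_agent": "orchestrator", "to_agent": "*",
                    "content": "Project starting now"})
    return received


received = asyncio.run(demo())
```

The direct message reaches only the researcher's handler, while the broadcast reaches both subscribers.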

Sending messages

sending.py
# Direct request to a specific agent
bus.request("orchestrator", "researcher", "Research AI agents")

# Broadcast to all agents
bus.broadcast("orchestrator", "Project starting now")

# Respond to a message (original_message is the BusMessage a handler received)
bus.respond(original_message, "researcher", "Done!", data={"sources": 12})
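
One plausible reading of `respond` is that it swaps the routing fields and records the original message's ID in `reply_to`. The sketch below illustrates that idea with plain dicts. `make_response` is a hypothetical helper, and this is an assumption about the behavior rather than veska's implementation.

```python
import uuid
from typing import Optional


def make_response(original: dict, from_agent: str, content: str,
                  data: Optional[dict] = None) -> dict:
    """Build a reply addressed to the original sender (hypothetical helper)."""
    return {
        "id": str(uuid.uuid4()),
        "from_agent": from_agent,
        "to_agent": original["from_agent"],  # route back to whoever asked
        "type": "RESPONSE",
        "content": content,
        "data": data,
        "reply_to": original["id"],          # ties the reply to its request
    }


request = {
    "id": str(uuid.uuid4()),
    "from_agent": "orchestrator",
    "to_agent": "researcher",
    "type": "REQUEST",
    "content": "Research AI agents",
    "data": None,
    "reply_to": None,
}

reply = make_response(request, "researcher", "Done!", data={"sources": 12})
```

Matching `reply["reply_to"]` against the request's `id` is what lets a sender pair each response with the request that prompted it.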

Watchers

Watchers receive every message on the bus, regardless of recipient. The Orchestrator uses a watcher to monitor agent communication.

watcher.py
async def monitor(message: BusMessage):
    print(f"[{message.from_agent} → {message.to_agent}] {message.content}")

bus.add_watcher(monitor)
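
The difference between a subscriber and a watcher can be sketched as follows. `WatchedBus` is a hypothetical class, and calling watchers before the recipient's handler is an assumed ordering. The point is only that a watcher is invoked for every message, including ones addressed to an agent that has no subscriber.

```python
import asyncio


class WatchedBus:
    """Hypothetical bus with one handler per agent plus global watchers."""

    def __init__(self):
        self._handlers = {}
        self._watchers = []

    def subscribe(self, agent, handler):
        self._handlers[agent] = handler

    def add_watcher(self, watcher):
        self._watchers.append(watcher)

    async def send(self, message):
        # Watchers run for every message, whoever the recipient is.
        for watcher in self._watchers:
            await watcher(message)
        # Only the addressed agent's handler runs, if one is subscribed.
        handler = self._handlers.get(message["to_agent"])
        if handler is not None:
            await handler(message)


async def demo():
    bus = WatchedBus()
    seen, inbox = [], []

    async def monitor(message):
        seen.append(
            f"[{message['from_agent']} -> {message['to_agent']}] {message['content']}"
        )

    async def researcher(message):
        inbox.append(message["content"])

    bus.add_watcher(monitor)
    bus.subscribe("researcher", researcher)
    await bus.send({"from_agent": "orchestrator", "to_agent": "researcher",
                    "content": "Research AI agents"})
    await bus.send({"from_agent": "orchestrator", "to_agent": "writer",
                    "content": "Draft the intro"})
    return seen, inbox


seen, inbox = asyncio.run(demo())
```

Here the watcher records both messages, while the researcher's handler receives only the one addressed to it.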

Query history

history.py
# Get up to 20 messages from an agent
history = bus.get_history(agent="researcher", limit=20)

# Get conversation between two agents
convo = bus.get_conversation("researcher", "writer")

# Bus statistics
print(bus.stats)
# {"sent": 45, "delivered": 42, "pending": 3, "subscribers": 4, ...}
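
The two history queries amount to filters over a message log. The sketch below shows one way such filters could work on a list of dicts. The function names are hypothetical, and two details are assumptions: that the per-agent query matches on the sender and that `limit` keeps the most recent entries.

```python
from typing import List


def history_for(log: List[dict], agent: str, limit: int = 50) -> List[dict]:
    """Return up to `limit` of the most recent messages sent by `agent`."""
    sent = [m for m in log if m["from_agent"] == agent]
    return sent[-limit:] if limit > 0 else []


def conversation_between(log: List[dict], a: str, b: str) -> List[dict]:
    """Return messages exchanged between `a` and `b`, in either direction."""
    pair = {a, b}
    return [m for m in log if {m["from_agent"], m["to_agent"]} == pair]


log = [
    {"from_agent": "orchestrator", "to_agent": "researcher",
     "content": "Research AI agents"},
    {"from_agent": "researcher", "to_agent": "writer",
     "content": "Here are my findings..."},
    {"from_agent": "writer", "to_agent": "researcher",
     "content": "Need one more source"},
    {"from_agent": "researcher", "to_agent": "orchestrator",
     "content": "Done!"},
]

recent = history_for(log, "researcher", limit=1)
convo = conversation_between(log, "researcher", "writer")
```

Comparing the sender and receiver as a set is what makes the conversation filter direction-independent.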