Task Planner

The Task Planner manages task dependencies and execution order. It builds a dependency graph, identifies which tasks can run in parallel, and tracks progress through the plan.

Task status

StatusDescription
WAITINGDependencies not yet met
READYAll dependencies met, ready to run
RUNNINGCurrently being executed
DONECompleted successfully
FAILEDFailed with error
CANCELLEDCancelled by orchestrator
RETRYINGFailed and being retried

Task model

python
Task(
    id: str,                      # Auto-generated (task_XXXXXXXX)
    name: str,                    # Human-readable name
    description: str = "",        # What this task does
    agent: str,                   # Which agent handles this
    depends_on: list[str] = [],   # Task IDs this depends on
    status: TaskStatus = WAITING,
    result: Optional[str] = None,
    error: Optional[str] = None,
    priority: int = 0,            # Higher = run first
    retries: int = 0,
    max_retries: int = 2,
)

# Properties
task.duration      # Time from start to completion (seconds)
task.is_terminal   # True if DONE, FAILED, or CANCELLED
task.can_retry     # True if failed and retries < max_retries

Creating a plan

plan.py
from veska import TaskPlanner, Task

planner = TaskPlanner()

# Add tasks with dependencies
research = Task(name="Research", agent="researcher", description="Gather sources")
planner.add_task(research)

analyze = Task(
    name="Analyze",
    agent="researcher",
    description="Analyze findings",
    depends_on=[research.id],  # Runs after research
)
planner.add_task(analyze)

write = Task(
    name="Write",
    agent="writer",
    description="Write the report",
    depends_on=[analyze.id],   # Runs after analysis
)
planner.add_task(write)

# Or add multiple at once
planner.add_tasks([research, analyze, write])

Parallel execution

Tasks without dependencies on each other run in parallel. Use get_execution_order() to see the waves of parallel tasks:

parallel.py
# Independent tasks run in parallel
task_a = Task(name="Research AI", agent="researcher")
task_b = Task(name="Research ML", agent="researcher")
task_c = Task(name="Write Report", agent="writer", depends_on=[task_a.id, task_b.id])

planner.add_tasks([task_a, task_b, task_c])

# Get execution waves
waves = planner.get_execution_order()
# Wave 1: [task_a.id, task_b.id]  — run in parallel
# Wave 2: [task_c.id]             — runs after both complete

Running tasks

running.py
# Get tasks that are ready to run
ready = planner.get_ready_tasks()

for task in ready:
    planner.start_task(task.id)      # Mark as RUNNING

    try:
        result = await run_agent(task)
        planner.complete_task(task.id, result)   # Mark as DONE
    except Exception as e:
        planner.fail_task(task.id, str(e))       # Mark as FAILED

        if task.can_retry:
            planner.retry_task(task.id)           # Mark as RETRYING → READY

Phases

Group tasks into named phases for organization and progress tracking:

phases.py
# Create phases
planner.add_phase("research", [task_a.id, task_b.id], "Gather all sources")
planner.add_phase("writing", [task_c.id], "Write the final report")

# Check phase status
current = planner.get_current_phase()
print(f"Currently in: {current.name}")

if planner.is_phase_complete("research"):
    print("Research phase done!")

Validation & progress

validation.py
# Validate the plan (check for cycles, missing deps)
errors = planner.validate()
if errors:
    for error in errors:
        print(f"Plan error: {error}")

# Track progress
print(planner.progress)
# {"completed": 2, "running": 1, "failed": 0, "waiting": 3, "total": 6, "percentage": 33.3}

print(planner.is_complete)    # True when all tasks done
print(planner.has_failures)   # True if any task failed