agent-fleet/docs/architecture.md
Zer4tul a18cb2824e fix: agent capability matching in dispatch — only agent: labels are requirements
Previous bug: only code:* and review labels were checked, so agent:document,
agent:tests etc. were never filtered. Any agent could pick up any task.

Now: labels with agent: prefix are matched against agent capabilities.
Other labels are treated as metadata. Includes regression test.
2026-05-12 23:51:08 +08:00

18 KiB

Agent Fleet Architecture

This document describes the internal architecture of Agent Fleet, including the dual execution model, dispatch loop, task lifecycle, Forgejo integration flow, and state machine.

System Overview

Agent Fleet is an orchestrator for coordinating AI agents across distributed environments. It acts as the central hub that receives tasks from Forgejo, dispatches them to agents, and tracks their completion.

Core Components

┌─────────────────────────────────────────────────────────────────┐
│                      Agent Fleet Orchestrator                   │
├─────────────────────────────────────────────────────────────────┤
│                                                               │
│  ┌──────────────┐     ┌──────────────┐                    │
│  │  HTTP API    │◄────┤   Axum       │                    │
│  │  Handlers    │     │   Router     │                    │
│  └──────┬───────┘     └──────────────┘                    │
│         │                                                   │
│         │                                                   │
│  ┌──────▼──────────────┐  ┌──────────────────┐             │
│  │  State Machine     │  │  Event Store    │             │
│  │  (transitions)     │◄─┤  (SQLite)       │             │
│  └───────────────────┘  └──────────────────┘             │
│         │                                                   │
│         │                                                   │
│  ┌──────▼──────────────┐  ┌──────────────────┐             │
│  │  Task Queue       │  │  Timeout       │             │
│  │  (http_pull)     │  │  Checker       │             │
│  └───────────────────┘  └──────────────────┘             │
│         │                                                   │
│         │                                                   │
│  ┌──────▼──────────────┐  ┌──────────────────┐             │
│  │  Dispatcher       │  │  Heartbeat     │             │
│  │  Loop            │  │  Checker       │             │
│  │  (ssh_cli)       │  └──────────────────┘             │
│  └───────────────────┘                                 │
│         │                                                   │
└─────────┼───────────────────────────────────────────────────────┘
          │
          │
    ┌─────┴────────────────────────────┐
    │                              │
    ▼                              ▼
┌────────────────┐          ┌────────────────┐
│  Forgejo     │          │  Agents       │
│  (webhooks)  │          │  (ssh_cli &   │
│              │          │   http_pull)  │
└────────────────┘          └────────────────┘

Dual Execution Model

Agent Fleet supports two fundamentally different execution modes, each suited for different deployment scenarios.

ssh_cli Mode

In ssh_cli mode, the orchestrator initiates task execution by:

  1. SSH-ing into a configured host
  2. Executing an agent CLI binary with a structured prompt
  3. Parsing the JSON output to determine task result

Flow:

Dispatcher Loop
     │
     ▼
Select task (status=created, execution_mode=ssh_cli)
     │
     ▼
Select host (capability match + lowest load)
     │
     ▼
Transition: created → assigned → running
     │
     ▼
SshExecutor executes CLI
     │
     ├─► Success → Parse receipt → completed/review_pending
     │
     └─► Failure → failed

Characteristics:

  • Initiator: Orchestrator
  • Communication: SSH or local subprocess
  • Control Flow: Orchestrator-managed
  • Best For: CLI-based agents (codex-cli, claude-code)

Implementation Details:

  • src/dispatch.rs - Periodic dispatch loop
  • src/execution/mod.rs - SSH execution
  • src/adapters/mod.rs - CLI adapter configuration

http_pull Mode

In http_pull mode, agents independently poll the orchestrator for work using the HTTP API.

Flow:

Agent Loop
     │
     ▼
Heartbeat: POST /api/v1/agents/heartbeat
     │
     ▼
Dequeue: POST /api/v1/tasks/dequeue
     │
     ├─► No task → Wait, retry
     │
     └─► Got task
          │
          ▼
Update status: running
          │
          ▼
Execute task
          │
          ▼
Complete: POST /api/v1/tasks/{task_id}/complete
          │
          └─► Receipt validated → completed/failed

Characteristics:

  • Initiator: Agent
  • Communication: HTTP REST API
  • Control Flow: Agent-managed
  • Best For: Self-scheduled agents (OpenClaw, Hermes, custom bot frameworks)

Implementation Details:

  • src/core/task_queue.rs - HTTP pull task queue
  • src/api.rs - Dequeue and status update endpoints

Mode Comparison

Aspect ssh_cli http_pull
Who initiates? Orchestrator Agent
Communication SSH/Subprocess HTTP REST API
Configuration [[hosts]] in config.toml Agent-side registration
Network topology Orchestrator reaches agents Agents reach orchestrator
Firewalls Requires outbound SSH from orchestrator Requires inbound HTTP to orchestrator
Latency Orchestrator-driven pull Agent-driven pull
Failure detection Process exit code Heartbeat timeout

Dispatch Loop

The dispatch loop (src/dispatch.rs) runs on a configurable interval (default: 10 seconds) and is responsible for assigning ssh_cli tasks to available hosts.

Algorithm

1. Fetch all tasks where:
   - status = 'created'
   - execution_mode = 'ssh_cli'

2. For each task:
   a. Find available hosts matching task labels (capabilities)
   b. Filter hosts where current_load < max_concurrency
   c. Sort by current_load (lowest first)
   d. Select first host (if any)

3. If host selected:
   a. Transition: created → assigned
   b. Transition: assigned → running
   c. Execute via SshExecutor
   d. Parse receipt and transition to final state

4. Process review_pending tasks:
   a. If review_count > max_retries, transition to failed

Host Selection

Host selection uses a combination of:

  1. Capability matching: Task labels must be subset of agent capabilities
  2. Load balancing: Choose host with lowest current task count
  3. Concurrency limits: Respect max_concurrency per agent type
// From src/dispatch.rs:select_host
for host in &config.hosts {
    for agent in &host.agents {
        // Check capability match
        let supports_caps = task.labels.iter().all(|label| {
            !label.starts_with("code:") && !label.starts_with("review")
                || agent.capabilities.iter().any(|cap| cap == label)
        });

        // Check concurrency
        let current = *load.get(&(host.host_id, agent.agent_type)).unwrap_or(&0);
        if supports_caps && current < agent.max_concurrency {
            candidates.push((host, agent, current));
        }
    }
}
// Sort by load and pick first
candidates.sort_by_key(|(_, _, current)| *current);

Task Lifecycle

A task progresses through a finite state machine from creation to completion or failure.

State Machine

                    ┌─────────┐
                    │ Created │
                    └────┬────┘
                         │
                         │ (dispatch)
                         ▼
                  ┌──────────────┐
                  │   Assigned   │
                  └──────┬───────┘
                         │
                         │ (start)
                         ▼
                  ┌──────────────┐
                  │   Running    │
                  └──────┬───────┘
                         │
              ┌────────┴────────┐
              │                 │
         (partial)        (complete)
              │                 │
              ▼                 ▼
    ┌──────────────┐   ┌───────────┐
    │Review Pending │   │ Completed │
    └──────┬───────┘   └───────────┘
           │
           │ (review loop)
           ▼
    ┌──────────────┐
    │   Failed     │
    └──────┬───────┘
           │
           │ (retry)
           ▼
    ┌──────────────┐
    │   Assigned   │
    └──────────────┘

State Definitions

State Description Triggers
created Initial state, task exists but not assigned Forgejo webhook, manual insertion
assigned Task has been assigned to an agent/host Dispatch loop, dequeue
running Agent is actively working on the task Agent start, SSH execution start
review_pending Agent completed work, awaiting review/approval Partial receipt, PR opened
completed Task successfully finished Full receipt, PR merged
failed Task could not be completed Error, timeout, retry limit
agent_lost Agent stopped responding during task Heartbeat timeout
cancelled Task was cancelled Manual cancellation

Valid Transitions

Implemented in src/core/state_machine.rs:validate_transition():

Created  Assigned, Cancelled
Assigned  Running, Cancelled
Running  ReviewPending, Completed, Failed, AgentLost, Cancelled
ReviewPending  Assigned, Running, Completed, Failed, Cancelled
Failed  Assigned, Cancelled
AgentLost  Assigned, Cancelled
Completed  (terminal)
Cancelled  (terminal)

Timeout and Retry

  • Task Timeout: The TimeoutChecker (src/core/timeout.rs) monitors tasks and marks them failed if they exceed task_timeout_secs
  • Agent Timeout: The HeartbeatChecker (src/api.rs) monitors agent heartbeats and marks them offline if heartbeat_interval_secs * heartbeat_timeout_threshold elapses without a heartbeat
  • Auto-Retry: Tasks in failed or agent_lost can be retried via API, transitioning back to assigned

Forgejo Integration

The Forgejo integration handles the bi-directional flow between Agent Fleet and Forgejo.

Webhook Events

Forgejo sends webhook events to POST /api/v1/webhooks/forgejo:

Event Type Source Action
issues (opened) Issue created with agent:* label Create new task
pull_request (opened) PR created on task/* branch Mark task review_pending
pull_request (closed, merged=true) PR merged Mark task completed
push Commit to task/* branch Update last_activity_at

Task Creation Flow

Forgejo Issue (with agent:code label)
         │
         ▼
   POST /api/v1/webhooks/forgejo
         │
         ▼
   Verify HMAC signature
         │
         ▼
   Parse event
         │
         ▼
   Extract:
   - task_type from agent:* label
   - priority from priority:* label
   - requirements from issue title + body
         │
         ▼
   Create task with:
   - task_id = "org/repo#42"
   - source = "forgejo:org/repo#42"
   - execution_mode = "ssh_cli"
   - branch_name = "task/org%2Frepo%2342"
   - pr_title = "feat: Title (#42)"
         │
         ▼
   StateMachine: create_task()
         │
         ▼
   Store in EventStore

PR Lifecycle Flow

PR opened on task/* branch
         │
         ▼
   POST /api/v1/webhooks/forgejo (pull_request event)
         │
         ▼
   Extract task_id from branch name
         │
         ▼
   StateMachine: transition(Running → ReviewPending)
         │
         ▼
   Update Forgejo issue label to "status:doing"
PR merged
         │
         ▼
   POST /api/v1/webhooks/forgejo (pull_request event, merged=true)
         │
         ▼
   Extract task_id from branch name
         │
         ▼
   StateMachine: transition(Running/ReviewPending → Completed)
         │
         ▼
   Update Forgejo issue label to "status:done"
         │
         ▼
   Auto-generate receipt comment

Branch Naming Convention

Task branches follow the pattern:

task/{url_encoded_task_id}

Where task_id is org/repo#42, the branch becomes:

task/org%2Frepo%2342

This encoding allows the branch name to be safely used in Git while preserving the original task identifier.

Label Conventions

Label Prefix Usage Example
agent: Task type agent:code, agent:review
code: Code capability requirement code:rust, code:python
priority: Priority level priority:urgent, priority:high, priority:low
status: Current status (managed by system) status:todo, status:doing, status:done

Signature Verification

Webhook signatures are verified using HMAC-SHA256:

// From src/integrations/forgejo.rs
pub fn verify_webhook_signature(secret: &str, body: &[u8], signature: &str) -> Result<(), ForgejoError> {
    let provided = signature.trim();
    let provided = provided.strip_prefix("sha256=").unwrap_or(provided);
    let mut mac = HmacSha256::new_from_slice(secret.as_bytes())?;
    mac.update(body);
    let expected = hex::encode(mac.finalize().into_bytes());
    if expected == provided { Ok(()) } else { Err(ForgejoError::InvalidSignature) }
}

Event Sourcing

Agent Fleet uses an event-sourced architecture for task state management.

Event Store

The EventStore (src/core/event_store.rs) provides:

  • Persistent storage in SQLite
  • Event journaling for all state changes
  • Task snapshot projection
  • Agent registry

Event Schema

Each state transition creates a TaskEvent:

pub struct TaskEvent {
    pub event_id: String,
    pub task_id: String,
    pub event_type: String,  // e.g., "task.created", "task.assigned"
    pub agent_id: Option<String>,
    pub timestamp: DateTime<Utc>,
    pub payload: serde_json::Value,
}

Benefits

  1. Audit Trail: Complete history of all state changes
  2. Reproducibility: Can reconstruct task state from events
  3. Debugging: Timeline of all transitions for troubleshooting
  4. Integrations: Event notifications can be sent to external systems

Background Services

Agent Fleet runs several background services concurrently:

Service File Function Interval
Dispatcher Loop src/dispatch.rs Assign ssh_cli tasks to hosts dispatch_interval_secs
Timeout Checker src/core/timeout.rs Detect task timeouts 30 seconds
Heartbeat Checker src/api.rs Detect offline agents heartbeat_interval_secs

All background services run as Tokio tasks in src/main.rs:

#[tokio::main]
async fn main() {
    // ... setup ...

    // Spawn background services
    tokio::spawn(async move { timeout_checker.run().await });
    tokio::spawn(async move { heartbeat_checker.run().await });
    tokio::spawn(async move { dispatcher.run().await });

    // Start HTTP server
    axum::serve(listener, app).await?;
}

Error Handling

Agent Fleet uses structured error handling throughout:

#[derive(Debug, thiserror::Error)]
pub enum ApiError {
    #[error("database error: {0}")]
    Database(#[from] rusqlite::Error),
    #[error("not found: {0}")]
    NotFound(String),
    #[error("bad request: {0}")]
    BadRequest(String),
    #[error("unauthorized: {0}")]
    Unauthorized(String),
    #[error("forgejo error: {0}")]
    Forgejo(#[from] ForgejoError),
}

All errors are converted to appropriate HTTP status codes:

  • 400 Bad Request - Invalid input, bad state transition
  • 401 Unauthorized - Missing/invalid auth token or webhook signature
  • 404 Not Found - Task or agent not found
  • 500 Internal Server Error - Database errors, unexpected failures

Concurrency Model

Agent Fleet uses Arc<Mutex<T>> for shared state:

// Shared event store across all handlers and background services
let store = Arc::new(Mutex::new(event_store));

// State machine gets a reference
let state_machine = Arc::new(StateMachine::new(store.clone()));

// Background services take clones
tokio::spawn(async move { dispatcher.run().await });

This ensures:

  • Thread-safe access to shared state
  • Background services don't block API handlers
  • All state transitions are serialized through the mutex