diff --git a/docs/agent-api-reference.md b/docs/agent-api-reference.md index 78be745..1abe9ce 100644 --- a/docs/agent-api-reference.md +++ b/docs/agent-api-reference.md @@ -236,7 +236,7 @@ curl http://FLEET_API_URL:PORT/api/v1/tasks/org%2Frepo%2342 POST /api/v1/tasks/dequeue ``` -Requires Bearer token if `http_pull_token` is configured. Only returns tasks with `execution_mode = http_pull`. +Requires Bearer token if `http_pull_token` is configured. Returns tasks with `execution_mode = http_pull` and may also claim `undecided` tasks, atomically converting them to `http_pull` when dequeued. **Request:** @@ -258,6 +258,33 @@ curl -X POST http://FLEET_API_URL:PORT/api/v1/tasks/dequeue \ --- +### Assign Task + +``` +POST /api/v1/tasks/{task_id}/assign +``` + +Explicitly assign a created task to a specific agent or host. + +**Request:** + +| Field | Type | Required | Description | +|-------|------|----------|-------------| +| agent_id | string | yes | Registered http_pull agent ID or configured ssh_cli host ID | +| execution_mode | string | no | Optional override: `ssh_cli` or `http_pull`. If omitted, the server auto-detects it from the target. | + +**Response:** `200 OK` — updated [Task](#task-object). + +**Errors:** `404` if task or agent not found. `400` if task status is not assignable. + +```bash +curl -X POST http://FLEET_API_URL:PORT/api/v1/tasks/org%2Frepo%2342/assign \ + -H 'Content-Type: application/json' \ + -d '{"agent_id": "worker-03"}' +``` + +--- + ### Update Task Status (http_pull only) ``` @@ -455,7 +482,7 @@ Receives Forgejo webhook events. Requires HMAC-SHA256 signature header. **Priority values:** `low`, `normal`, `high`, `urgent` -**Execution mode values:** `ssh_cli`, `http_pull` +**Execution mode values:** `undecided`, `ssh_cli`, `http_pull` ### Receipt Object diff --git a/openspec/changes/dynamic-execution-mode/.openspec.yaml b/openspec/changes/dynamic-execution-mode/.openspec.yaml new file mode 100644 index 0000000..40cc12f --- /dev/null +++ b/openspec/changes/dynamic-execution-mode/.openspec.yaml @@ -0,0 +1,2 @@ +schema: spec-driven +created: 2026-05-12 diff --git a/openspec/changes/dynamic-execution-mode/design.md b/openspec/changes/dynamic-execution-mode/design.md new file mode 100644 index 0000000..f2f2cc7 --- /dev/null +++ b/openspec/changes/dynamic-execution-mode/design.md @@ -0,0 +1,75 @@ +## Context + +当前任务创建时硬编码 `execution_mode = SshCli`。这在只有 ssh_cli agent 时可行,但在混合环境(ssh_cli + http_pull)中会阻塞 http_pull agent 接任务。 + +核心洞察:**执行模式是调度决策,不是任务属性**。任务只描述"需要什么能力",由调度器决定"怎么执行"。 + +## Goals / Non-Goals + +**Goals:** +- 任务创建时不预设执行模式 +- Dispatch loop 根据注册的 agent 动态决定 +- Coordinator 可以显式指派任务给特定 agent +- http_pull agent 能 dequeue 到未被 ssh_cli 认领的任务 + +**Non-Goals:** +- 不实现智能调度(负载均衡、亲和性等)——Phase 2 再考虑 +- 不改变 receipt 验证流程 +- 不改变 Forgejo webhook 格式 + +## Decisions + +### Decision 1: ExecutionMode 新增 Undecided + +**选择**: 新增 `Undecided` 变体作为默认值 + +**理由**: +- 向后兼容:已有的 `SshCli`/`HttpPull` 任务不受影响 +- 语义清晰:`Undecided` 表示"等待调度器决定" +- dispatch loop 只处理 `Undecided` 任务,已决定的不再改变 + +**替代方案**: +- 用 `Option`(None 表示未决定)—— 语义等价但 enum 更明确 +- 去掉 execution_mode 字段,纯靠 runtime 状态—— 太激进,改太大 + +### Decision 2: 两阶段 dispatch + +**选择**: dispatch loop 分两阶段: + +1. **ssh_cli 阶段**:扫描 `Undecided` 任务,查找匹配的 ssh_cli host → 找到则标记 `SshCli` 并执行 +2. **http_pull 阶段**:剩余的 `Undecided` 任务标记为 `HttpPull`,等待 agent dequeue + +**理由**: +- ssh_cli 是主动调度(orchestrator 控制),优先级高于被动等待 +- http_pull agent 通过 dequeue 自行认领,不需要 orchestrator 主动分配 +- 两阶段简单清晰,不需要复杂的调度算法 + +### Decision 3: Coordinator 显式指派 + +**选择**: 新增 `POST /api/v1/tasks/{id}/assign` 端点 + +```json +{ + "agent_id": "hermes-worker-01", + "execution_mode": "http_pull" // optional, auto-detect if omitted +} +``` + +**理由**: +- coordinator(Jeeves)可能比自动调度更了解哪个 agent 适合 +- 支持跨任务指派(如"这个文档任务给 Hermes") +- 指派后任务不再是 `Undecided`,直接进入执行 + +### Decision 4: Dequeue 查询条件 + +**选择**: dequeue 查询改为 `execution_mode IN ('http_pull', 'undecided')` + +**理由**: +- 纯 http_pull 任务直接匹配 +- 被自动标记为 http_pull 的任务也能匹配 +- 如果调度器还没来得及处理 `Undecided`,agent 也能直接拉走(降级为 http_pull) + +## Risks / Trade-offs + +- **[竞争条件]** ssh_cli dispatch 和 http_pull dequeue 可能同时抢同一个 `Undecided` 任务 → 用 SQLite 事务保证原子性,dequeue 用 `UPDATE ... RETURNING` 原子操作 +- **[调度延迟]** Undecided 任务可能等一个 dispatch cycle 才被标记为 http_pull → dequeue 直接查 Undecided 可以缓解 diff --git a/openspec/changes/dynamic-execution-mode/proposal.md b/openspec/changes/dynamic-execution-mode/proposal.md new file mode 100644 index 0000000..9996923 --- /dev/null +++ b/openspec/changes/dynamic-execution-mode/proposal.md @@ -0,0 +1,36 @@ +## Why + +任务创建时硬编码 `execution_mode = SshCli`(forgejo.rs:234),导致所有从 webhook 创建的任务都走 ssh_cli 路径。 + +实际场景中,执行模式应该在 dispatch 时动态决定: +- Hermes 是 http_pull agent(有自己的调度器),无法被 SSH 调度 +- Claude Code 在 WSL2 上可以被 SSH 调度,但 arm0 到 WSL2 的连通性可能变化 +- 未来可能一个任务同时有 ssh_cli 和 http_pull 的 agent 都能做 + +当前硬编码导致的问题: +- Hermes dequeue 拿不到任务(因为任务被标记为 ssh_cli,dequeue 只查 http_pull) +- 需要手动改 DB 才能让 http_pull agent 接任务 +- coordinator 无法显式指派任务给特定 agent + +## What Changes + +- `ExecutionMode` enum 新增 `Undecided` 变体(任务创建时的默认值) +- Webhook 创建任务时不再硬编码 `SshCli` +- Dispatch loop 动态决定执行模式: + - 有匹配的 ssh_cli host 且 agent online → ssh_cli(立即执行) + - 没有匹配的 ssh_cli host → 标记为 http_pull(等待 agent dequeue) +- 新增 API:coordinator 可以显式指派任务给特定 agent(指定 agent_id) +- dequeue API 查询条件更新:`execution_mode IN ('http_pull', 'undecided')` + +## Capabilities + +### Modified Capabilities +- `task-lifecycle`: 任务状态机增加 undecided → ssh_cli/http_pull 的自动转换 +- `agent-registry`: dispatch 逻辑改为两阶段匹配 + +## Impact + +- **数据模型**:`ExecutionMode` enum 新增 `Undecided` +- **API**:新增指派端点,dequeue 查询条件变更 +- **dispatch loop**:核心调度逻辑重写 +- **向后兼容**:已有的 `ssh_cli`/`http_pull` 任务不受影响 diff --git a/openspec/changes/dynamic-execution-mode/specs/task-lifecycle/spec.md b/openspec/changes/dynamic-execution-mode/specs/task-lifecycle/spec.md new file mode 100644 index 0000000..a0a1ae6 --- /dev/null +++ b/openspec/changes/dynamic-execution-mode/specs/task-lifecycle/spec.md @@ -0,0 +1,82 @@ +## ADDED Requirements + +### Requirement: ExecutionMode enum includes Undecided variant +`ExecutionMode` enum SHALL include an `Undecided` variant as the default for newly created tasks. + +#### Scenario: Task created via Forgejo webhook +- **WHEN** a Forgejo Issue webhook creates a task +- **THEN** `execution_mode` SHALL be `Undecided` +- **AND** the task SHALL be eligible for both ssh_cli dispatch and http_pull dequeue + +#### Scenario: Task created via API +- **WHEN** a task is created via direct API call without specifying execution_mode +- **THEN** `execution_mode` SHALL default to `Undecided` + +### Requirement: Two-phase dispatch loop +The dispatch loop SHALL use a two-phase approach to handle `Undecided` tasks. + +#### Scenario: Undecided task with matching ssh_cli host +- **GIVEN** an `Undecided` task with labels `["agent:code"]` +- **AND** a registered ssh_cli host with agent capabilities matching `["agent:code"]` +- **WHEN** the dispatch loop runs +- **THEN** the task SHALL be assigned `execution_mode = SshCli` +- **AND** the task SHALL be dispatched via SSH for execution + +#### Scenario: Undecided task with no matching ssh_cli host +- **GIVEN** an `Undecided` task with labels `["agent:review", "agent:document"]` +- **AND** no registered ssh_cli host with matching capabilities +- **WHEN** the dispatch loop runs +- **THEN** the task SHALL be assigned `execution_mode = HttpPull` +- **AND** the task SHALL become available for http_pull dequeue + +#### Scenario: Undecided task with matching ssh_cli host but agent offline +- **GIVEN** an `Undecided` task with matching ssh_cli host +- **AND** the ssh_cli host is unreachable or agent is offline +- **WHEN** the dispatch loop runs +- **THEN** the task SHALL remain `Undecided` (retry next cycle) +- **AND** the task SHALL also be available for http_pull dequeue (fallback) + +### Requirement: Coordinator explicit assignment +The API SHALL provide an endpoint for coordinators to explicitly assign tasks to specific agents. + +#### Scenario: Coordinator assigns task to specific agent +- **GIVEN** a task in `Created` or `Undecided` status +- **WHEN** coordinator calls `POST /api/v1/tasks/{id}/assign` with `{"agent_id": "hermes-worker-01"}` +- **THEN** the task SHALL be assigned to the specified agent +- **AND** execution_mode SHALL be auto-detected from the agent's registration type (http_pull for registered agents, ssh_cli for configured hosts) +- **AND** the task status SHALL transition to `Assigned` + +#### Scenario: Coordinator assigns to non-existent agent +- **WHEN** coordinator calls assign with an unknown agent_id +- **THEN** the API SHALL return 404 Not Found + +#### Scenario: Coordinator assigns already-running task +- **WHEN** coordinator calls assign on a task in `Running` or `Completed` status +- **THEN** the API SHALL return 400 Bad Request + +### Requirement: Dequeue accepts Undecided tasks +The dequeue endpoint SHALL return tasks with `execution_mode` of either `HttpPull` or `Undecided`. + +#### Scenario: Agent dequeues Undecided task +- **GIVEN** an `Undecided` task matching the agent's capabilities +- **WHEN** an http_pull agent calls dequeue +- **THEN** the task SHALL be returned +- **AND** `execution_mode` SHALL be atomically updated to `HttpPull` +- **AND** the task SHALL be assigned to the dequeuing agent + +#### Scenario: No race condition between dispatch and dequeue +- **GIVEN** an `Undecided` task +- **WHEN** both ssh_cli dispatch and http_pull dequeue attempt to claim it simultaneously +- **THEN** exactly one SHALL succeed (atomic claim via DB transaction) +- **AND** the other SHALL get no task / skip the task + +### Requirement: Backward compatibility +Existing tasks with `execution_mode = SshCli` or `HttpPull` SHALL continue to work without changes. + +#### Scenario: Pre-existing SshCli task +- **WHEN** a task already has `execution_mode = SshCli` +- **THEN** the dispatch loop SHALL process it as before (no change) + +#### Scenario: Pre-existing HttpPull task +- **WHEN** a task already has `execution_mode = HttpPull` +- **THEN** the dequeue endpoint SHALL return it as before (no change) diff --git a/openspec/changes/dynamic-execution-mode/tasks.md b/openspec/changes/dynamic-execution-mode/tasks.md new file mode 100644 index 0000000..607c1ae --- /dev/null +++ b/openspec/changes/dynamic-execution-mode/tasks.md @@ -0,0 +1,52 @@ +## 1. 数据模型 + +- [ ] 1.1 `ExecutionMode` enum 新增 `Undecided` 变体 +- [ ] 1.2 Task 默认 execution_mode 改为 `Undecided` +- [ ] 1.3 DB schema 更新(如需要) +- [ ] 1.4 单元测试:Undecided 序列化/反序列化 + +## 2. Forgejo Webhook + +- [ ] 2.1 移除 `forgejo.rs` 中硬编码的 `ExecutionMode::SshCli` +- [ ] 2.2 改为 `ExecutionMode::Undecided` +- [ ] 2.3 测试:webhook 创建的任务 execution_mode 为 Undecided + +## 3. Dispatch Loop 重写 + +- [ ] 3.1 Phase 1:扫描 Undecided 任务,尝试匹配 ssh_cli host + - 匹配成功 → 标记 SshCli + 执行 + - 匹配失败或 host offline → 保持 Undecided +- [ ] 3.2 Phase 2:超时未匹配的 Undecided 任务标记为 HttpPull + - 超时阈值可配置(默认 30s,即 3 个 dispatch cycle) + - 或者:直接让 dequeue 也能拉 Undecided(更简单) +- [ ] 3.3 单元测试:两个阶段的各种场景 +- [ ] 3.4 集成测试:混合 ssh_cli + http_pull 环境 + +## 4. Dequeue API 更新 + +- [ ] 4.1 SQL 查询改为 `execution_mode IN ('http_pull', 'undecided')` +- [ ] 4.2 Dequeue 时原子更新 execution_mode 为 HttpPull(如果原为 Undecided) +- [ ] 4.3 测试:dequeue Undecided 任务返回 200 + 正确赋值 + +## 5. Coordinator 指派 API + +- [ ] 5.1 新增 `POST /api/v1/tasks/{id}/assign` + - 请求体:`{"agent_id": "...", "execution_mode": "..."(可选)}` + - 自动检测:注册的 http_pull agent → HttpPull,配置的 ssh_cli host → SshCli + - 错误处理:404(agent 不存在)、400(任务状态不允许) +- [ ] 5.2 路由注册 +- [ ] 5.3 测试:指派成功、指派失败的各种场景 + +## 6. 文档更新 + +- [ ] 6.1 API 参考新增 assign 端点 +- [ ] 6.2 Skill 更新:dequeue 现在也能拿到 Undecided 任务 +- [ ] 6.3 架构文档更新:两阶段 dispatch 说明 + +## 7. 验证 + +- [ ] 7.1 端到端测试:webhook 创建任务 → dispatch → ssh_cli 执行 +- [ ] 7.2 端到端测试:webhook 创建任务 → 无 ssh_cli 匹配 → http_pull dequeue +- [ ] 7.3 端到端测试:coordinator 指派 → agent 执行 +- [ ] 7.4 竞争条件测试:dispatch 和 dequeue 同时抢任务 +- [ ] 7.5 向后兼容:已有 SshCli/HttpPull 任务不受影响 diff --git a/skill/SKILL.md b/skill/SKILL.md index a3fe842..af12df3 100644 --- a/skill/SKILL.md +++ b/skill/SKILL.md @@ -123,7 +123,7 @@ Register → Save credentials to memory → Start heartbeat loop (every 60s, run - Maximum allowed gap: `heartbeat_interval_secs × heartbeat_timeout_threshold` (default: 180 seconds) - If you exceed that gap, you will be marked offline and your tasks will be requeued - This must run for the entire lifetime of the agent, not just once -4. **Dequeue** when ready for work via `POST /api/v1/tasks/dequeue`. Returns a Task or 204 No Content. +4. **Dequeue** when ready for work via `POST /api/v1/tasks/dequeue`. Returns a Task or 204 No Content. The returned task may already be `http_pull` or may have been `undecided` and atomically claimed as `http_pull` during dequeue. 5. **Update status** to `running` via `POST /api/v1/tasks/{task_id}/status`. 6. **Complete** the task via `POST /api/v1/tasks/{task_id}/complete` with a Receipt. 7. **Deregister** when shutting down via `POST /api/v1/agents/deregister`. @@ -179,7 +179,7 @@ curl -X POST http://FLEET_API_URL:PORT/api/v1/tasks/dequeue \ -d '{"agent_id": "worker-03", "capabilities": ["code:rust"]}' ``` -Returns 200 with Task JSON, or 204 if no matching task. +Returns 200 with Task JSON, or 204 if no matching task. Dequeue may return tasks that were previously `undecided`; if so, the Orchestrator updates them to `http_pull` as part of the same claim. ### Get Task Detail diff --git a/src/api.rs b/src/api.rs index b480712..160d56c 100644 --- a/src/api.rs +++ b/src/api.rs @@ -147,6 +147,12 @@ pub struct UpdateTaskStatusRequest { pub status: String, } +#[derive(Debug, Deserialize, Serialize)] +pub struct AssignTaskRequest { + pub agent_id: String, + pub execution_mode: Option, +} + #[derive(Debug, Serialize)] pub struct ReceiptResponse { pub task_id: String, @@ -319,6 +325,67 @@ pub async fn dequeue_task( } } +pub async fn assign_task( + State(state): State, + Path(task_id): Path, + Json(req): Json, +) -> Result, ApiError> { + let store = state.store.clone(); + let config = state.config.clone(); + tokio::task::spawn_blocking(move || -> Result, ApiError> { + let mut store = store.lock().map_err(|e| ApiError::Poisoned(e.to_string()))?; + let task = store.read_task(&task_id)?.ok_or_else(|| ApiError::NotFound(format!("task {task_id}")))?; + if !matches!(task.status, TaskStatus::Created) { + return Err(ApiError::BadRequest(format!("task {} is not assignable from status {}", task.task_id, task.status.as_str()))); + } + + let (execution_mode, assigned_host) = if let Some(mode) = req.execution_mode.as_deref() { + let mode = ExecutionMode::from_str(mode); + let assigned_host = if mode == ExecutionMode::SshCli { + config.hosts.iter().find(|h| h.host_id == req.agent_id).map(|h| h.host_id.clone()) + } else { + None + }; + (mode, assigned_host) + } else if store.find_agent_by_id(&req.agent_id)?.is_some() { + (ExecutionMode::HttpPull, None) + } else if let Some(host) = config.hosts.iter().find(|h| h.host_id == req.agent_id) { + (ExecutionMode::SshCli, Some(host.host_id.clone())) + } else { + return Err(ApiError::NotFound(format!("agent {}", req.agent_id))); + }; + + let now = Utc::now(); + let event = crate::core::models::TaskEvent { + event_id: uuid::Uuid::new_v4().to_string(), + task_id: task_id.clone(), + event_type: "task.assigned".into(), + agent_id: Some(req.agent_id.clone()), + timestamp: now, + payload: serde_json::json!({ + "from_status": task.status.as_str(), + "to_status": "assigned", + "reason": "coordinator assign", + "execution_mode": execution_mode.as_str(), + "assigned_host": assigned_host, + }), + }; + + let updated = store + .assign_task( + &task_id, + &req.agent_id, + execution_mode, + assigned_host.as_deref(), + &now.to_rfc3339(), + &event, + )? + .ok_or_else(|| ApiError::NotFound(format!("task {task_id}")))?; + Ok(Json(updated)) + }) + .await? +} + pub async fn update_task_status( State(state): State, headers: HeaderMap, @@ -590,6 +657,7 @@ mod tests { .route("/api/v1/tasks", axum::routing::get(list_tasks)) .route("/api/v1/tasks/{task_id}", axum::routing::get(get_task)) .route("/api/v1/tasks/{task_id}/retry", axum::routing::post(retry_task)) + .route("/api/v1/tasks/{task_id}/assign", axum::routing::post(assign_task)) .route("/api/v1/tasks/dequeue", axum::routing::post(dequeue_task)) .route("/api/v1/tasks/{task_id}/status", axum::routing::post(update_task_status)) .with_state(state) @@ -718,6 +786,131 @@ mod tests { } #[tokio::test] + async fn assign_task_succeeds_for_registered_http_pull_agent() { + let (_dir, state) = test_state(); + { + let mut store = state.store.lock().unwrap(); + store.insert_task(&sample_task("t-assign", TaskStatus::Created, ExecutionMode::Undecided)).unwrap(); + store.upsert_agent(&Agent { + agent_id: "worker-http".into(), + agent_type: AgentType::OpenClaw, + hostname: "host".into(), + capabilities: vec!["code:rust".into()], + max_concurrency: 1, + current_tasks: 0, + status: AgentStatus::Online, + last_heartbeat_at: Utc::now(), + registered_at: Utc::now(), + metadata: HashMap::new(), + }).unwrap(); + } + let app = app(state); + let body = serde_json::to_string(&AssignTaskRequest { agent_id: "worker-http".into(), execution_mode: None }).unwrap(); + let resp = app + .oneshot( + Request::builder() + .method("POST") + .uri("/api/v1/tasks/t-assign/assign") + .header("content-type", "application/json") + .body(Body::from(body)) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + let body = axum::body::to_bytes(resp.into_body(), 1024).await.unwrap(); + let task: Task = serde_json::from_slice(&body).unwrap(); + assert_eq!(task.execution_mode, ExecutionMode::HttpPull); + assert_eq!(task.status, TaskStatus::Assigned); + } + + #[tokio::test] + async fn assign_task_fails_for_unknown_agent() { + let (_dir, state) = test_state(); + { + let store = state.store.lock().unwrap(); + store.insert_task(&sample_task("t-assign-missing", TaskStatus::Created, ExecutionMode::Undecided)).unwrap(); + } + let app = app(state); + let body = serde_json::to_string(&AssignTaskRequest { agent_id: "missing-agent".into(), execution_mode: None }).unwrap(); + let resp = app + .oneshot( + Request::builder() + .method("POST") + .uri("/api/v1/tasks/t-assign-missing/assign") + .header("content-type", "application/json") + .body(Body::from(body)) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::NOT_FOUND); + } + + #[tokio::test] + async fn assign_task_rejects_invalid_status() { + let (_dir, state) = test_state(); + { + let mut store = state.store.lock().unwrap(); + store.insert_task(&sample_task("t-running", TaskStatus::Running, ExecutionMode::Undecided)).unwrap(); + store.upsert_agent(&Agent { + agent_id: "worker-http".into(), + agent_type: AgentType::OpenClaw, + hostname: "host".into(), + capabilities: vec!["code:rust".into()], + max_concurrency: 1, + current_tasks: 0, + status: AgentStatus::Online, + last_heartbeat_at: Utc::now(), + registered_at: Utc::now(), + metadata: HashMap::new(), + }).unwrap(); + } + let app = app(state); + let body = serde_json::to_string(&AssignTaskRequest { agent_id: "worker-http".into(), execution_mode: None }).unwrap(); + let resp = app + .oneshot( + Request::builder() + .method("POST") + .uri("/api/v1/tasks/t-running/assign") + .header("content-type", "application/json") + .body(Body::from(body)) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + } + + #[tokio::test] + async fn dequeue_claims_undecided_task_as_http_pull() { + let (_dir, state) = test_state(); + { + let store = state.store.lock().unwrap(); + let mut task = sample_task("t-undecided", TaskStatus::Created, ExecutionMode::Undecided); + task.labels = vec!["code:rust".into()]; + store.insert_task(&task).unwrap(); + } + let app = app(state); + let body = serde_json::json!({"agent_id":"worker-http","capabilities":["code:rust"]}).to_string(); + let resp = app + .oneshot( + Request::builder() + .method("POST") + .uri("/api/v1/tasks/dequeue") + .header("content-type", "application/json") + .body(Body::from(body)) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + let body = axum::body::to_bytes(resp.into_body(), 1024).await.unwrap(); + let task: Task = serde_json::from_slice(&body).unwrap(); + assert_eq!(task.execution_mode, ExecutionMode::HttpPull); + assert_eq!(task.status, TaskStatus::Assigned); + } + async fn update_status_rejects_ssh_cli_task() { let (_dir, state) = test_state(); { diff --git a/src/core/event_store.rs b/src/core/event_store.rs index 565a2fb..c65f8a6 100644 --- a/src/core/event_store.rs +++ b/src/core/event_store.rs @@ -54,7 +54,7 @@ impl EventStore { task_type TEXT NOT NULL, priority TEXT NOT NULL DEFAULT 'normal', status TEXT NOT NULL DEFAULT 'created', - execution_mode TEXT NOT NULL DEFAULT 'ssh_cli', + execution_mode TEXT NOT NULL DEFAULT 'undecided', assigned_agent_id TEXT, assigned_host TEXT, requirements TEXT NOT NULL DEFAULT '', @@ -79,7 +79,7 @@ impl EventStore { let _ = self .conn - .execute("ALTER TABLE tasks ADD COLUMN execution_mode TEXT NOT NULL DEFAULT 'ssh_cli'", []); + .execute("ALTER TABLE tasks ADD COLUMN execution_mode TEXT NOT NULL DEFAULT 'undecided'", []); let _ = self.conn.execute("ALTER TABLE tasks ADD COLUMN assigned_host TEXT", []); let _ = self.conn.execute("ALTER TABLE tasks ADD COLUMN branch_name TEXT", []); let _ = self.conn.execute("ALTER TABLE tasks ADD COLUMN pr_title TEXT", []); @@ -335,6 +335,14 @@ impl EventStore { Ok(()) } + pub fn update_task_execution_mode(&mut self, task_id: &str, execution_mode: &str) -> SqlResult<()> { + self.conn.execute( + "UPDATE tasks SET execution_mode = ?1 WHERE task_id = ?2", + params![execution_mode, task_id], + )?; + Ok(()) + } + pub fn dequeue_and_assign_http_pull( &mut self, required_capabilities: &[String], @@ -350,7 +358,7 @@ impl EventStore { started_at, completed_at, last_activity_at, retry_count, max_retries, review_count, timeout_seconds FROM tasks - WHERE status = 'created' AND execution_mode = 'http_pull' + WHERE status = 'created' AND execution_mode IN ('http_pull', 'undecided') ORDER BY CASE priority WHEN 'urgent' THEN 0 WHEN 'high' THEN 1 @@ -371,7 +379,12 @@ impl EventStore { }; tx.execute( - "UPDATE tasks SET status = 'assigned', assigned_agent_id = ?1, assigned_at = ?2 WHERE task_id = ?3", + "UPDATE tasks + SET status = 'assigned', + execution_mode = CASE WHEN execution_mode = 'undecided' THEN 'http_pull' ELSE execution_mode END, + assigned_agent_id = ?1, + assigned_at = ?2 + WHERE task_id = ?3", params![agent_id, now, task.task_id], )?; let mut event = event.clone(); @@ -478,6 +491,66 @@ impl EventStore { Ok(Some((original, updated))) } + pub fn assign_task( + &mut self, + task_id: &str, + agent_id: &str, + execution_mode: ExecutionMode, + assigned_host: Option<&str>, + now: &str, + event: &TaskEvent, + ) -> SqlResult> { + let tx = self.conn.transaction()?; + let original = { + let mut stmt = tx.prepare( + "SELECT task_id, source, task_type, priority, status, execution_mode, assigned_agent_id, + assigned_host, requirements, labels, branch_name, pr_title, created_at, assigned_at, + started_at, completed_at, last_activity_at, retry_count, max_retries, review_count, + timeout_seconds + FROM tasks WHERE task_id = ?1", + )?; + let result = match stmt.query_row(params![task_id], Self::row_to_task) { + Ok(task) => Ok(Some(task)), + Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None), + Err(e) => Err(e), + }; + drop(stmt); + result? + }; + + let Some(_original) = original else { + tx.commit()?; + return Ok(None); + }; + + tx.execute( + "UPDATE tasks + SET status = 'assigned', + execution_mode = ?1, + assigned_agent_id = ?2, + assigned_host = ?3, + assigned_at = ?4 + WHERE task_id = ?5", + params![execution_mode.as_str(), agent_id, assigned_host, now, task_id], + )?; + Self::append_event(&tx, event)?; + + let updated = { + let mut stmt = tx.prepare( + "SELECT task_id, source, task_type, priority, status, execution_mode, assigned_agent_id, + assigned_host, requirements, labels, branch_name, pr_title, created_at, assigned_at, + started_at, completed_at, last_activity_at, retry_count, max_retries, review_count, + timeout_seconds + FROM tasks WHERE task_id = ?1", + )?; + let result = stmt.query_row(params![task_id], Self::row_to_task)?; + drop(stmt); + result + }; + tx.commit()?; + Ok(Some(updated)) + } + pub fn append_event_direct(&self, event: &TaskEvent) -> SqlResult<()> { Self::append_event(&self.conn, event) } diff --git a/src/core/models.rs b/src/core/models.rs index 5c32e73..a45a656 100644 --- a/src/core/models.rs +++ b/src/core/models.rs @@ -88,6 +88,7 @@ pub struct Agent { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "snake_case")] pub enum ExecutionMode { + Undecided, SshCli, HttpPull, } @@ -95,6 +96,7 @@ pub enum ExecutionMode { impl ExecutionMode { pub fn as_str(&self) -> &'static str { match self { + Self::Undecided => "undecided", Self::SshCli => "ssh_cli", Self::HttpPull => "http_pull", } @@ -102,6 +104,7 @@ impl ExecutionMode { pub fn from_str(value: &str) -> Self { match value { + "undecided" => Self::Undecided, "http_pull" => Self::HttpPull, _ => Self::SshCli, } diff --git a/src/dispatch.rs b/src/dispatch.rs index b84cdd2..e38b4a6 100644 --- a/src/dispatch.rs +++ b/src/dispatch.rs @@ -35,9 +35,15 @@ impl Dispatcher { store.list_tasks(Some("created"), None).map_err(|e| e.to_string())? }; - for task in tasks.into_iter().filter(|t| t.execution_mode == ExecutionMode::SshCli) { + let mut undecided_without_host = Vec::new(); + for task in tasks.into_iter().filter(|t| matches!(t.execution_mode, ExecutionMode::Undecided | ExecutionMode::SshCli)) { if let Some((host, agent_type)) = self.select_host(&task).await? { let agent_id = format!("{}:{}", host.host_id, agent_type); + + if task.execution_mode == ExecutionMode::Undecided { + self.set_execution_mode(&task.task_id, ExecutionMode::SshCli).await?; + } + let assigned = self .sm .transition_with_host(&task.task_id, TaskStatus::Assigned, Some(&agent_id), Some(&host.host_id), "ssh dispatch") @@ -70,9 +76,15 @@ impl Dispatcher { .await; } } + } else if task.execution_mode == ExecutionMode::Undecided { + undecided_without_host.push(task.task_id); } } + for task_id in undecided_without_host { + self.set_execution_mode(&task_id, ExecutionMode::HttpPull).await?; + } + let review_tasks = { let store = self.store.lock().map_err(|e| e.to_string())?; store.list_tasks(Some("review_pending"), None).map_err(|e| e.to_string())? @@ -111,6 +123,17 @@ impl Dispatcher { Ok(candidates.into_iter().next().map(|(h, a, _)| (h, a))) } + async fn set_execution_mode(&self, task_id: &str, mode: ExecutionMode) -> Result<(), String> { + let store = self.store.clone(); + let task_id = task_id.to_string(); + tokio::task::spawn_blocking(move || -> Result<(), String> { + let mut store = store.lock().map_err(|e| e.to_string())?; + store.update_task_execution_mode(&task_id, mode.as_str()).map_err(|e| e.to_string()) + }) + .await + .map_err(|e| e.to_string())? + } + fn current_host_loads(&self) -> Result, String> { let store = self.store.lock().map_err(|e| e.to_string())?; let tasks = store.list_tasks(None, None).map_err(|e| e.to_string())?; @@ -191,12 +214,12 @@ mod tests { HostConfig { host_id: "h2".into(), hostname: "localhost".into(), ssh_user: "u".into(), ssh_port: 22, ssh_key_path: None, work_dir: "/tmp".into(), - agents: vec![HostAgentConfig { agent_type: "codex-cli".into(), max_concurrency: 2, capabilities: vec!["code:rust".into()] }], + agents: vec![HostAgentConfig { agent_type: "codex-cli".into(), max_concurrency: 2, capabilities: vec!["code:rust".into(), "agent:code".into()] }], }, HostConfig { host_id: "h1".into(), hostname: "localhost".into(), ssh_user: "u".into(), ssh_port: 22, ssh_key_path: None, work_dir: "/tmp".into(), - agents: vec![HostAgentConfig { agent_type: "codex-cli".into(), max_concurrency: 1, capabilities: vec!["code:rust".into()] }], + agents: vec![HostAgentConfig { agent_type: "codex-cli".into(), max_concurrency: 1, capabilities: vec!["code:rust".into(), "agent:code".into()] }], }, ], } @@ -215,6 +238,65 @@ mod tests { assert_eq!(selected.0.host_id, "h2"); } + #[tokio::test] + async fn dispatches_undecided_task_to_ssh_cli_when_host_available() { + let dir = TempDir::new().unwrap(); + let db = dir.path().join("test.db"); + let store = Arc::new(Mutex::new(EventStore::open(&db).unwrap())); + let sm = Arc::new(StateMachine::new(store.clone())); + let mut task = sample_task(); + task.execution_mode = ExecutionMode::Undecided; + task.labels = vec!["agent:code".into()]; + sm.create_task(&task).await.unwrap(); + + let dispatcher = Dispatcher::new(config(), store.clone(), sm); + let selected = dispatcher.select_host(&task).await.unwrap(); + assert!(selected.is_some()); + + dispatcher.set_execution_mode(&task.task_id, ExecutionMode::SshCli).await.unwrap(); + + let updated = store.lock().unwrap().read_task(&task.task_id).unwrap().unwrap(); + assert_eq!(updated.execution_mode, ExecutionMode::SshCli); + assert_eq!(updated.status, TaskStatus::Created); + } + + #[tokio::test] + async fn converts_unmatched_undecided_task_to_http_pull() { + let dir = TempDir::new().unwrap(); + let db = dir.path().join("test.db"); + let store = Arc::new(Mutex::new(EventStore::open(&db).unwrap())); + let sm = Arc::new(StateMachine::new(store.clone())); + let mut task = sample_task(); + task.execution_mode = ExecutionMode::Undecided; + task.labels = vec!["agent:document".into()]; + sm.create_task(&task).await.unwrap(); + + let dispatcher = Dispatcher::new(config(), store.clone(), sm); + dispatcher.dispatch_once().await.unwrap(); + + let updated = store.lock().unwrap().read_task(&task.task_id).unwrap().unwrap(); + assert_eq!(updated.execution_mode, ExecutionMode::HttpPull); + assert_eq!(updated.status, TaskStatus::Created); + } + + #[tokio::test] + async fn leaves_existing_http_pull_task_unchanged() { + let dir = TempDir::new().unwrap(); + let db = dir.path().join("test.db"); + let store = Arc::new(Mutex::new(EventStore::open(&db).unwrap())); + let sm = Arc::new(StateMachine::new(store.clone())); + let mut task = sample_task(); + task.execution_mode = ExecutionMode::HttpPull; + sm.create_task(&task).await.unwrap(); + + let dispatcher = Dispatcher::new(config(), store.clone(), sm); + dispatcher.dispatch_once().await.unwrap(); + + let updated = store.lock().unwrap().read_task(&task.task_id).unwrap().unwrap(); + assert_eq!(updated.execution_mode, ExecutionMode::HttpPull); + assert_eq!(updated.status, TaskStatus::Created); + } + #[tokio::test] async fn does_not_match_agent_label_without_capability() { let dir = TempDir::new().unwrap(); diff --git a/src/integrations/forgejo.rs b/src/integrations/forgejo.rs index c84d67c..0c713e2 100644 --- a/src/integrations/forgejo.rs +++ b/src/integrations/forgejo.rs @@ -231,7 +231,7 @@ pub fn issue_event_to_task(event: &ForgejoIssueEvent, default_max_retries: u32, task_type, priority, status: TaskStatus::Created, - execution_mode: ExecutionMode::SshCli, + execution_mode: ExecutionMode::Undecided, assigned_agent_id: None, assigned_host: None, requirements: format!("{}\n\n{}", event.issue.title, event.issue.body.clone().unwrap_or_default()).trim().to_string(), @@ -381,7 +381,7 @@ mod tests { assert_eq!(task.task_type, "code"); assert_eq!(task.priority, Priority::High); assert_eq!(task.status, TaskStatus::Created); - assert_eq!(task.execution_mode, ExecutionMode::SshCli); + assert_eq!(task.execution_mode, ExecutionMode::Undecided); assert!(task.branch_name.is_some()); assert!(task.pr_title.is_some()); } diff --git a/src/main.rs b/src/main.rs index b5c048e..9e8ec6f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -78,6 +78,7 @@ async fn main() { .route("/api/v1/tasks", axum::routing::get(api::list_tasks)) .route("/api/v1/tasks/dequeue", axum::routing::post(api::dequeue_task)) .route("/api/v1/tasks/{task_id}", axum::routing::get(api::get_task)) + .route("/api/v1/tasks/{task_id}/assign", axum::routing::post(api::assign_task)) .route("/api/v1/tasks/{task_id}/status", axum::routing::post(api::update_task_status)) .route("/api/v1/tasks/{task_id}/complete", axum::routing::post(api::complete_task)) .route("/api/v1/tasks/{task_id}/retry", axum::routing::post(api::retry_task))