feat: dynamic execution mode — Undecided tasks, two-phase dispatch, assign API

- ExecutionMode enum adds Undecided variant (default for new tasks)
- Webhook creates tasks as Undecided instead of hardcoded SshCli
- Dispatch loop: Phase 1 matches ssh_cli hosts, Phase 2 marks remaining as HttpPull
- Dequeue now returns http_pull AND undecided tasks (atomic claim)
- New endpoint: POST /api/v1/tasks/{id}/assign for coordinator explicit assignment
- Backward compatible: existing SshCli/HttpPull tasks unaffected
- 37 tests passing (6 new)
This commit is contained in:
Zer4tul 2026-05-13 05:29:12 +08:00
parent a18cb2824e
commit 48c93e2ce9
13 changed files with 639 additions and 13 deletions

View file

@ -236,7 +236,7 @@ curl http://FLEET_API_URL:PORT/api/v1/tasks/org%2Frepo%2342
POST /api/v1/tasks/dequeue POST /api/v1/tasks/dequeue
``` ```
Requires Bearer token if `http_pull_token` is configured. Only returns tasks with `execution_mode = http_pull`. Requires Bearer token if `http_pull_token` is configured. Returns tasks with `execution_mode = http_pull` and may also claim `undecided` tasks, atomically converting them to `http_pull` when dequeued.
**Request:** **Request:**
@ -258,6 +258,33 @@ curl -X POST http://FLEET_API_URL:PORT/api/v1/tasks/dequeue \
--- ---
### Assign Task
```
POST /api/v1/tasks/{task_id}/assign
```
Explicitly assign a created task to a specific agent or host.
**Request:**
| Field | Type | Required | Description |
|-------|------|----------|-------------|
| agent_id | string | yes | Registered http_pull agent ID or configured ssh_cli host ID |
| execution_mode | string | no | Optional override: `ssh_cli` or `http_pull`. If omitted, the server auto-detects it from the target. |
**Response:** `200 OK` — updated [Task](#task-object).
**Errors:** `404` if task or agent not found. `400` if task status is not assignable.
```bash
curl -X POST http://FLEET_API_URL:PORT/api/v1/tasks/org%2Frepo%2342/assign \
-H 'Content-Type: application/json' \
-d '{"agent_id": "worker-03"}'
```
---
### Update Task Status (http_pull only) ### Update Task Status (http_pull only)
``` ```
@ -455,7 +482,7 @@ Receives Forgejo webhook events. Requires HMAC-SHA256 signature header.
**Priority values:** `low`, `normal`, `high`, `urgent` **Priority values:** `low`, `normal`, `high`, `urgent`
**Execution mode values:** `ssh_cli`, `http_pull` **Execution mode values:** `undecided`, `ssh_cli`, `http_pull`
### Receipt Object ### Receipt Object

View file

@ -0,0 +1,2 @@
schema: spec-driven
created: 2026-05-12

View file

@ -0,0 +1,75 @@
## Context
当前任务创建时硬编码 `execution_mode = SshCli`。这在只有 ssh_cli agent 时可行但在混合环境ssh_cli + http_pull中会阻塞 http_pull agent 接任务。
核心洞察:**执行模式是调度决策,不是任务属性**。任务只描述"需要什么能力",由调度器决定"怎么执行"。
## Goals / Non-Goals
**Goals:**
- 任务创建时不预设执行模式
- Dispatch loop 根据注册的 agent 动态决定
- Coordinator 可以显式指派任务给特定 agent
- http_pull agent 能 dequeue 到未被 ssh_cli 认领的任务
**Non-Goals:**
- 不实现智能调度负载均衡、亲和性等——Phase 2 再考虑
- 不改变 receipt 验证流程
- 不改变 Forgejo webhook 格式
## Decisions
### Decision 1: ExecutionMode 新增 Undecided
**选择**: 新增 `Undecided` 变体作为默认值
**理由**:
- 向后兼容:已有的 `SshCli`/`HttpPull` 任务不受影响
- 语义清晰:`Undecided` 表示"等待调度器决定"
- dispatch loop 只处理 `Undecided` 任务,已决定的不再改变
**替代方案**:
- 用 `Option<ExecutionMode>`None 表示未决定)—— 语义等价但 enum 更明确
- 去掉 execution_mode 字段,纯靠 runtime 状态—— 太激进,改太大
### Decision 2: 两阶段 dispatch
**选择**: dispatch loop 分两阶段:
1. **ssh_cli 阶段**:扫描 `Undecided` 任务,查找匹配的 ssh_cli host → 找到则标记 `SshCli` 并执行
2. **http_pull 阶段**:剩余的 `Undecided` 任务标记为 `HttpPull`,等待 agent dequeue
**理由**:
- ssh_cli 是主动调度orchestrator 控制),优先级高于被动等待
- http_pull agent 通过 dequeue 自行认领,不需要 orchestrator 主动分配
- 两阶段简单清晰,不需要复杂的调度算法
### Decision 3: Coordinator 显式指派
**选择**: 新增 `POST /api/v1/tasks/{id}/assign` 端点
```json
{
"agent_id": "hermes-worker-01",
"execution_mode": "http_pull" // optional, auto-detect if omitted
}
```
**理由**:
- coordinatorJeeves可能比自动调度更了解哪个 agent 适合
- 支持跨任务指派(如"这个文档任务给 Hermes"
- 指派后任务不再是 `Undecided`,直接进入执行
### Decision 4: Dequeue 查询条件
**选择**: dequeue 查询改为 `execution_mode IN ('http_pull', 'undecided')`
**理由**:
- 纯 http_pull 任务直接匹配
- 被自动标记为 http_pull 的任务也能匹配
- 如果调度器还没来得及处理 `Undecided`agent 也能直接拉走(降级为 http_pull
## Risks / Trade-offs
- **[竞争条件]** ssh_cli dispatch 和 http_pull dequeue 可能同时抢同一个 `Undecided` 任务 → 用 SQLite 事务保证原子性dequeue 用 `UPDATE ... RETURNING` 原子操作
- **[调度延迟]** Undecided 任务可能等一个 dispatch cycle 才被标记为 http_pull → dequeue 直接查 Undecided 可以缓解

View file

@ -0,0 +1,36 @@
## Why
任务创建时硬编码 `execution_mode = SshCli`forgejo.rs:234导致所有从 webhook 创建的任务都走 ssh_cli 路径。
实际场景中,执行模式应该在 dispatch 时动态决定:
- Hermes 是 http_pull agent有自己的调度器无法被 SSH 调度
- Claude Code 在 WSL2 上可以被 SSH 调度,但 arm0 到 WSL2 的连通性可能变化
- 未来可能一个任务同时有 ssh_cli 和 http_pull 的 agent 都能做
当前硬编码导致的问题:
- Hermes dequeue 拿不到任务(因为任务被标记为 ssh_clidequeue 只查 http_pull
- 需要手动改 DB 才能让 http_pull agent 接任务
- coordinator 无法显式指派任务给特定 agent
## What Changes
- `ExecutionMode` enum 新增 `Undecided` 变体(任务创建时的默认值)
- Webhook 创建任务时不再硬编码 `SshCli`
- Dispatch loop 动态决定执行模式:
- 有匹配的 ssh_cli host 且 agent online → ssh_cli立即执行
- 没有匹配的 ssh_cli host → 标记为 http_pull等待 agent dequeue
- 新增 APIcoordinator 可以显式指派任务给特定 agent指定 agent_id
- dequeue API 查询条件更新:`execution_mode IN ('http_pull', 'undecided')`
## Capabilities
### Modified Capabilities
- `task-lifecycle`: 任务状态机增加 undecided → ssh_cli/http_pull 的自动转换
- `agent-registry`: dispatch 逻辑改为两阶段匹配
## Impact
- **数据模型**`ExecutionMode` enum 新增 `Undecided`
- **API**新增指派端点dequeue 查询条件变更
- **dispatch loop**:核心调度逻辑重写
- **向后兼容**:已有的 `ssh_cli`/`http_pull` 任务不受影响

View file

@ -0,0 +1,82 @@
## ADDED Requirements
### Requirement: ExecutionMode enum includes Undecided variant
`ExecutionMode` enum SHALL include an `Undecided` variant as the default for newly created tasks.
#### Scenario: Task created via Forgejo webhook
- **WHEN** a Forgejo Issue webhook creates a task
- **THEN** `execution_mode` SHALL be `Undecided`
- **AND** the task SHALL be eligible for both ssh_cli dispatch and http_pull dequeue
#### Scenario: Task created via API
- **WHEN** a task is created via direct API call without specifying execution_mode
- **THEN** `execution_mode` SHALL default to `Undecided`
### Requirement: Two-phase dispatch loop
The dispatch loop SHALL use a two-phase approach to handle `Undecided` tasks.
#### Scenario: Undecided task with matching ssh_cli host
- **GIVEN** an `Undecided` task with labels `["agent:code"]`
- **AND** a registered ssh_cli host with agent capabilities matching `["agent:code"]`
- **WHEN** the dispatch loop runs
- **THEN** the task SHALL be assigned `execution_mode = SshCli`
- **AND** the task SHALL be dispatched via SSH for execution
#### Scenario: Undecided task with no matching ssh_cli host
- **GIVEN** an `Undecided` task with labels `["agent:review", "agent:document"]`
- **AND** no registered ssh_cli host with matching capabilities
- **WHEN** the dispatch loop runs
- **THEN** the task SHALL be assigned `execution_mode = HttpPull`
- **AND** the task SHALL become available for http_pull dequeue
#### Scenario: Undecided task with matching ssh_cli host but agent offline
- **GIVEN** an `Undecided` task with matching ssh_cli host
- **AND** the ssh_cli host is unreachable or agent is offline
- **WHEN** the dispatch loop runs
- **THEN** the task SHALL remain `Undecided` (retry next cycle)
- **AND** the task SHALL also be available for http_pull dequeue (fallback)
### Requirement: Coordinator explicit assignment
The API SHALL provide an endpoint for coordinators to explicitly assign tasks to specific agents.
#### Scenario: Coordinator assigns task to specific agent
- **GIVEN** a task in `Created` or `Undecided` status
- **WHEN** coordinator calls `POST /api/v1/tasks/{id}/assign` with `{"agent_id": "hermes-worker-01"}`
- **THEN** the task SHALL be assigned to the specified agent
- **AND** execution_mode SHALL be auto-detected from the agent's registration type (http_pull for registered agents, ssh_cli for configured hosts)
- **AND** the task status SHALL transition to `Assigned`
#### Scenario: Coordinator assigns to non-existent agent
- **WHEN** coordinator calls assign with an unknown agent_id
- **THEN** the API SHALL return 404 Not Found
#### Scenario: Coordinator assigns already-running task
- **WHEN** coordinator calls assign on a task in `Running` or `Completed` status
- **THEN** the API SHALL return 400 Bad Request
### Requirement: Dequeue accepts Undecided tasks
The dequeue endpoint SHALL return tasks with `execution_mode` of either `HttpPull` or `Undecided`.
#### Scenario: Agent dequeues Undecided task
- **GIVEN** an `Undecided` task matching the agent's capabilities
- **WHEN** an http_pull agent calls dequeue
- **THEN** the task SHALL be returned
- **AND** `execution_mode` SHALL be atomically updated to `HttpPull`
- **AND** the task SHALL be assigned to the dequeuing agent
#### Scenario: No race condition between dispatch and dequeue
- **GIVEN** an `Undecided` task
- **WHEN** both ssh_cli dispatch and http_pull dequeue attempt to claim it simultaneously
- **THEN** exactly one SHALL succeed (atomic claim via DB transaction)
- **AND** the other SHALL get no task / skip the task
### Requirement: Backward compatibility
Existing tasks with `execution_mode = SshCli` or `HttpPull` SHALL continue to work without changes.
#### Scenario: Pre-existing SshCli task
- **WHEN** a task already has `execution_mode = SshCli`
- **THEN** the dispatch loop SHALL process it as before (no change)
#### Scenario: Pre-existing HttpPull task
- **WHEN** a task already has `execution_mode = HttpPull`
- **THEN** the dequeue endpoint SHALL return it as before (no change)

View file

@ -0,0 +1,52 @@
## 1. 数据模型
- [ ] 1.1 `ExecutionMode` enum 新增 `Undecided` 变体
- [ ] 1.2 Task 默认 execution_mode 改为 `Undecided`
- [ ] 1.3 DB schema 更新(如需要)
- [ ] 1.4 单元测试Undecided 序列化/反序列化
## 2. Forgejo Webhook
- [ ] 2.1 移除 `forgejo.rs` 中硬编码的 `ExecutionMode::SshCli`
- [ ] 2.2 改为 `ExecutionMode::Undecided`
- [ ] 2.3 测试webhook 创建的任务 execution_mode 为 Undecided
## 3. Dispatch Loop 重写
- [ ] 3.1 Phase 1扫描 Undecided 任务,尝试匹配 ssh_cli host
- 匹配成功 → 标记 SshCli + 执行
- 匹配失败或 host offline → 保持 Undecided
- [ ] 3.2 Phase 2超时未匹配的 Undecided 任务标记为 HttpPull
- 超时阈值可配置(默认 30s即 3 个 dispatch cycle
- 或者:直接让 dequeue 也能拉 Undecided更简单
- [ ] 3.3 单元测试:两个阶段的各种场景
- [ ] 3.4 集成测试:混合 ssh_cli + http_pull 环境
## 4. Dequeue API 更新
- [ ] 4.1 SQL 查询改为 `execution_mode IN ('http_pull', 'undecided')`
- [ ] 4.2 Dequeue 时原子更新 execution_mode 为 HttpPull如果原为 Undecided
- [ ] 4.3 测试dequeue Undecided 任务返回 200 + 正确赋值
## 5. Coordinator 指派 API
- [ ] 5.1 新增 `POST /api/v1/tasks/{id}/assign`
- 请求体:`{"agent_id": "...", "execution_mode": "..."(可选)}`
- 自动检测:注册的 http_pull agent → HttpPull配置的 ssh_cli host → SshCli
- 错误处理404agent 不存在、400任务状态不允许
- [ ] 5.2 路由注册
- [ ] 5.3 测试:指派成功、指派失败的各种场景
## 6. 文档更新
- [ ] 6.1 API 参考新增 assign 端点
- [ ] 6.2 Skill 更新dequeue 现在也能拿到 Undecided 任务
- [ ] 6.3 架构文档更新:两阶段 dispatch 说明
## 7. 验证
- [ ] 7.1 端到端测试webhook 创建任务 → dispatch → ssh_cli 执行
- [ ] 7.2 端到端测试webhook 创建任务 → 无 ssh_cli 匹配 → http_pull dequeue
- [ ] 7.3 端到端测试coordinator 指派 → agent 执行
- [ ] 7.4 竞争条件测试dispatch 和 dequeue 同时抢任务
- [ ] 7.5 向后兼容:已有 SshCli/HttpPull 任务不受影响

View file

@ -123,7 +123,7 @@ Register → Save credentials to memory → Start heartbeat loop (every 60s, run
- Maximum allowed gap: `heartbeat_interval_secs × heartbeat_timeout_threshold` (default: 180 seconds) - Maximum allowed gap: `heartbeat_interval_secs × heartbeat_timeout_threshold` (default: 180 seconds)
- If you exceed that gap, you will be marked offline and your tasks will be requeued - If you exceed that gap, you will be marked offline and your tasks will be requeued
- This must run for the entire lifetime of the agent, not just once - This must run for the entire lifetime of the agent, not just once
4. **Dequeue** when ready for work via `POST /api/v1/tasks/dequeue`. Returns a Task or 204 No Content. 4. **Dequeue** when ready for work via `POST /api/v1/tasks/dequeue`. Returns a Task or 204 No Content. The returned task may already be `http_pull` or may have been `undecided` and atomically claimed as `http_pull` during dequeue.
5. **Update status** to `running` via `POST /api/v1/tasks/{task_id}/status`. 5. **Update status** to `running` via `POST /api/v1/tasks/{task_id}/status`.
6. **Complete** the task via `POST /api/v1/tasks/{task_id}/complete` with a Receipt. 6. **Complete** the task via `POST /api/v1/tasks/{task_id}/complete` with a Receipt.
7. **Deregister** when shutting down via `POST /api/v1/agents/deregister`. 7. **Deregister** when shutting down via `POST /api/v1/agents/deregister`.
@ -179,7 +179,7 @@ curl -X POST http://FLEET_API_URL:PORT/api/v1/tasks/dequeue \
-d '{"agent_id": "worker-03", "capabilities": ["code:rust"]}' -d '{"agent_id": "worker-03", "capabilities": ["code:rust"]}'
``` ```
Returns 200 with Task JSON, or 204 if no matching task. Returns 200 with Task JSON, or 204 if no matching task. Dequeue may return tasks that were previously `undecided`; if so, the Orchestrator updates them to `http_pull` as part of the same claim.
### Get Task Detail ### Get Task Detail

View file

@ -147,6 +147,12 @@ pub struct UpdateTaskStatusRequest {
pub status: String, pub status: String,
} }
#[derive(Debug, Deserialize, Serialize)]
pub struct AssignTaskRequest {
pub agent_id: String,
pub execution_mode: Option<String>,
}
#[derive(Debug, Serialize)] #[derive(Debug, Serialize)]
pub struct ReceiptResponse { pub struct ReceiptResponse {
pub task_id: String, pub task_id: String,
@ -319,6 +325,67 @@ pub async fn dequeue_task(
} }
} }
pub async fn assign_task(
State(state): State<AppState>,
Path(task_id): Path<String>,
Json(req): Json<AssignTaskRequest>,
) -> Result<Json<Task>, ApiError> {
let store = state.store.clone();
let config = state.config.clone();
tokio::task::spawn_blocking(move || -> Result<Json<Task>, ApiError> {
let mut store = store.lock().map_err(|e| ApiError::Poisoned(e.to_string()))?;
let task = store.read_task(&task_id)?.ok_or_else(|| ApiError::NotFound(format!("task {task_id}")))?;
if !matches!(task.status, TaskStatus::Created) {
return Err(ApiError::BadRequest(format!("task {} is not assignable from status {}", task.task_id, task.status.as_str())));
}
let (execution_mode, assigned_host) = if let Some(mode) = req.execution_mode.as_deref() {
let mode = ExecutionMode::from_str(mode);
let assigned_host = if mode == ExecutionMode::SshCli {
config.hosts.iter().find(|h| h.host_id == req.agent_id).map(|h| h.host_id.clone())
} else {
None
};
(mode, assigned_host)
} else if store.find_agent_by_id(&req.agent_id)?.is_some() {
(ExecutionMode::HttpPull, None)
} else if let Some(host) = config.hosts.iter().find(|h| h.host_id == req.agent_id) {
(ExecutionMode::SshCli, Some(host.host_id.clone()))
} else {
return Err(ApiError::NotFound(format!("agent {}", req.agent_id)));
};
let now = Utc::now();
let event = crate::core::models::TaskEvent {
event_id: uuid::Uuid::new_v4().to_string(),
task_id: task_id.clone(),
event_type: "task.assigned".into(),
agent_id: Some(req.agent_id.clone()),
timestamp: now,
payload: serde_json::json!({
"from_status": task.status.as_str(),
"to_status": "assigned",
"reason": "coordinator assign",
"execution_mode": execution_mode.as_str(),
"assigned_host": assigned_host,
}),
};
let updated = store
.assign_task(
&task_id,
&req.agent_id,
execution_mode,
assigned_host.as_deref(),
&now.to_rfc3339(),
&event,
)?
.ok_or_else(|| ApiError::NotFound(format!("task {task_id}")))?;
Ok(Json(updated))
})
.await?
}
pub async fn update_task_status( pub async fn update_task_status(
State(state): State<AppState>, State(state): State<AppState>,
headers: HeaderMap, headers: HeaderMap,
@ -590,6 +657,7 @@ mod tests {
.route("/api/v1/tasks", axum::routing::get(list_tasks)) .route("/api/v1/tasks", axum::routing::get(list_tasks))
.route("/api/v1/tasks/{task_id}", axum::routing::get(get_task)) .route("/api/v1/tasks/{task_id}", axum::routing::get(get_task))
.route("/api/v1/tasks/{task_id}/retry", axum::routing::post(retry_task)) .route("/api/v1/tasks/{task_id}/retry", axum::routing::post(retry_task))
.route("/api/v1/tasks/{task_id}/assign", axum::routing::post(assign_task))
.route("/api/v1/tasks/dequeue", axum::routing::post(dequeue_task)) .route("/api/v1/tasks/dequeue", axum::routing::post(dequeue_task))
.route("/api/v1/tasks/{task_id}/status", axum::routing::post(update_task_status)) .route("/api/v1/tasks/{task_id}/status", axum::routing::post(update_task_status))
.with_state(state) .with_state(state)
@ -718,6 +786,131 @@ mod tests {
} }
#[tokio::test] #[tokio::test]
async fn assign_task_succeeds_for_registered_http_pull_agent() {
let (_dir, state) = test_state();
{
let mut store = state.store.lock().unwrap();
store.insert_task(&sample_task("t-assign", TaskStatus::Created, ExecutionMode::Undecided)).unwrap();
store.upsert_agent(&Agent {
agent_id: "worker-http".into(),
agent_type: AgentType::OpenClaw,
hostname: "host".into(),
capabilities: vec!["code:rust".into()],
max_concurrency: 1,
current_tasks: 0,
status: AgentStatus::Online,
last_heartbeat_at: Utc::now(),
registered_at: Utc::now(),
metadata: HashMap::new(),
}).unwrap();
}
let app = app(state);
let body = serde_json::to_string(&AssignTaskRequest { agent_id: "worker-http".into(), execution_mode: None }).unwrap();
let resp = app
.oneshot(
Request::builder()
.method("POST")
.uri("/api/v1/tasks/t-assign/assign")
.header("content-type", "application/json")
.body(Body::from(body))
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let body = axum::body::to_bytes(resp.into_body(), 1024).await.unwrap();
let task: Task = serde_json::from_slice(&body).unwrap();
assert_eq!(task.execution_mode, ExecutionMode::HttpPull);
assert_eq!(task.status, TaskStatus::Assigned);
}
#[tokio::test]
async fn assign_task_fails_for_unknown_agent() {
let (_dir, state) = test_state();
{
let store = state.store.lock().unwrap();
store.insert_task(&sample_task("t-assign-missing", TaskStatus::Created, ExecutionMode::Undecided)).unwrap();
}
let app = app(state);
let body = serde_json::to_string(&AssignTaskRequest { agent_id: "missing-agent".into(), execution_mode: None }).unwrap();
let resp = app
.oneshot(
Request::builder()
.method("POST")
.uri("/api/v1/tasks/t-assign-missing/assign")
.header("content-type", "application/json")
.body(Body::from(body))
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
}
#[tokio::test]
async fn assign_task_rejects_invalid_status() {
let (_dir, state) = test_state();
{
let mut store = state.store.lock().unwrap();
store.insert_task(&sample_task("t-running", TaskStatus::Running, ExecutionMode::Undecided)).unwrap();
store.upsert_agent(&Agent {
agent_id: "worker-http".into(),
agent_type: AgentType::OpenClaw,
hostname: "host".into(),
capabilities: vec!["code:rust".into()],
max_concurrency: 1,
current_tasks: 0,
status: AgentStatus::Online,
last_heartbeat_at: Utc::now(),
registered_at: Utc::now(),
metadata: HashMap::new(),
}).unwrap();
}
let app = app(state);
let body = serde_json::to_string(&AssignTaskRequest { agent_id: "worker-http".into(), execution_mode: None }).unwrap();
let resp = app
.oneshot(
Request::builder()
.method("POST")
.uri("/api/v1/tasks/t-running/assign")
.header("content-type", "application/json")
.body(Body::from(body))
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
}
#[tokio::test]
async fn dequeue_claims_undecided_task_as_http_pull() {
let (_dir, state) = test_state();
{
let store = state.store.lock().unwrap();
let mut task = sample_task("t-undecided", TaskStatus::Created, ExecutionMode::Undecided);
task.labels = vec!["code:rust".into()];
store.insert_task(&task).unwrap();
}
let app = app(state);
let body = serde_json::json!({"agent_id":"worker-http","capabilities":["code:rust"]}).to_string();
let resp = app
.oneshot(
Request::builder()
.method("POST")
.uri("/api/v1/tasks/dequeue")
.header("content-type", "application/json")
.body(Body::from(body))
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let body = axum::body::to_bytes(resp.into_body(), 1024).await.unwrap();
let task: Task = serde_json::from_slice(&body).unwrap();
assert_eq!(task.execution_mode, ExecutionMode::HttpPull);
assert_eq!(task.status, TaskStatus::Assigned);
}
async fn update_status_rejects_ssh_cli_task() { async fn update_status_rejects_ssh_cli_task() {
let (_dir, state) = test_state(); let (_dir, state) = test_state();
{ {

View file

@ -54,7 +54,7 @@ impl EventStore {
task_type TEXT NOT NULL, task_type TEXT NOT NULL,
priority TEXT NOT NULL DEFAULT 'normal', priority TEXT NOT NULL DEFAULT 'normal',
status TEXT NOT NULL DEFAULT 'created', status TEXT NOT NULL DEFAULT 'created',
execution_mode TEXT NOT NULL DEFAULT 'ssh_cli', execution_mode TEXT NOT NULL DEFAULT 'undecided',
assigned_agent_id TEXT, assigned_agent_id TEXT,
assigned_host TEXT, assigned_host TEXT,
requirements TEXT NOT NULL DEFAULT '', requirements TEXT NOT NULL DEFAULT '',
@ -79,7 +79,7 @@ impl EventStore {
let _ = self let _ = self
.conn .conn
.execute("ALTER TABLE tasks ADD COLUMN execution_mode TEXT NOT NULL DEFAULT 'ssh_cli'", []); .execute("ALTER TABLE tasks ADD COLUMN execution_mode TEXT NOT NULL DEFAULT 'undecided'", []);
let _ = self.conn.execute("ALTER TABLE tasks ADD COLUMN assigned_host TEXT", []); let _ = self.conn.execute("ALTER TABLE tasks ADD COLUMN assigned_host TEXT", []);
let _ = self.conn.execute("ALTER TABLE tasks ADD COLUMN branch_name TEXT", []); let _ = self.conn.execute("ALTER TABLE tasks ADD COLUMN branch_name TEXT", []);
let _ = self.conn.execute("ALTER TABLE tasks ADD COLUMN pr_title TEXT", []); let _ = self.conn.execute("ALTER TABLE tasks ADD COLUMN pr_title TEXT", []);
@ -335,6 +335,14 @@ impl EventStore {
Ok(()) Ok(())
} }
pub fn update_task_execution_mode(&mut self, task_id: &str, execution_mode: &str) -> SqlResult<()> {
self.conn.execute(
"UPDATE tasks SET execution_mode = ?1 WHERE task_id = ?2",
params![execution_mode, task_id],
)?;
Ok(())
}
pub fn dequeue_and_assign_http_pull( pub fn dequeue_and_assign_http_pull(
&mut self, &mut self,
required_capabilities: &[String], required_capabilities: &[String],
@ -350,7 +358,7 @@ impl EventStore {
started_at, completed_at, last_activity_at, retry_count, max_retries, review_count, started_at, completed_at, last_activity_at, retry_count, max_retries, review_count,
timeout_seconds timeout_seconds
FROM tasks FROM tasks
WHERE status = 'created' AND execution_mode = 'http_pull' WHERE status = 'created' AND execution_mode IN ('http_pull', 'undecided')
ORDER BY CASE priority ORDER BY CASE priority
WHEN 'urgent' THEN 0 WHEN 'urgent' THEN 0
WHEN 'high' THEN 1 WHEN 'high' THEN 1
@ -371,7 +379,12 @@ impl EventStore {
}; };
tx.execute( tx.execute(
"UPDATE tasks SET status = 'assigned', assigned_agent_id = ?1, assigned_at = ?2 WHERE task_id = ?3", "UPDATE tasks
SET status = 'assigned',
execution_mode = CASE WHEN execution_mode = 'undecided' THEN 'http_pull' ELSE execution_mode END,
assigned_agent_id = ?1,
assigned_at = ?2
WHERE task_id = ?3",
params![agent_id, now, task.task_id], params![agent_id, now, task.task_id],
)?; )?;
let mut event = event.clone(); let mut event = event.clone();
@ -478,6 +491,66 @@ impl EventStore {
Ok(Some((original, updated))) Ok(Some((original, updated)))
} }
pub fn assign_task(
&mut self,
task_id: &str,
agent_id: &str,
execution_mode: ExecutionMode,
assigned_host: Option<&str>,
now: &str,
event: &TaskEvent,
) -> SqlResult<Option<Task>> {
let tx = self.conn.transaction()?;
let original = {
let mut stmt = tx.prepare(
"SELECT task_id, source, task_type, priority, status, execution_mode, assigned_agent_id,
assigned_host, requirements, labels, branch_name, pr_title, created_at, assigned_at,
started_at, completed_at, last_activity_at, retry_count, max_retries, review_count,
timeout_seconds
FROM tasks WHERE task_id = ?1",
)?;
let result = match stmt.query_row(params![task_id], Self::row_to_task) {
Ok(task) => Ok(Some(task)),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(e),
};
drop(stmt);
result?
};
let Some(_original) = original else {
tx.commit()?;
return Ok(None);
};
tx.execute(
"UPDATE tasks
SET status = 'assigned',
execution_mode = ?1,
assigned_agent_id = ?2,
assigned_host = ?3,
assigned_at = ?4
WHERE task_id = ?5",
params![execution_mode.as_str(), agent_id, assigned_host, now, task_id],
)?;
Self::append_event(&tx, event)?;
let updated = {
let mut stmt = tx.prepare(
"SELECT task_id, source, task_type, priority, status, execution_mode, assigned_agent_id,
assigned_host, requirements, labels, branch_name, pr_title, created_at, assigned_at,
started_at, completed_at, last_activity_at, retry_count, max_retries, review_count,
timeout_seconds
FROM tasks WHERE task_id = ?1",
)?;
let result = stmt.query_row(params![task_id], Self::row_to_task)?;
drop(stmt);
result
};
tx.commit()?;
Ok(Some(updated))
}
pub fn append_event_direct(&self, event: &TaskEvent) -> SqlResult<()> { pub fn append_event_direct(&self, event: &TaskEvent) -> SqlResult<()> {
Self::append_event(&self.conn, event) Self::append_event(&self.conn, event)
} }

View file

@ -88,6 +88,7 @@ pub struct Agent {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")] #[serde(rename_all = "snake_case")]
pub enum ExecutionMode { pub enum ExecutionMode {
Undecided,
SshCli, SshCli,
HttpPull, HttpPull,
} }
@ -95,6 +96,7 @@ pub enum ExecutionMode {
impl ExecutionMode { impl ExecutionMode {
pub fn as_str(&self) -> &'static str { pub fn as_str(&self) -> &'static str {
match self { match self {
Self::Undecided => "undecided",
Self::SshCli => "ssh_cli", Self::SshCli => "ssh_cli",
Self::HttpPull => "http_pull", Self::HttpPull => "http_pull",
} }
@ -102,6 +104,7 @@ impl ExecutionMode {
pub fn from_str(value: &str) -> Self { pub fn from_str(value: &str) -> Self {
match value { match value {
"undecided" => Self::Undecided,
"http_pull" => Self::HttpPull, "http_pull" => Self::HttpPull,
_ => Self::SshCli, _ => Self::SshCli,
} }

View file

@ -35,9 +35,15 @@ impl Dispatcher {
store.list_tasks(Some("created"), None).map_err(|e| e.to_string())? store.list_tasks(Some("created"), None).map_err(|e| e.to_string())?
}; };
for task in tasks.into_iter().filter(|t| t.execution_mode == ExecutionMode::SshCli) { let mut undecided_without_host = Vec::new();
for task in tasks.into_iter().filter(|t| matches!(t.execution_mode, ExecutionMode::Undecided | ExecutionMode::SshCli)) {
if let Some((host, agent_type)) = self.select_host(&task).await? { if let Some((host, agent_type)) = self.select_host(&task).await? {
let agent_id = format!("{}:{}", host.host_id, agent_type); let agent_id = format!("{}:{}", host.host_id, agent_type);
if task.execution_mode == ExecutionMode::Undecided {
self.set_execution_mode(&task.task_id, ExecutionMode::SshCli).await?;
}
let assigned = self let assigned = self
.sm .sm
.transition_with_host(&task.task_id, TaskStatus::Assigned, Some(&agent_id), Some(&host.host_id), "ssh dispatch") .transition_with_host(&task.task_id, TaskStatus::Assigned, Some(&agent_id), Some(&host.host_id), "ssh dispatch")
@ -70,9 +76,15 @@ impl Dispatcher {
.await; .await;
} }
} }
} else if task.execution_mode == ExecutionMode::Undecided {
undecided_without_host.push(task.task_id);
} }
} }
for task_id in undecided_without_host {
self.set_execution_mode(&task_id, ExecutionMode::HttpPull).await?;
}
let review_tasks = { let review_tasks = {
let store = self.store.lock().map_err(|e| e.to_string())?; let store = self.store.lock().map_err(|e| e.to_string())?;
store.list_tasks(Some("review_pending"), None).map_err(|e| e.to_string())? store.list_tasks(Some("review_pending"), None).map_err(|e| e.to_string())?
@ -111,6 +123,17 @@ impl Dispatcher {
Ok(candidates.into_iter().next().map(|(h, a, _)| (h, a))) Ok(candidates.into_iter().next().map(|(h, a, _)| (h, a)))
} }
async fn set_execution_mode(&self, task_id: &str, mode: ExecutionMode) -> Result<(), String> {
let store = self.store.clone();
let task_id = task_id.to_string();
tokio::task::spawn_blocking(move || -> Result<(), String> {
let mut store = store.lock().map_err(|e| e.to_string())?;
store.update_task_execution_mode(&task_id, mode.as_str()).map_err(|e| e.to_string())
})
.await
.map_err(|e| e.to_string())?
}
fn current_host_loads(&self) -> Result<HashMap<(String, String), u32>, String> { fn current_host_loads(&self) -> Result<HashMap<(String, String), u32>, String> {
let store = self.store.lock().map_err(|e| e.to_string())?; let store = self.store.lock().map_err(|e| e.to_string())?;
let tasks = store.list_tasks(None, None).map_err(|e| e.to_string())?; let tasks = store.list_tasks(None, None).map_err(|e| e.to_string())?;
@ -191,12 +214,12 @@ mod tests {
HostConfig { HostConfig {
host_id: "h2".into(), hostname: "localhost".into(), ssh_user: "u".into(), ssh_port: 22, host_id: "h2".into(), hostname: "localhost".into(), ssh_user: "u".into(), ssh_port: 22,
ssh_key_path: None, work_dir: "/tmp".into(), ssh_key_path: None, work_dir: "/tmp".into(),
agents: vec![HostAgentConfig { agent_type: "codex-cli".into(), max_concurrency: 2, capabilities: vec!["code:rust".into()] }], agents: vec![HostAgentConfig { agent_type: "codex-cli".into(), max_concurrency: 2, capabilities: vec!["code:rust".into(), "agent:code".into()] }],
}, },
HostConfig { HostConfig {
host_id: "h1".into(), hostname: "localhost".into(), ssh_user: "u".into(), ssh_port: 22, host_id: "h1".into(), hostname: "localhost".into(), ssh_user: "u".into(), ssh_port: 22,
ssh_key_path: None, work_dir: "/tmp".into(), ssh_key_path: None, work_dir: "/tmp".into(),
agents: vec![HostAgentConfig { agent_type: "codex-cli".into(), max_concurrency: 1, capabilities: vec!["code:rust".into()] }], agents: vec![HostAgentConfig { agent_type: "codex-cli".into(), max_concurrency: 1, capabilities: vec!["code:rust".into(), "agent:code".into()] }],
}, },
], ],
} }
@ -215,6 +238,65 @@ mod tests {
assert_eq!(selected.0.host_id, "h2"); assert_eq!(selected.0.host_id, "h2");
} }
#[tokio::test]
async fn dispatches_undecided_task_to_ssh_cli_when_host_available() {
let dir = TempDir::new().unwrap();
let db = dir.path().join("test.db");
let store = Arc::new(Mutex::new(EventStore::open(&db).unwrap()));
let sm = Arc::new(StateMachine::new(store.clone()));
let mut task = sample_task();
task.execution_mode = ExecutionMode::Undecided;
task.labels = vec!["agent:code".into()];
sm.create_task(&task).await.unwrap();
let dispatcher = Dispatcher::new(config(), store.clone(), sm);
let selected = dispatcher.select_host(&task).await.unwrap();
assert!(selected.is_some());
dispatcher.set_execution_mode(&task.task_id, ExecutionMode::SshCli).await.unwrap();
let updated = store.lock().unwrap().read_task(&task.task_id).unwrap().unwrap();
assert_eq!(updated.execution_mode, ExecutionMode::SshCli);
assert_eq!(updated.status, TaskStatus::Created);
}
#[tokio::test]
async fn converts_unmatched_undecided_task_to_http_pull() {
let dir = TempDir::new().unwrap();
let db = dir.path().join("test.db");
let store = Arc::new(Mutex::new(EventStore::open(&db).unwrap()));
let sm = Arc::new(StateMachine::new(store.clone()));
let mut task = sample_task();
task.execution_mode = ExecutionMode::Undecided;
task.labels = vec!["agent:document".into()];
sm.create_task(&task).await.unwrap();
let dispatcher = Dispatcher::new(config(), store.clone(), sm);
dispatcher.dispatch_once().await.unwrap();
let updated = store.lock().unwrap().read_task(&task.task_id).unwrap().unwrap();
assert_eq!(updated.execution_mode, ExecutionMode::HttpPull);
assert_eq!(updated.status, TaskStatus::Created);
}
#[tokio::test]
async fn leaves_existing_http_pull_task_unchanged() {
let dir = TempDir::new().unwrap();
let db = dir.path().join("test.db");
let store = Arc::new(Mutex::new(EventStore::open(&db).unwrap()));
let sm = Arc::new(StateMachine::new(store.clone()));
let mut task = sample_task();
task.execution_mode = ExecutionMode::HttpPull;
sm.create_task(&task).await.unwrap();
let dispatcher = Dispatcher::new(config(), store.clone(), sm);
dispatcher.dispatch_once().await.unwrap();
let updated = store.lock().unwrap().read_task(&task.task_id).unwrap().unwrap();
assert_eq!(updated.execution_mode, ExecutionMode::HttpPull);
assert_eq!(updated.status, TaskStatus::Created);
}
#[tokio::test] #[tokio::test]
async fn does_not_match_agent_label_without_capability() { async fn does_not_match_agent_label_without_capability() {
let dir = TempDir::new().unwrap(); let dir = TempDir::new().unwrap();

View file

@ -231,7 +231,7 @@ pub fn issue_event_to_task(event: &ForgejoIssueEvent, default_max_retries: u32,
task_type, task_type,
priority, priority,
status: TaskStatus::Created, status: TaskStatus::Created,
execution_mode: ExecutionMode::SshCli, execution_mode: ExecutionMode::Undecided,
assigned_agent_id: None, assigned_agent_id: None,
assigned_host: None, assigned_host: None,
requirements: format!("{}\n\n{}", event.issue.title, event.issue.body.clone().unwrap_or_default()).trim().to_string(), requirements: format!("{}\n\n{}", event.issue.title, event.issue.body.clone().unwrap_or_default()).trim().to_string(),
@ -381,7 +381,7 @@ mod tests {
assert_eq!(task.task_type, "code"); assert_eq!(task.task_type, "code");
assert_eq!(task.priority, Priority::High); assert_eq!(task.priority, Priority::High);
assert_eq!(task.status, TaskStatus::Created); assert_eq!(task.status, TaskStatus::Created);
assert_eq!(task.execution_mode, ExecutionMode::SshCli); assert_eq!(task.execution_mode, ExecutionMode::Undecided);
assert!(task.branch_name.is_some()); assert!(task.branch_name.is_some());
assert!(task.pr_title.is_some()); assert!(task.pr_title.is_some());
} }

View file

@ -78,6 +78,7 @@ async fn main() {
.route("/api/v1/tasks", axum::routing::get(api::list_tasks)) .route("/api/v1/tasks", axum::routing::get(api::list_tasks))
.route("/api/v1/tasks/dequeue", axum::routing::post(api::dequeue_task)) .route("/api/v1/tasks/dequeue", axum::routing::post(api::dequeue_task))
.route("/api/v1/tasks/{task_id}", axum::routing::get(api::get_task)) .route("/api/v1/tasks/{task_id}", axum::routing::get(api::get_task))
.route("/api/v1/tasks/{task_id}/assign", axum::routing::post(api::assign_task))
.route("/api/v1/tasks/{task_id}/status", axum::routing::post(api::update_task_status)) .route("/api/v1/tasks/{task_id}/status", axum::routing::post(api::update_task_status))
.route("/api/v1/tasks/{task_id}/complete", axum::routing::post(api::complete_task)) .route("/api/v1/tasks/{task_id}/complete", axum::routing::post(api::complete_task))
.route("/api/v1/tasks/{task_id}/retry", axum::routing::post(api::retry_task)) .route("/api/v1/tasks/{task_id}/retry", axum::routing::post(api::retry_task))