feat: implement orchestrator core (Rust)

Task 1.1:  Cargo.toml with axum, rusqlite, matrix-sdk, serde, etc.
Task 1.2:  Directory structure: src/core, src/adapters, src/integrations, src/api
Task 1.5:  config.example.toml with full schema
Task 2.1:  Data models: Agent, Task, Receipt, Artifact, TaskEvent
Task 2.2:  Event Store: SQLite append-only with task/agent tables
Task 2.3:  Task state machine: created→assigned→running→completed/failed
Task 2.4:  Global task queue with priority ordering
Task 2.5:  Background timeout checker
Task 2.6:  Retry policy with configurable max_retries

Compiles clean (warnings only, no errors).
API handler stubs in place for Phase 2.
This commit is contained in:
Zer4tul 2026-05-11 14:57:23 +08:00
parent e983955036
commit 4e01728a67
15 changed files with 5220 additions and 3 deletions

271
src/core/event_store.rs Normal file
View file

@ -0,0 +1,271 @@
use rusqlite::{params, Connection, Result as SqlResult};
use std::path::Path;
use super::models::TaskEvent;
use super::models::Task;
/// Append-only SQLite persistence layer for tasks, agents, and task events.
///
/// Owns a single [`Connection`]; not `Sync` — callers share it behind a
/// mutex (see `StateMachine` / `TaskQueue`).
pub struct EventStore {
// Exposed directly in addition to the `conn()` getter; prefer the getter.
pub conn: Connection,
}
impl EventStore {
    /// Open (or create) the store at `db_path`, creating parent
    /// directories and initialising the schema if needed.
    pub fn open(db_path: &Path) -> Result<Self, Box<dyn std::error::Error>> {
        if let Some(parent) = db_path.parent() {
            std::fs::create_dir_all(parent)?;
        }
        let conn = Connection::open(db_path)?;
        let store = Self { conn };
        store.init_schema()?;
        Ok(store)
    }

    /// Borrow the underlying SQLite connection.
    pub fn conn(&self) -> &Connection {
        &self.conn
    }

    /// Create the `task_events`, `agents`, and `tasks` tables plus their
    /// indexes. Everything is `IF NOT EXISTS`, so this is safe to run on
    /// every startup.
    pub fn init_schema(&self) -> SqlResult<()> {
        self.conn.execute_batch(
            "CREATE TABLE IF NOT EXISTS task_events (
                event_id TEXT PRIMARY KEY,
                task_id TEXT NOT NULL,
                event_type TEXT NOT NULL,
                agent_id TEXT,
                timestamp TEXT NOT NULL,
                payload TEXT NOT NULL
            );
            CREATE INDEX IF NOT EXISTS idx_events_task_id ON task_events(task_id);
            CREATE INDEX IF NOT EXISTS idx_events_type ON task_events(event_type);
            CREATE TABLE IF NOT EXISTS agents (
                agent_id TEXT PRIMARY KEY,
                agent_type TEXT NOT NULL,
                hostname TEXT NOT NULL,
                capabilities TEXT NOT NULL,
                max_concurrency INTEGER NOT NULL DEFAULT 1,
                current_tasks INTEGER NOT NULL DEFAULT 0,
                status TEXT NOT NULL DEFAULT 'offline',
                last_heartbeat_at TEXT NOT NULL,
                registered_at TEXT NOT NULL,
                metadata TEXT NOT NULL DEFAULT '{}'
            );
            CREATE TABLE IF NOT EXISTS tasks (
                task_id TEXT PRIMARY KEY,
                source TEXT NOT NULL,
                task_type TEXT NOT NULL,
                priority TEXT NOT NULL DEFAULT 'normal',
                status TEXT NOT NULL DEFAULT 'created',
                assigned_agent_id TEXT,
                requirements TEXT NOT NULL DEFAULT '',
                labels TEXT NOT NULL DEFAULT '[]',
                created_at TEXT NOT NULL,
                assigned_at TEXT,
                started_at TEXT,
                completed_at TEXT,
                retry_count INTEGER NOT NULL DEFAULT 0,
                max_retries INTEGER NOT NULL DEFAULT 2,
                timeout_seconds INTEGER NOT NULL DEFAULT 1800
            );
            CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status);
            CREATE INDEX IF NOT EXISTS idx_tasks_assigned ON tasks(assigned_agent_id);
            ",
        )?;
        Ok(())
    }

    /// Append one event to the append-only `task_events` log.
    pub fn append_event(&self, event: &TaskEvent) -> SqlResult<()> {
        self.conn.execute(
            "INSERT INTO task_events (event_id, task_id, event_type, agent_id, timestamp, payload)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
            params![
                event.event_id,
                event.task_id,
                event.event_type,
                event.agent_id,
                // Stored as RFC 3339 text so lexicographic ORDER BY matches
                // chronological order.
                event.timestamp.to_rfc3339(),
                serde_json::to_string(&event.payload).unwrap_or_default(),
            ],
        )?;
        Ok(())
    }

    /// Return all events for `task_id`, oldest first.
    pub fn get_events_for_task(&self, task_id: &str) -> SqlResult<Vec<TaskEvent>> {
        let mut stmt = self.conn.prepare(
            "SELECT event_id, task_id, event_type, agent_id, timestamp, payload
             FROM task_events WHERE task_id = ?1 ORDER BY timestamp ASC",
        )?;
        let events = stmt
            .query_map(params![task_id], |row| {
                let timestamp_str: String = row.get(4)?;
                let payload_str: String = row.get(5)?;
                Ok(TaskEvent {
                    event_id: row.get(0)?,
                    task_id: row.get(1)?,
                    event_type: row.get(2)?,
                    agent_id: row.get(3)?,
                    // Unparsable timestamps degrade to the default rather
                    // than failing the whole query.
                    timestamp: timestamp_str.parse().unwrap_or_default(),
                    payload: serde_json::from_str(&payload_str).unwrap_or(serde_json::Value::Null),
                })
            })?
            .collect::<SqlResult<Vec<_>>>()?;
        Ok(events)
    }

    /// List ids of `running` tasks whose `started_at` is more than
    /// `timeout_secs` before `now`. Rows with a missing or unparsable
    /// `started_at` are treated as not timed out.
    ///
    /// NOTE(review): this applies one global timeout; the per-task
    /// `timeout_seconds` column is not consulted here.
    pub fn find_timed_out_tasks(
        &self,
        now: chrono::DateTime<chrono::Utc>,
        timeout_secs: i64,
    ) -> SqlResult<Vec<String>> {
        let mut stmt = self
            .conn
            .prepare("SELECT task_id, started_at FROM tasks WHERE status = 'running'")?;
        let rows = stmt.query_map([], |row| {
            let task_id: String = row.get(0)?;
            let started_at: Option<String> = row.get(1)?;
            Ok((task_id, started_at))
        })?;
        let mut timed_out = Vec::new();
        for row in rows {
            // Propagate row-mapping errors instead of silently dropping
            // them (the previous `filter_map(|r| r.ok())` hid failures).
            let (task_id, started_at) = row?;
            let expired = started_at
                .and_then(|s| s.parse::<chrono::DateTime<chrono::Utc>>().ok())
                .map(|started| (now - started).num_seconds() > timeout_secs)
                .unwrap_or(false);
            if expired {
                timed_out.push(task_id);
            }
        }
        Ok(timed_out)
    }

    /// Fetch up to 20 queued (`created` or `assigned`) tasks, highest
    /// priority first, FIFO within the same priority.
    pub fn query_queued_tasks(&self) -> SqlResult<Vec<Task>> {
        let mut stmt = self.conn.prepare(
            "SELECT task_id, source, task_type, priority, status, assigned_agent_id,
                    requirements, labels, created_at, assigned_at, started_at, completed_at,
                    retry_count, max_retries, timeout_seconds
             FROM tasks
             WHERE status IN ('created', 'assigned')
             ORDER BY
                 CASE priority
                     WHEN 'urgent' THEN 0
                     WHEN 'high' THEN 1
                     WHEN 'normal' THEN 2
                     WHEN 'low' THEN 3
                 END,
                 created_at ASC
             LIMIT 20",
        )?;
        // Propagate row errors; a malformed row should surface, not vanish
        // (matches get_events_for_task's behavior).
        let tasks = stmt
            .query_map([], |row| self.row_to_task(row))?
            .collect::<SqlResult<Vec<_>>>()?;
        Ok(tasks)
    }

    /// Map a stored priority string back to the enum. Unknown values
    /// degrade to `Normal`, mirroring the column's DEFAULT.
    fn parse_priority(s: &str) -> super::models::Priority {
        use super::models::Priority;
        match s {
            "low" => Priority::Low,
            "high" => Priority::High,
            "urgent" => Priority::Urgent,
            _ => Priority::Normal,
        }
    }

    /// Inverse of [`Self::parse_priority`]; matches the
    /// `#[serde(rename_all = "lowercase")]` spelling on `Priority`.
    fn priority_to_str(p: &super::models::Priority) -> &'static str {
        use super::models::Priority;
        match p {
            Priority::Low => "low",
            Priority::Normal => "normal",
            Priority::High => "high",
            Priority::Urgent => "urgent",
        }
    }

    /// Convert one `tasks` row into a `Task`. Unknown status strings fall
    /// back to `Created`; unparsable timestamps fall back to defaults.
    fn row_to_task(&self, row: &rusqlite::Row) -> SqlResult<Task> {
        use super::models::TaskStatus;
        let priority_str: String = row.get(3)?;
        let status_str: String = row.get(4)?;
        let labels_str: String = row.get(7)?;
        Ok(Task {
            task_id: row.get(0)?,
            source: row.get(1)?,
            task_type: row.get(2)?,
            // Explicit mapping replaces the old serde round-trip through
            // `format!("\"{}\"", ...)`.
            priority: Self::parse_priority(&priority_str),
            status: match status_str.as_str() {
                "created" => TaskStatus::Created,
                "assigned" => TaskStatus::Assigned,
                "running" => TaskStatus::Running,
                "completed" => TaskStatus::Completed,
                "failed" => TaskStatus::Failed,
                "agent_lost" => TaskStatus::AgentLost,
                "cancelled" => TaskStatus::Cancelled,
                _ => TaskStatus::Created,
            },
            assigned_agent_id: row.get(5)?,
            requirements: row.get(6)?,
            labels: serde_json::from_str(&labels_str).unwrap_or_default(),
            created_at: row.get::<_, String>(8)?.parse().unwrap_or_default(),
            assigned_at: row.get::<_, Option<String>>(9)?.and_then(|s| s.parse().ok()),
            started_at: row.get::<_, Option<String>>(10)?.and_then(|s| s.parse().ok()),
            completed_at: row.get::<_, Option<String>>(11)?.and_then(|s| s.parse().ok()),
            retry_count: row.get(12)?,
            max_retries: row.get(13)?,
            timeout_seconds: row.get::<_, i64>(14)? as u64,
        })
    }

    /// Read a single task by id; `Ok(None)` when it does not exist.
    pub fn read_task(&self, task_id: &str) -> SqlResult<Option<Task>> {
        let mut stmt = self.conn.prepare(
            "SELECT task_id, source, task_type, priority, status, assigned_agent_id,
                    requirements, labels, created_at, assigned_at, started_at, completed_at,
                    retry_count, max_retries, timeout_seconds
             FROM tasks WHERE task_id = ?1",
        )?;
        match stmt.query_row(params![task_id], |row| self.row_to_task(row)) {
            Ok(task) => Ok(Some(task)),
            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
            Err(e) => Err(e),
        }
    }

    /// Insert a brand-new task row. Assignment/start/completion timestamps
    /// start out NULL and are filled in by [`Self::update_task_status`].
    pub fn insert_task(&self, task: &Task) -> SqlResult<()> {
        self.conn.execute(
            "INSERT INTO tasks (task_id, source, task_type, priority, status, requirements,
                 labels, created_at, retry_count, max_retries, timeout_seconds)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
            params![
                task.task_id,
                task.source,
                task.task_type,
                // Explicit mapping replaces the old serde-roundtrip +
                // trim_matches('"') hack.
                Self::priority_to_str(&task.priority),
                task.status.as_str(),
                task.requirements,
                serde_json::to_string(&task.labels).unwrap_or_default(),
                task.created_at.to_rfc3339(),
                task.retry_count,
                task.max_retries,
                task.timeout_seconds as i64,
            ],
        )?;
        Ok(())
    }

    /// Bump `retry_count` by one for `task_id`. No-op if the id is absent.
    pub fn increment_retry_count(&self, task_id: &str) -> SqlResult<()> {
        self.conn.execute(
            "UPDATE tasks SET retry_count = retry_count + 1 WHERE task_id = ?1",
            params![task_id],
        )?;
        Ok(())
    }

    /// Update a task's status plus any of the optional columns. `None`
    /// arguments leave the existing column value untouched (COALESCE in
    /// the SQL), so earlier timestamps are never clobbered.
    pub fn update_task_status(
        &self,
        task_id: &str,
        status: &str,
        agent_id: Option<&str>,
        assigned_at: Option<String>,
        started_at: Option<String>,
        completed_at: Option<String>,
        retry_count: u32,
    ) -> SqlResult<()> {
        self.conn.execute(
            "UPDATE tasks SET status = ?1, assigned_agent_id = COALESCE(?2, assigned_agent_id),
                 assigned_at = COALESCE(?3, assigned_at), started_at = COALESCE(?4, started_at),
                 completed_at = COALESCE(?5, completed_at), retry_count = ?6
             WHERE task_id = ?7",
            params![status, agent_id, assigned_at, started_at, completed_at, retry_count, task_id],
        )?;
        Ok(())
    }
}

6
src/core/mod.rs Normal file
View file

@ -0,0 +1,6 @@
//! Core orchestrator domain: data models, persistence, and task lifecycle.

pub mod event_store; // Append-only SQLite event log + task/agent tables.
pub mod models; // Agent, Task, Receipt, Artifact, TaskEvent.
pub mod state_machine; // Validated task status transitions.
pub mod task_queue; // Priority-ordered global task queue.
pub mod timeout; // Background checker that fails stuck tasks.
pub mod retry; // Retry policy for failed/agent_lost tasks.

145
src/core/models.rs Normal file
View file

@ -0,0 +1,145 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
// ─── Agent ───────────────────────────────────────────────────────
/// Kind of executor an agent runs as. Unit variants serialize in
/// kebab-case (e.g. `claude-code`); `Other` carries a free-form type
/// string for agents not known to this enum.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(rename_all = "kebab-case")]
pub enum AgentType {
OpenClaw,
ClaudeCode,
CodexCli,
Hermes,
Acp,
Shell,
Other(String),
}
/// Agent availability, serialized lowercase. The `agents` table defaults
/// new rows to `offline`.
// NOTE(review): `Draining` presumably means "online but not accepting new
// tasks" — confirm once scheduling consumes this state.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum AgentStatus {
Online,
Offline,
Draining,
}
/// A registered worker agent, mirrored by the `agents` table in the
/// event store.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Agent {
pub agent_id: String,
pub agent_type: AgentType,
pub hostname: String,
// Capability tags matched against task labels / task_type at dispatch.
pub capabilities: Vec<String>,
// How many tasks this agent may run at once vs. how many it runs now.
pub max_concurrency: u32,
pub current_tasks: u32,
pub status: AgentStatus,
// Stored as RFC 3339 text in SQLite.
pub last_heartbeat_at: DateTime<Utc>,
pub registered_at: DateTime<Utc>,
// Free-form key/value pairs; persisted as a JSON object (default '{}').
pub metadata: std::collections::HashMap<String, String>,
}
// ─── Task ────────────────────────────────────────────────────────
/// Task lifecycle state:
/// created → assigned → running → completed/failed/agent_lost,
/// with `cancelled` reachable from non-terminal states and
/// failed/agent_lost able to go back to assigned on retry
/// (see `StateMachine::validate_transition`).
// serde uses lowercase ("agentlost"); DB storage uses as_str()
// ("agent_lost") — the two spellings differ for this variant.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum TaskStatus {
Created,
Assigned,
Running,
Completed,
Failed,
AgentLost,
Cancelled,
}
impl TaskStatus {
pub fn as_str(&self) -> &'static str {
match self {
Self::Created => "created",
Self::Assigned => "assigned",
Self::Running => "running",
Self::Completed => "completed",
Self::Failed => "failed",
Self::AgentLost => "agent_lost",
Self::Cancelled => "cancelled",
}
}
}
/// Task priority, serialized lowercase. Variant order matters: the derived
/// `Ord` gives Low < Normal < High < Urgent, and the SQL CASE in
/// `query_queued_tasks` sorts urgent first to match.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
#[serde(rename_all = "lowercase")]
pub enum Priority {
Low,
Normal,
High,
Urgent,
}
/// A unit of work flowing through the orchestrator; mirrored by the
/// `tasks` table. Timestamps are stored as RFC 3339 text in SQLite.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Task {
pub task_id: String,
pub source: String, // "forgejo:<repo>#<issue>"
pub task_type: String, // "code", "review", "test", "deploy", "research"
pub priority: Priority,
pub status: TaskStatus,
// Set once an agent picks the task up; COALESCE in SQL preserves it.
pub assigned_agent_id: Option<String>,
pub requirements: String, // Issue body
// Matched against agent capabilities at dispatch; stored as a JSON array.
pub labels: Vec<String>,
pub created_at: DateTime<Utc>,
// Lifecycle timestamps; None until the task reaches that state.
pub assigned_at: Option<DateTime<Utc>>,
pub started_at: Option<DateTime<Utc>>,
pub completed_at: Option<DateTime<Utc>>,
// Retry accounting (see RetryPolicy); DB defaults max_retries to 2.
pub retry_count: u32,
pub max_retries: u32,
// Per-task timeout budget; DB default is 1800 s.
pub timeout_seconds: u64,
}
// ─── Receipt ─────────────────────────────────────────────────────
/// Outcome reported by an agent in a [`Receipt`], serialized lowercase.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum ReceiptStatus {
Completed,
Failed,
// Some but not all of the requested work was done.
Partial,
}
/// Kind of output an agent produced, serialized lowercase.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum ArtifactType {
Pr,
Commit,
File,
Comment,
Url,
}
/// A single output attached to a [`Receipt`]. Which of `url`/`path` is
/// populated depends on the artifact type; all fields are optional.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Artifact {
pub artifact_type: ArtifactType,
pub url: Option<String>,
pub path: Option<String>,
pub description: Option<String>,
}
/// Final report an agent submits after finishing (or failing) a task.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Receipt {
pub task_id: String,
pub agent_id: String,
pub status: ReceiptStatus,
pub duration_seconds: u64,
// Human-readable description of what was done.
pub summary: String,
pub artifacts: Vec<Artifact>,
// Populated when status is Failed (or Partial) to explain why.
pub error: Option<String>,
}
// ─── TaskEvent (event sourcing) ──────────────────────────────────
/// One immutable entry in the append-only `task_events` log
/// (event sourcing). Written by `EventStore::append_event`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskEvent {
pub event_id: String,
pub task_id: String,
// e.g. "task.created", "task.assigned" — "task." + TaskStatus::as_str().
pub event_type: String,
// The agent involved, when the event concerns one.
pub agent_id: Option<String>,
pub timestamp: DateTime<Utc>,
// Arbitrary JSON context; stored as serialized text in SQLite.
pub payload: serde_json::Value,
}

64
src/core/retry.rs Normal file
View file

@ -0,0 +1,64 @@
use std::sync::Arc;
use tokio::sync::Mutex;
use super::event_store::EventStore;
use super::models::*;
use super::state_machine::{StateError, StateMachine};
use super::task_queue::TaskQueue;
/// Retry logic for failed/agent_lost tasks: re-queues a task while its
/// `retry_count` is below `max_retries`, otherwise reports exhaustion.
pub struct RetryPolicy {
// Used to transition retried tasks back to `assigned`.
sm: Arc<StateMachine>,
// Kept for future scheduling integration; currently unused.
_queue: Arc<TaskQueue>,
// Shared store for reading tasks and bumping retry counters.
store: Arc<Mutex<EventStore>>,
}
impl RetryPolicy {
    /// Build a retry policy over the shared state machine, queue, and store.
    pub fn new(
        sm: Arc<StateMachine>,
        queue: Arc<TaskQueue>,
        store: Arc<Mutex<EventStore>>,
    ) -> Self {
        Self { sm, _queue: queue, store }
    }

    /// Decide what to do with a failed task: while attempts remain, bump
    /// the retry counter and move the task back to `assigned`; otherwise
    /// report that retries are exhausted.
    ///
    /// The store lock is taken in short scopes and never held across
    /// `sm.transition`, which locks the same mutex internally.
    pub async fn handle_failure(
        &self,
        task_id: &str,
        _agent_id: Option<&str>,
        reason: &str,
    ) -> Result<RetryDecision, StateError> {
        let current = {
            let guard = self.store.lock().await;
            guard
                .read_task(task_id)?
                .ok_or(StateError::TaskNotFound(task_id.to_string()))?
        };

        // Guard clause: out of attempts — nothing left to do but report.
        if current.retry_count >= current.max_retries {
            tracing::warn!(task_id = task_id, retries = current.retry_count, "max retries exceeded");
            return Ok(RetryDecision::Exhausted);
        }

        // Record the attempt, then push the task back into the flow.
        {
            let guard = self.store.lock().await;
            guard.increment_retry_count(task_id)?;
        }
        self.sm
            .transition(task_id, TaskStatus::Assigned, None, &format!("retry: {reason}"))
            .await?;

        Ok(RetryDecision::Retried {
            attempt: current.retry_count + 1,
            max: current.max_retries,
        })
    }
}
/// Result of [`RetryPolicy::handle_failure`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RetryDecision {
// Task was re-queued; `attempt` is the retry number just consumed.
Retried { attempt: u32, max: u32 },
// retry_count reached max_retries; task was not re-queued.
Exhausted,
}

128
src/core/state_machine.rs Normal file
View file

@ -0,0 +1,128 @@
use chrono::Utc;
use std::sync::Arc;
use tokio::sync::Mutex;
use super::event_store::EventStore;
use super::models::*;
/// Enforces valid task status transitions and records each transition as
/// both a row update and an appended `TaskEvent`.
pub struct StateMachine {
// Shared, mutex-guarded store; also locked by TaskQueue and RetryPolicy.
store: Arc<Mutex<EventStore>>,
}
impl StateMachine {
    /// Wrap a shared event store.
    pub fn new(store: Arc<Mutex<EventStore>>) -> Self {
        Self { store }
    }

    /// Move `task_id` to `new_status`: validate the transition, update the
    /// task row, append a `task.<status>` event, and return the refreshed
    /// task — all under a single store lock.
    ///
    /// # Errors
    /// `TaskNotFound` if the task does not exist, `InvalidTransition` if
    /// the lifecycle forbids the move, `Database` on SQLite failures.
    pub async fn transition(
        &self,
        task_id: &str,
        new_status: TaskStatus,
        agent_id: Option<&str>,
        reason: &str,
    ) -> Result<Task, StateError> {
        let store = self.store.lock().await;
        let task = store
            .read_task(task_id)?
            .ok_or_else(|| StateError::TaskNotFound(task_id.to_string()))?;
        Self::validate_transition(&task.status, &new_status)?;
        let now = Utc::now();
        let stamp = now.to_rfc3339();
        store.update_task_status(
            task_id,
            new_status.as_str(),
            agent_id,
            // Each timestamp column is stamped only when entering the
            // corresponding state; COALESCE in SQL keeps earlier values.
            (new_status == TaskStatus::Assigned).then(|| stamp.clone()),
            (new_status == TaskStatus::Running).then(|| stamp.clone()),
            // NOTE: AgentLost intentionally gets no completed_at — a lost
            // task may still be retried.
            matches!(
                new_status,
                TaskStatus::Completed | TaskStatus::Failed | TaskStatus::Cancelled
            )
            .then(|| stamp.clone()),
            task.retry_count,
        )?;
        let event = TaskEvent {
            event_id: uuid::Uuid::new_v4().to_string(),
            task_id: task_id.to_string(),
            event_type: format!("task.{}", new_status.as_str()),
            agent_id: agent_id.map(String::from),
            timestamp: now,
            payload: serde_json::json!({
                "from_status": task.status.as_str(),
                "to_status": new_status.as_str(),
                "reason": reason,
            }),
        };
        store.append_event(&event)?;
        // Re-read while still holding the lock. The previous code dropped
        // the lock, re-acquired it, and `unwrap()`ed the re-read — a
        // concurrent delete between the two locks would have panicked.
        store
            .read_task(task_id)?
            .ok_or_else(|| StateError::TaskNotFound(task_id.to_string()))
    }

    /// Persist a brand-new task and append its `task.created` event.
    pub async fn create_task(&self, task: &Task) -> Result<Task, StateError> {
        let store = self.store.lock().await;
        store.insert_task(task)?;
        let event = TaskEvent {
            event_id: uuid::Uuid::new_v4().to_string(),
            task_id: task.task_id.clone(),
            event_type: "task.created".into(),
            agent_id: None,
            timestamp: Utc::now(),
            payload: serde_json::json!({ "source": task.source }),
        };
        store.append_event(&event)?;
        Ok(task.clone())
    }

    /// Enforce the task lifecycle:
    /// created → assigned → running → completed/failed/agent_lost,
    /// cancel allowed from any non-terminal state, and failed/agent_lost
    /// allowed back to assigned (retry path).
    fn validate_transition(from: &TaskStatus, to: &TaskStatus) -> Result<(), StateError> {
        let valid = match from {
            TaskStatus::Created => matches!(to, TaskStatus::Assigned | TaskStatus::Cancelled),
            TaskStatus::Assigned => matches!(to, TaskStatus::Running | TaskStatus::Cancelled),
            TaskStatus::Running => matches!(
                to,
                TaskStatus::Completed
                    | TaskStatus::Failed
                    | TaskStatus::AgentLost
                    | TaskStatus::Cancelled
            ),
            TaskStatus::Failed | TaskStatus::AgentLost => {
                matches!(to, TaskStatus::Assigned | TaskStatus::Cancelled)
            }
            // Terminal states: nothing leaves them.
            TaskStatus::Completed | TaskStatus::Cancelled => false,
        };
        if !valid {
            return Err(StateError::InvalidTransition(
                from.as_str().to_string(),
                to.as_str().to_string(),
            ));
        }
        Ok(())
    }

    /// Parse a stored status string; unknown values fall back to `Created`.
    pub fn parse_status(s: &str) -> TaskStatus {
        match s {
            "created" => TaskStatus::Created,
            "assigned" => TaskStatus::Assigned,
            "running" => TaskStatus::Running,
            "completed" => TaskStatus::Completed,
            "failed" => TaskStatus::Failed,
            "agent_lost" => TaskStatus::AgentLost,
            "cancelled" => TaskStatus::Cancelled,
            _ => TaskStatus::Created,
        }
    }
}
/// Errors surfaced by [`StateMachine`] and its callers
/// (`TaskQueue`, `RetryPolicy`, `TimeoutChecker`).
#[derive(Debug, thiserror::Error)]
pub enum StateError {
/// No task row exists for the given id.
#[error("task not found: {0}")]
TaskNotFound(String),
/// The lifecycle forbids moving from the first status to the second.
#[error("invalid transition: {0} -> {1}")]
InvalidTransition(String, String),
/// Underlying SQLite failure (auto-converted via `#[from]`).
#[error("database error: {0}")]
Database(#[from] rusqlite::Error),
}

58
src/core/task_queue.rs Normal file
View file

@ -0,0 +1,58 @@
use std::sync::Arc;
use tokio::sync::Mutex;
use super::event_store::EventStore;
use super::models::*;
use super::state_machine::{StateError, StateMachine};
/// Global task queue ordered by priority. The queue itself is virtual:
/// ordering lives in the `tasks` table (see `query_queued_tasks`), and
/// state changes go through the state machine.
pub struct TaskQueue {
sm: Arc<StateMachine>,
// Read directly for dequeue queries; writes go via `sm`.
store: Arc<Mutex<EventStore>>,
}
impl TaskQueue {
    /// Build a queue over the shared state machine and store.
    pub fn new(sm: Arc<StateMachine>, store: Arc<Mutex<EventStore>>) -> Self {
        Self { sm, store }
    }

    /// Enqueue a new task (status = created) and record its event.
    pub async fn enqueue(&self, task: Task) -> Result<Task, StateError> {
        self.sm.create_task(&task).await
    }

    /// Pick the highest-priority queued task whose labels/type satisfy
    /// every required capability. An empty capability list matches any
    /// task (`all` over an empty iterator is vacuously true, so one
    /// `find` covers both cases).
    pub async fn dequeue(
        &self,
        required_capabilities: &[String],
    ) -> Result<Option<Task>, StateError> {
        let candidates = {
            let guard = self.store.lock().await;
            guard.query_queued_tasks()?
        };
        let picked = candidates.into_iter().find(|task| {
            required_capabilities.iter().all(|cap| {
                task.labels.iter().any(|l| l == cap) || &task.task_type == cap
            })
        });
        Ok(picked)
    }

    /// Put a failed/agent_lost task back into the queue by transitioning
    /// it to `assigned`. Retry accounting (retry_count) is handled by the
    /// retry policy, not here.
    pub async fn requeue(&self, task_id: &str) -> Result<Task, StateError> {
        self.sm
            .transition(task_id, TaskStatus::Assigned, None, "re-queued after failure")
            .await
    }
}

53
src/core/timeout.rs Normal file
View file

@ -0,0 +1,53 @@
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use super::event_store::EventStore;
use super::models::*;
use super::state_machine::StateMachine;
/// Background task that scans for timed-out tasks and fails them.
pub struct TimeoutChecker {
sm: Arc<StateMachine>,
store: Arc<Mutex<EventStore>>,
// How often to scan.
interval: Duration,
// Global running-time budget applied to every task.
// NOTE(review): per-task `timeout_seconds` is not consulted — confirm
// whether that column should override this global value.
task_timeout: Duration,
}
impl TimeoutChecker {
    /// Build a checker that scans every `interval` and fails any task
    /// running longer than `task_timeout`.
    pub fn new(
        sm: Arc<StateMachine>,
        store: Arc<Mutex<EventStore>>,
        interval: Duration,
        task_timeout: Duration,
    ) -> Self {
        Self { sm, store, interval, task_timeout }
    }

    /// Run the scan loop forever; errors are logged, never fatal.
    pub async fn run(self: Arc<Self>) {
        let mut ticker = tokio::time::interval(self.interval);
        loop {
            ticker.tick().await;
            if let Err(e) = self.check_timeouts().await {
                tracing::error!("timeout check error: {e}");
            }
        }
    }

    /// One scan pass: collect expired task ids, then fail each via the
    /// state machine (store lock released before transitioning).
    async fn check_timeouts(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let limit_secs = self.task_timeout.as_secs() as i64;
        let expired = {
            let guard = self.store.lock().await;
            guard.find_timed_out_tasks(chrono::Utc::now(), limit_secs)?
        };
        for task_id in expired {
            let outcome = self
                .sm
                .transition(&task_id, TaskStatus::Failed, None, "timeout")
                .await;
            match outcome {
                Ok(_) => tracing::warn!(task_id = task_id, "task timed out"),
                Err(e) => tracing::error!(task_id = task_id, "failed to timeout task: {e}"),
            }
        }
        Ok(())
    }
}