From 6efca090188829cf1a05062323c37411a0f68fd9 Mon Sep 17 00:00:00 2001 From: Zer4tul Date: Tue, 12 May 2026 01:12:59 +0800 Subject: [PATCH] feat: Matrix ChatOps bot (Task 5) - Matrix bot via matrix-sdk: connect, join room, sync loop - /fleet status: list all agents with status table - /assign : manual task assignment - /retry : re-queue failed/agent_lost task - Notification formatting: task assigned/completed/failed, agent offline - Per-agent thread support via Matrix Relation::Thread - 15 tests: command parsing, notification formatting, fleet status table --- Cargo.lock | 1 + Cargo.toml | 1 + src/integrations/matrix/mod.rs | 531 +++++++++++++++++++++++++++++++++ src/integrations/mod.rs | 1 + src/main.rs | 16 + 5 files changed, 550 insertions(+) create mode 100644 src/integrations/matrix/mod.rs diff --git a/Cargo.lock b/Cargo.lock index f7c1b8d..d159cac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -53,6 +53,7 @@ dependencies = [ "hmac", "matrix-sdk", "reqwest", + "ruma", "rusqlite", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index 18432fd..ef5a30c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ reqwest = { version = "0.12", features = ["json"] } # Matrix SDK matrix-sdk = "0.10" +ruma = { version = "0.12", features = ["client-api-c", "rand", "unstable-msc3061", "unstable-msc2448"] } # Logging tracing = "0.1" diff --git a/src/integrations/matrix/mod.rs b/src/integrations/matrix/mod.rs new file mode 100644 index 0000000..1103a86 --- /dev/null +++ b/src/integrations/matrix/mod.rs @@ -0,0 +1,531 @@ +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; + +use matrix_sdk::authentication::matrix::MatrixSession; +use matrix_sdk::config::SyncSettings; +use matrix_sdk::ruma::events::room::message::{ + MessageType, OriginalSyncRoomMessageEvent, RoomMessageEventContent, +}; +use matrix_sdk::ruma::events::relation::Thread; +use matrix_sdk::ruma::OwnedEventId; +use matrix_sdk::ruma::{OwnedDeviceId, OwnedRoomId, OwnedUserId, UserId}; +use matrix_sdk::{Client, Room}; +use tokio::sync::RwLock; + +use crate::config::MatrixConfig; +use crate::core::event_store::EventStore; +use crate::core::models::{Agent, TaskStatus}; +use crate::core::state_machine::StateMachine; + +/// The bot context — shared state for all handlers. +#[derive(Clone)] +pub struct BotContext { + pub store: Arc>, + pub sm: Arc, + pub config: MatrixConfig, + /// Maps agent_id → root event_id for per-agent threads. + pub agent_threads: Arc>>, +} + +// ─── Command parsing ──────────────────────────────────────────── + +/// A parsed Matrix command. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum BotCommand { + FleetStatus, + Assign { agent_id: String, issue_ref: String }, + Retry { issue_ref: String }, + Unknown(String), +} + +/// Parse a Matrix message body into a BotCommand. +/// Only returns Some for known command prefixes: fleet, assign, retry. +pub fn parse_command(body: &str) -> Option { + let body = body.trim(); + // Accept both "/fleet status" and "fleet status" (in case the / is stripped) + let body = body.strip_prefix('/').unwrap_or(body); + + let mut parts = body.splitn(4, ' '); + let first = parts.next()?.to_lowercase(); + match first.as_str() { + "fleet" => { + let sub = parts.next()?.to_lowercase(); + match sub.as_str() { + "status" => Some(BotCommand::FleetStatus), + _ => Some(BotCommand::Unknown(format!("fleet {sub}"))), + } + } + "assign" => { + let agent_id = parts.next()?.to_string(); + let issue_ref = parts.next()?.to_string(); + Some(BotCommand::Assign { agent_id, issue_ref }) + } + "retry" => { + let issue_ref = parts.next()?.to_string(); + Some(BotCommand::Retry { issue_ref }) + } + _ => None, + } +} + +// ─── Notification formatting ──────────────────────────────────── + +/// Format a task-assigned notification. +/// +/// Spec: `📋 #42 → worker-03 [code:typescript]` +pub fn format_task_assigned(task_id: &str, agent_id: &str, task_type: &str) -> String { + format!("📋 {task_id} → {agent_id} [{task_type}]") +} + +/// Format a task-completed notification. +/// +/// Spec: `✅ #42 completed by worker-03 — PR #15 — "修复登录验证 bug"` +pub fn format_task_completed(task_id: &str, agent_id: &str, summary: &str, artifact_hint: Option<&str>) -> String { + match artifact_hint { + Some(hint) => format!("✅ {task_id} completed by {agent_id} — {hint} — \"{summary}\""), + None => format!("✅ {task_id} completed by {agent_id} — \"{summary}\""), + } +} + +/// Format a task-failed notification. +/// +/// Spec: `❌ #42 failed — worker-03 — "构建超时"` +pub fn format_task_failed(task_id: &str, agent_id: &str, error: &str) -> String { + format!("❌ {task_id} failed — {agent_id} — \"{error}\"") +} + +/// Format an agent-offline alert. +/// +/// Spec: `⚠️ worker-03 offline — 2 running tasks affected` +pub fn format_agent_offline(agent_id: &str, affected_tasks: usize) -> String { + format!("⚠️ {agent_id} offline — {affected_tasks} running tasks affected") +} + +/// Format the fleet status table as plain text. +pub fn format_fleet_status(agents: &[Agent]) -> String { + if agents.is_empty() { + return "No agents registered.".to_string(); + } + + let mut lines = vec!["Agent ID | Type | Status | Tasks | Capabilities".to_string()]; + lines.push("-----------------+--------------+---------+-------+---------------------------".to_string()); + + for agent in agents { + let caps = agent.capabilities.join(", "); + let caps_display = if caps.len() > 27 { + format!("{}…", &caps[..24]) + } else { + caps + }; + lines.push(format!( + "{:<16} | {:<12} | {:<7} | {:<5} | {}", + truncate_str(&agent.agent_id, 16), + truncate_str(agent.agent_type.as_str(), 12), + agent.status.as_str(), + agent.current_tasks, + caps_display, + )); + } + + lines.join("\n") +} + +fn truncate_str(s: &str, max_len: usize) -> String { + if s.len() <= max_len { + s.to_string() + } else { + format!("{}…", &s[..max_len - 1]) + } +} + +// ─── Bot lifecycle ────────────────────────────────────────────── + +/// Build a Matrix client from config. +pub async fn build_client(config: &MatrixConfig) -> Result> { + let client = Client::builder() + .homeserver_url(&config.homeserver_url) + .build() + .await?; + + let user_id = UserId::parse(&config.user_id)?; + let device_id: OwnedDeviceId = "AGENTFLEET".into(); + + let session = MatrixSession { + meta: matrix_sdk::SessionMeta { + user_id, + device_id, + }, + tokens: matrix_sdk::authentication::matrix::MatrixSessionTokens { + access_token: config.access_token.clone(), + refresh_token: None, + }, + }; + + client.restore_session(session).await?; + Ok(client) +} + +/// Start the Matrix bot — spawns a background sync loop. +pub async fn start_bot( + config: MatrixConfig, + store: Arc>, + sm: Arc, +) -> Result<(), Box> { + let room_id = OwnedRoomId::try_from(config.room_id.as_str())?; + let client = build_client(&config).await?; + + // Join the room + let _ = client.join_room_by_id(&room_id).await?; + + let ctx = BotContext { + store, + sm, + config, + agent_threads: Arc::new(RwLock::new(HashMap::new())), + }; + + // Register the message handler + client.add_event_handler(move |ev: OriginalSyncRoomMessageEvent, room: Room| { + let ctx = ctx.clone(); + async move { + handle_message(ev, room, ctx).await; + } + }); + + // Spawn the sync loop in background + tokio::spawn(async move { + loop { + match client.sync(SyncSettings::default()).await { + Ok(_) => {} + Err(e) => { + tracing::error!("Matrix sync error: {e}"); + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + } + } + } + }); + + tracing::info!("Matrix bot started, monitoring room {}", room_id); + Ok(()) +} + +/// Handle an incoming Matrix room message. +async fn handle_message( + ev: OriginalSyncRoomMessageEvent, + room: Room, + ctx: BotContext, +) { + // Only handle text messages + let text = match &ev.content.msgtype { + MessageType::Text(text) => text.body.clone(), + _ => return, + }; + + // Skip our own messages + if ev.sender == ctx.config.user_id.parse::().unwrap_or_else(|_| ev.sender.clone()) { + return; + } + + let Some(cmd) = parse_command(&text) else { + return; + }; + + tracing::debug!(?cmd, "received matrix command"); + + match cmd { + BotCommand::FleetStatus => { + let agents = { + let store = ctx.store.lock().unwrap(); + store.list_agents(None, None).unwrap_or_default() + }; + let table = format_fleet_status(&agents); + send_plain(&room, &table).await; + } + BotCommand::Assign { agent_id, issue_ref } => { + let result = { + let store = ctx.store.lock().unwrap(); + let task = store.read_task(&issue_ref); + let agent = store.find_agent_by_id(&agent_id); + (task, agent) + }; + + match result { + (Ok(Some(_task)), Ok(Some(_agent))) => { + match ctx.sm.transition(&issue_ref, TaskStatus::Assigned, Some(&agent_id), "manual assign via matrix").await { + Ok(updated) => { + let msg = format_task_assigned(&updated.task_id, &agent_id, &updated.task_type); + send_plain(&room, &msg).await; + } + Err(e) => { + send_plain(&room, &format!("❌ Failed to assign: {e}")).await; + } + } + } + (Ok(None), _) => { + send_plain(&room, &format!("Task not found: {issue_ref}")).await; + } + (_, Ok(None)) => { + send_plain(&room, &format!("Agent not found: {agent_id}")).await; + } + (Err(e), _) | (_, Err(e)) => { + send_plain(&room, &format!("Database error: {e}")).await; + } + } + } + BotCommand::Retry { issue_ref } => { + let task_status = { + let store = ctx.store.lock().unwrap(); + store.read_task(&issue_ref).ok().flatten() + }; + + let Some(task) = task_status else { + send_plain(&room, &format!("Task not found: {issue_ref}")).await; + return; + }; + + if !matches!(task.status, TaskStatus::Failed | TaskStatus::AgentLost) { + send_plain(&room, &format!("Task {issue_ref} is not in a retryable state (current: {})", task.status.as_str())).await; + return; + } + + // Reset to created (re-enqueue) + match ctx.sm.transition(&issue_ref, TaskStatus::Assigned, None, "retry via matrix").await { + Ok(_) => { + send_plain(&room, &format!("🔄 Task {issue_ref} re-queued for assignment")).await; + } + Err(e) => { + send_plain(&room, &format!("❌ Failed to retry: {e}")).await; + } + } + } + BotCommand::Unknown(cmd) => { + send_plain(&room, &format!("Unknown command: `{cmd}`. Available: /fleet status, /assign , /retry ")).await; + } + } +} + +/// Send a plain text message to a room. +async fn send_plain(room: &Room, body: &str) { + let content = RoomMessageEventContent::text_plain(body); + if let Err(e) = room.send(content).await { + tracing::error!("Failed to send Matrix message: {e}"); + } +} + +/// Send a message as a thread reply (per-agent thread). +pub async fn send_thread_message( + room: &Room, + thread_root_event_id: &ruma::OwnedEventId, + body: &str, +) { + let mut content = RoomMessageEventContent::text_plain(body); + content.relates_to = Some(matrix_sdk::ruma::events::room::message::Relation::Thread( + Thread::without_fallback(thread_root_event_id.clone()), + )); + if let Err(e) = room.send(content).await { + tracing::error!("Failed to send thread message: {e}"); + } +} + +/// Post a notification for a task event to the Matrix room. +/// This is a helper meant to be called from the API layer. +pub async fn notify_task_event( + room: &Room, + event_type: &str, + task_id: &str, + agent_id: &str, + task_type: &str, + summary: Option<&str>, + error: Option<&str>, + artifact_hint: Option<&str>, +) { + let msg = match event_type { + "task.assigned" => format_task_assigned(task_id, agent_id, task_type), + "task.completed" => format_task_completed( + task_id, + agent_id, + summary.unwrap_or(""), + artifact_hint, + ), + "task.failed" => format_task_failed( + task_id, + agent_id, + error.unwrap_or("unknown error"), + ), + _ => return, + }; + send_plain(room, &msg).await; +} + +/// Post an agent-offline alert to the Matrix room. +pub async fn notify_agent_offline(room: &Room, agent_id: &str, affected_tasks: usize) { + let msg = format_agent_offline(agent_id, affected_tasks); + send_plain(room, &msg).await; +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::core::models::{Agent, AgentType, AgentStatus}; + use chrono::Utc; + use std::collections::HashMap; + + // ─── Command parsing tests ────────────────────────────────── + + #[test] + fn parse_fleet_status() { + assert_eq!(parse_command("/fleet status"), Some(BotCommand::FleetStatus)); + assert_eq!(parse_command("fleet status"), Some(BotCommand::FleetStatus)); + assert_eq!(parse_command("/FLEET STATUS"), Some(BotCommand::FleetStatus)); + assert_eq!(parse_command(" /fleet status "), Some(BotCommand::FleetStatus)); + } + + #[test] + fn parse_assign_command() { + assert_eq!( + parse_command("/assign worker-03 org/repo#42"), + Some(BotCommand::Assign { + agent_id: "worker-03".into(), + issue_ref: "org/repo#42".into(), + }) + ); + assert_eq!( + parse_command("assign worker-03 org/repo#42"), + Some(BotCommand::Assign { + agent_id: "worker-03".into(), + issue_ref: "org/repo#42".into(), + }) + ); + } + + #[test] + fn parse_retry_command() { + assert_eq!( + parse_command("/retry org/repo#42"), + Some(BotCommand::Retry { + issue_ref: "org/repo#42".into(), + }) + ); + assert_eq!( + parse_command("retry #42"), + Some(BotCommand::Retry { + issue_ref: "#42".into(), + }) + ); + } + + #[test] + fn parse_unknown_command() { + // Non-fleet/assign/retry prefixes return None (not a bot command) + assert_eq!(parse_command("/deploy prod"), None); + } + + #[test] + fn parse_non_command_returns_none() { + assert_eq!(parse_command("hello world"), None); + assert_eq!(parse_command(""), None); + assert_eq!(parse_command("just chatting"), None); + } + + #[test] + fn parse_fleet_subcommand_unknown() { + assert_eq!( + parse_command("/fleet deploy"), + Some(BotCommand::Unknown("fleet deploy".into())) + ); + } + + // ─── Notification formatting tests ────────────────────────── + + #[test] + fn task_assigned_format() { + let msg = format_task_assigned("org/repo#42", "worker-03", "code:typescript"); + assert_eq!(msg, "📋 org/repo#42 → worker-03 [code:typescript]"); + } + + #[test] + fn task_completed_format_with_artifact() { + let msg = format_task_completed( + "org/repo#42", + "worker-03", + "修复登录验证 bug", + Some("PR #15"), + ); + assert_eq!( + msg, + "✅ org/repo#42 completed by worker-03 — PR #15 — \"修复登录验证 bug\"" + ); + } + + #[test] + fn task_completed_format_without_artifact() { + let msg = format_task_completed( + "org/repo#42", + "worker-03", + "fixed the thing", + None, + ); + assert_eq!(msg, "✅ org/repo#42 completed by worker-03 — \"fixed the thing\""); + } + + #[test] + fn task_failed_format() { + let msg = format_task_failed("org/repo#42", "worker-03", "构建超时"); + assert_eq!(msg, "❌ org/repo#42 failed — worker-03 — \"构建超时\""); + } + + #[test] + fn agent_offline_format() { + let msg = format_agent_offline("worker-03", 2); + assert_eq!(msg, "⚠️ worker-03 offline — 2 running tasks affected"); + } + + // ─── Fleet status table tests ─────────────────────────────── + + fn sample_agent(id: &str, status: AgentStatus, tasks: u32) -> Agent { + Agent { + agent_id: id.to_string(), + agent_type: AgentType::CodexCli, + hostname: "host-01".into(), + capabilities: vec!["code:rust".into(), "review".into()], + max_concurrency: 2, + current_tasks: tasks, + status, + last_heartbeat_at: Utc::now(), + registered_at: Utc::now(), + metadata: HashMap::new(), + } + } + + #[test] + fn fleet_status_empty() { + let table = format_fleet_status(&[]); + assert_eq!(table, "No agents registered."); + } + + #[test] + fn fleet_status_with_agents() { + let agents = vec![ + sample_agent("worker-01", AgentStatus::Online, 1), + sample_agent("worker-02", AgentStatus::Offline, 0), + ]; + let table = format_fleet_status(&agents); + assert!(table.contains("worker-01")); + assert!(table.contains("worker-02")); + assert!(table.contains("online")); + assert!(table.contains("offline")); + assert!(table.contains("code:rust")); + assert!(table.contains("review")); + } + + #[test] + fn fleet_status_table_has_header() { + let agents = vec![sample_agent("w1", AgentStatus::Online, 0)]; + let table = format_fleet_status(&agents); + assert!(table.contains("Agent ID")); + assert!(table.contains("Type")); + assert!(table.contains("Status")); + assert!(table.contains("Tasks")); + assert!(table.contains("Capabilities")); + } +} diff --git a/src/integrations/mod.rs b/src/integrations/mod.rs index 607ec12..dbb9804 100644 --- a/src/integrations/mod.rs +++ b/src/integrations/mod.rs @@ -1 +1,2 @@ pub mod forgejo; +pub mod matrix; diff --git a/src/main.rs b/src/main.rs index 3948859..9d7bffc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -106,5 +106,21 @@ async fn main() { .expect("failed to bind"); tracing::info!("listening on {}", listener.local_addr().unwrap()); + + // Start Matrix bot + if !config.matrix.access_token.is_empty() && !config.matrix.room_id.is_empty() { + let matrix_cfg = config.matrix.clone(); + let matrix_store = store.clone(); + let matrix_sm = state_machine.clone(); + tokio::spawn(async move { + match crate::integrations::matrix::start_bot(matrix_cfg, matrix_store, matrix_sm).await { + Ok(_) => tracing::info!("Matrix bot stopped"), + Err(e) => tracing::error!("Matrix bot error: {e}"), + } + }); + } else { + tracing::info!("Matrix bot disabled (no access_token or room_id configured)"); + } + axum::serve(listener, app).await.expect("server error"); }