From f60f028f96f783b30a4e92fdd35dac1104bb1caa Mon Sep 17 00:00:00 2001 From: Zer4tul Date: Mon, 11 May 2026 19:42:03 +0800 Subject: [PATCH] feat: Forgejo integration + Receipt protocol MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Tasks completed: - 4.1: Forgejo API client (reqwest, HMAC-SHA256, Issue/Comment/Label/PR) - 4.2: POST /api/v1/webhooks/forgejo (signature verify, event parse) - 4.3: Issue → Task conversion (agent:* → type, priority:* → priority) - 4.4: Task status → Issue label sync (status:todo/doing/done) - 4.5: Receipt → Issue comment (emoji + summary + artifacts) - 4.6: Reconciliation stub - 4.7: Tests for HMAC, Issue→Task conversion - 6.1: POST /api/v1/receipts (validate + transition) - 6.2: PR artifact validation via Forgejo API - 6.3: No-trust check (only Completed after validation) - 6.4: Receipt tests 19/19 tests pass. cargo check clean. --- Cargo.lock | 9 + Cargo.toml | 3 + src/api.rs | 385 ++++++++++++++++++++++++++++++++++-- src/integrations/forgejo.rs | 353 +++++++++++++++++++++++++++++++++ src/integrations/mod.rs | 1 + src/lib.rs | 1 + src/main.rs | 5 +- 7 files changed, 735 insertions(+), 22 deletions(-) create mode 100644 src/integrations/forgejo.rs create mode 100644 src/integrations/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 972028e..f7c1b8d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -49,11 +49,14 @@ dependencies = [ "axum", "chrono", "clap", + "hex", + "hmac", "matrix-sdk", "reqwest", "rusqlite", "serde", "serde_json", + "sha2", "tempfile", "thiserror", "tokio", @@ -1223,6 +1226,12 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + [[package]] name = "hkdf" version = "0.12.4" diff --git a/Cargo.toml b/Cargo.toml index 7771c01..18432fd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,6 +40,9 @@ uuid = { version = "1", features = ["v4", "serde"] } chrono = { version = "0.4", features = ["serde"] } thiserror = "2" async-trait = "0.1" +hmac = "0.12" +sha2 = "0.10" +hex = "0.4" [dev-dependencies] tempfile = "3" diff --git a/src/api.rs b/src/api.rs index 54dda0c..fbc994a 100644 --- a/src/api.rs +++ b/src/api.rs @@ -2,17 +2,43 @@ use std::collections::HashMap; use std::sync::{Arc, Mutex}; use std::time::Duration; +use axum::body::Bytes; use axum::extract::{Query, State}; -use axum::http::StatusCode; +use axum::http::{HeaderMap, StatusCode}; use axum::response::{IntoResponse, Response}; use axum::Json; use chrono::Utc; use serde::{Deserialize, Serialize}; +use crate::config::Config; use crate::core::event_store::EventStore; -use crate::core::models::{Agent, AgentStatus, AgentType, TaskStatus}; +use crate::core::models::{Agent, AgentStatus, AgentType, Receipt, ReceiptStatus, Task, TaskStatus}; +use crate::core::state_machine::StateMachine; +use crate::integrations::forgejo::{ + format_receipt_comment, issue_event_to_task, parse_issue_event, status_labels_for_task, + validate_receipt_artifacts, ForgejoApi, ForgejoClient, ForgejoError, UpdateIssueRequest, +}; -pub type AppState = Arc>; +pub type DbState = Arc>; + +#[derive(Clone)] +pub struct AppState { + pub store: DbState, + pub config: Config, + pub forgejo: Arc, +} + +impl AppState { + pub fn new(config: Config, store: DbState) -> Self { + let forgejo = Arc::new(ForgejoClient::new(&config.forgejo)); + Self { store, config, forgejo } + } + + #[cfg(test)] + pub fn with_forgejo(config: Config, store: DbState, forgejo: Arc) -> Self { + Self { store, config, forgejo } + } +} #[derive(Debug, thiserror::Error)] pub enum ApiError { @@ -24,15 +50,24 @@ pub enum ApiError { Poisoned(String), #[error("not found: {0}")] NotFound(String), + #[error("bad request: {0}")] + BadRequest(String), + #[error("unauthorized: {0}")] + Unauthorized(String), + #[error("forgejo error: {0}")] + Forgejo(#[from] ForgejoError), } impl IntoResponse for ApiError { fn into_response(self) -> Response { let status = match self { ApiError::NotFound(_) => StatusCode::NOT_FOUND, - ApiError::Database(_) | ApiError::Join(_) | ApiError::Poisoned(_) => { - StatusCode::INTERNAL_SERVER_ERROR - } + ApiError::BadRequest(_) => StatusCode::BAD_REQUEST, + ApiError::Unauthorized(_) => StatusCode::UNAUTHORIZED, + ApiError::Database(_) + | ApiError::Join(_) + | ApiError::Poisoned(_) + | ApiError::Forgejo(_) => StatusCode::INTERNAL_SERVER_ERROR, }; (status, Json(ErrorResponse { error: self.to_string() })).into_response() } @@ -90,6 +125,18 @@ pub struct ListAgentsQuery { pub status: Option, } +#[derive(Debug, Serialize)] +pub struct ReceiptResponse { + pub task_id: String, + pub status: TaskStatus, +} + +#[derive(Debug, Serialize)] +pub struct WebhookResponse { + pub accepted: bool, + pub task_id: Option, +} + pub async fn register_agent( State(state): State, Json(req): Json, @@ -108,7 +155,7 @@ pub async fn register_agent( }; let registry_token = format!("registry_{}", uuid::Uuid::new_v4().simple()); - let store = state.clone(); + let store = state.store.clone(); tokio::task::spawn_blocking(move || -> Result, ApiError> { let mut store = store.lock().map_err(|e| ApiError::Poisoned(e.to_string()))?; @@ -126,7 +173,7 @@ pub async fn heartbeat( Json(req): Json, ) -> Result, ApiError> { let agent_id = req.agent_id; - let store = state.clone(); + let store = state.store.clone(); tokio::task::spawn_blocking(move || -> Result, ApiError> { let mut store = store.lock().map_err(|e| ApiError::Poisoned(e.to_string()))?; @@ -148,7 +195,7 @@ pub async fn deregister( Json(req): Json, ) -> Result, ApiError> { let agent_id = req.agent_id; - let store = state.clone(); + let store = state.store.clone(); tokio::task::spawn_blocking(move || -> Result, ApiError> { let mut store = store.lock().map_err(|e| ApiError::Poisoned(e.to_string()))?; @@ -169,7 +216,7 @@ pub async fn list_agents( State(state): State, Query(query): Query, ) -> Result>, ApiError> { - let store = state.clone(); + let store = state.store.clone(); let status = query.status.and_then(|s| match s.as_str() { "online" => Some(AgentStatus::Online), "offline" => Some(AgentStatus::Offline), @@ -185,22 +232,112 @@ pub async fn list_agents( .await? } -pub async fn submit_receipt() -> &'static str { - "TODO" +pub async fn submit_receipt( + State(state): State, + Json(receipt): Json, +) -> Result, ApiError> { + validate_receipt_artifacts(state.forgejo.as_ref(), &receipt).await?; + + let task_id = receipt.task_id.clone(); + let store = state.store.clone(); + let sm = StateMachine::new(store.clone()); + + let task = tokio::task::spawn_blocking(move || -> Result, ApiError> { + let store = store.lock().map_err(|e| ApiError::Poisoned(e.to_string()))?; + Ok(store.read_task(&task_id)?) + }) + .await?? + .ok_or_else(|| ApiError::NotFound(format!("task {}", receipt.task_id)))?; + + let (repo, issue_number) = parse_task_source(&task.source) + .ok_or_else(|| ApiError::BadRequest(format!("invalid task source: {}", task.source)))?; + + let new_status = match receipt.status { + ReceiptStatus::Completed => TaskStatus::Completed, + ReceiptStatus::Failed => TaskStatus::Failed, + ReceiptStatus::Partial => TaskStatus::Failed, + }; + + let updated_task = sm + .transition(&receipt.task_id, new_status.clone(), Some(&receipt.agent_id), "receipt validated") + .await + .map_err(|e| ApiError::BadRequest(e.to_string()))?; + + let labels = status_labels_for_task(&new_status, &updated_task.labels); + state + .forgejo + .update_issue( + &repo, + issue_number, + UpdateIssueRequest { + assignees: Some(vec![receipt.agent_id.clone()]), + labels: Some(labels), + }, + ) + .await?; + + state + .forgejo + .create_issue_comment(&repo, issue_number, &format_receipt_comment(&receipt)) + .await?; + + Ok(Json(ReceiptResponse { + task_id: receipt.task_id, + status: new_status, + })) } -pub async fn forgejo_webhook() -> &'static str { - "TODO" +pub async fn forgejo_webhook( + State(state): State, + headers: HeaderMap, + body: Bytes, +) -> Result, ApiError> { + let signature = headers + .get("x-gitea-signature") + .or_else(|| headers.get("x-forgejo-signature")) + .and_then(|v| v.to_str().ok()) + .ok_or_else(|| ApiError::Unauthorized("missing webhook signature".into()))?; + + let client = ForgejoClient::new(&state.config.forgejo); + client + .verify_webhook_signature(&body, signature) + .map_err(|_| ApiError::Unauthorized("invalid webhook signature".into()))?; + + let event = parse_issue_event(&body)?; + let task = issue_event_to_task( + &event, + state.config.orchestrator.default_max_retries, + state.config.orchestrator.task_timeout_secs, + ); + + let Some(task) = task else { + return Ok(Json(WebhookResponse { + accepted: true, + task_id: None, + })); + }; + + let task_id = task.task_id.clone(); + let store = state.store.clone(); + let sm = StateMachine::new(store); + sm.create_task(&task) + .await + .map_err(|e| ApiError::BadRequest(e.to_string()))?; + + Ok(Json(WebhookResponse { + accepted: true, + task_id: Some(task_id), + })) } pub struct HeartbeatChecker { - store: AppState, + store: DbState, interval: Duration, timeout_seconds: i64, } impl HeartbeatChecker { - pub fn new(store: AppState, interval: Duration, timeout_seconds: i64) -> Self { + pub fn new(store: DbState, interval: Duration, timeout_seconds: i64) -> Self { Self { store, interval, @@ -235,18 +372,72 @@ impl HeartbeatChecker { } } +fn parse_task_source(source: &str) -> Option<(String, u64)> { + let raw = source.strip_prefix("forgejo:")?; + let (repo, issue) = raw.rsplit_once('#')?; + let issue_number = issue.parse().ok()?; + Some((repo.to_string(), issue_number)) +} + #[cfg(test)] mod tests { use super::*; use axum::extract::{Query, State}; + use axum::http::HeaderValue; use std::sync::{Arc, Mutex}; use tempfile::TempDir; + use crate::core::models::{Artifact, ArtifactType, Priority}; + use crate::integrations::forgejo::{ForgejoIssue, ForgejoIssueEvent, ForgejoLabel, ForgejoRepo}; + + #[derive(Default)] + struct FakeForgejo { + pub existing_pr_urls: Mutex>, + pub comments: Mutex>, + pub updates: Mutex>, + } + + #[async_trait::async_trait] + impl ForgejoApi for FakeForgejo { + async fn issue_exists(&self, _repo: &str, _issue_number: u64) -> Result { + Ok(true) + } + + async fn create_issue_comment(&self, repo: &str, issue_number: u64, body: &str) -> Result<(), ForgejoError> { + self.comments.lock().unwrap().push((repo.to_string(), issue_number, body.to_string())); + Ok(()) + } + + async fn update_issue(&self, repo: &str, issue_number: u64, req: UpdateIssueRequest) -> Result<(), ForgejoError> { + self.updates.lock().unwrap().push((repo.to_string(), issue_number, req)); + Ok(()) + } + + async fn pr_exists_by_url(&self, pr_url: &str) -> Result { + Ok(self.existing_pr_urls.lock().unwrap().iter().any(|u| u == pr_url)) + } + + async fn reconcile(&self) -> Result<(), ForgejoError> { + Ok(()) + } + } + + fn test_state() -> (TempDir, AppState, Arc) { + let dir = TempDir::new().unwrap(); + let db = dir.path().join("test.db"); + let store = Arc::new(Mutex::new(EventStore::open(&db).unwrap())); + let config = Config::default(); + let fake = Arc::new(FakeForgejo::default()); + let state = AppState::with_forgejo(config, store, fake.clone()); + (dir, state, fake) + } + fn test_store() -> (TempDir, AppState) { let dir = TempDir::new().unwrap(); let db = dir.path().join("test.db"); - let store = EventStore::open(&db).unwrap(); - (dir, Arc::new(Mutex::new(store))) + let store = Arc::new(Mutex::new(EventStore::open(&db).unwrap())); + let config = Config::default(); + (dir, AppState::new(config, store)) } fn sample_register_request(agent_id: &str) -> RegisterAgentRequest { @@ -260,6 +451,56 @@ mod tests { } } + fn sample_issue_event_json() -> Vec { + serde_json::to_vec(&ForgejoIssueEvent { + action: "opened".into(), + issue: ForgejoIssue { + number: 42, + title: "Implement thing".into(), + body: Some("Need agent to do it".into()), + html_url: "https://git.example/repo/issues/42".into(), + labels: vec![ + ForgejoLabel { name: "agent:code".into() }, + ForgejoLabel { name: "priority:high".into() }, + ], + assignees: vec![], + }, + repository: ForgejoRepo { + name: "repo".into(), + full_name: "org/repo".into(), + }, + }) + .unwrap() + } + + fn webhook_signature(secret: &str, body: &[u8]) -> String { + use hmac::{Hmac, Mac}; + use sha2::Sha256; + let mut mac = Hmac::::new_from_slice(secret.as_bytes()).unwrap(); + mac.update(body); + format!("sha256={}", hex::encode(mac.finalize().into_bytes())) + } + + fn sample_task(task_id: &str) -> Task { + Task { + task_id: task_id.to_string(), + source: "forgejo:org/repo#42".into(), + task_type: "code".into(), + priority: Priority::High, + status: TaskStatus::Running, + assigned_agent_id: Some("worker-01".into()), + requirements: "do something".into(), + labels: vec!["agent:code".into(), "priority:high".into(), "status:doing".into()], + created_at: Utc::now(), + assigned_at: Some(Utc::now()), + started_at: Some(Utc::now()), + completed_at: None, + retry_count: 0, + max_retries: 2, + timeout_seconds: 1800, + } + } + #[tokio::test] async fn register_and_list_agents() { let (_dir, state) = test_store(); @@ -369,12 +610,13 @@ mod tests { .unwrap(); { - let mut store = state.lock().unwrap(); - store.force_agent_last_heartbeat("worker-01", Utc::now() - chrono::Duration::seconds(500)) + let mut store = state.store.lock().unwrap(); + store + .force_agent_last_heartbeat("worker-01", Utc::now() - chrono::Duration::seconds(500)) .unwrap(); } - let checker = HeartbeatChecker::new(state.clone(), Duration::from_secs(60), 180); + let checker = HeartbeatChecker::new(state.store.clone(), Duration::from_secs(60), 180); let affected = checker.check_once().await.unwrap(); assert_eq!(affected, 0); @@ -391,4 +633,105 @@ mod tests { assert_eq!(listed.0.len(), 1); assert_eq!(listed.0[0].agent_id, "worker-01"); } + + #[tokio::test] + async fn webhook_creates_task_from_issue() { + let (_dir, mut state, _fake) = test_state(); + state.config.forgejo.webhook_secret = "top-secret".into(); + + let body = sample_issue_event_json(); + let mut headers = HeaderMap::new(); + headers.insert( + "x-gitea-signature", + HeaderValue::from_str(&webhook_signature("top-secret", &body)).unwrap(), + ); + + let res = forgejo_webhook(State(state.clone()), headers, Bytes::from(body)) + .await + .unwrap(); + assert_eq!(res.0.accepted, true); + assert_eq!(res.0.task_id.as_deref(), Some("org/repo#42")); + + let task = { + let store = state.store.lock().unwrap(); + store.read_task("org/repo#42").unwrap().unwrap() + }; + assert_eq!(task.task_type, "code"); + assert_eq!(task.priority, Priority::High); + assert_eq!(task.status, TaskStatus::Created); + } + + #[tokio::test] + async fn receipt_submission_validates_pr_and_completes_task() { + let (_dir, state, fake) = test_state(); + fake.existing_pr_urls + .lock() + .unwrap() + .push("https://git.example/org/repo/pulls/15".into()); + + { + let store = state.store.lock().unwrap(); + store.insert_task(&sample_task("org/repo#42")).unwrap(); + } + + let receipt = Receipt { + task_id: "org/repo#42".into(), + agent_id: "worker-01".into(), + status: ReceiptStatus::Completed, + duration_seconds: 12, + summary: "Implemented thing".into(), + artifacts: vec![Artifact { + artifact_type: ArtifactType::Pr, + url: Some("https://git.example/org/repo/pulls/15".into()), + path: None, + description: Some("PR #15".into()), + }], + error: None, + }; + + let res = submit_receipt(State(state.clone()), Json(receipt)).await.unwrap(); + assert_eq!(res.0.status, TaskStatus::Completed); + + let task = { + let store = state.store.lock().unwrap(); + store.read_task("org/repo#42").unwrap().unwrap() + }; + assert_eq!(task.status, TaskStatus::Completed); + + assert_eq!(fake.comments.lock().unwrap().len(), 1); + assert_eq!(fake.updates.lock().unwrap().len(), 1); + } + + #[tokio::test] + async fn receipt_submission_rejects_missing_pr() { + let (_dir, state, _fake) = test_state(); + { + let store = state.store.lock().unwrap(); + store.insert_task(&sample_task("org/repo#42")).unwrap(); + } + + let receipt = Receipt { + task_id: "org/repo#42".into(), + agent_id: "worker-01".into(), + status: ReceiptStatus::Completed, + duration_seconds: 12, + summary: "Implemented thing".into(), + artifacts: vec![Artifact { + artifact_type: ArtifactType::Pr, + url: Some("https://git.example/org/repo/pulls/404".into()), + path: None, + description: Some("PR #404".into()), + }], + error: None, + }; + + let err = submit_receipt(State(state.clone()), Json(receipt)).await.unwrap_err(); + assert!(matches!(err, ApiError::Forgejo(_))); + + let task = { + let store = state.store.lock().unwrap(); + store.read_task("org/repo#42").unwrap().unwrap() + }; + assert_eq!(task.status, TaskStatus::Running); + } } diff --git a/src/integrations/forgejo.rs b/src/integrations/forgejo.rs new file mode 100644 index 0000000..a459705 --- /dev/null +++ b/src/integrations/forgejo.rs @@ -0,0 +1,353 @@ +use async_trait::async_trait; +use hmac::{Hmac, Mac}; +use reqwest::Client; +use serde::{Deserialize, Serialize}; +use sha2::Sha256; + +use crate::config::ForgejoConfig; +use crate::core::models::{Artifact, Priority, Receipt, ReceiptStatus, Task, TaskStatus}; + +pub type HmacSha256 = Hmac; + +#[derive(Debug, thiserror::Error)] +pub enum ForgejoError { + #[error("http error: {0}")] + Http(#[from] reqwest::Error), + #[error("serialization error: {0}")] + Serde(#[from] serde_json::Error), + #[error("invalid signature")] + InvalidSignature, + #[error("unsupported event")] + UnsupportedEvent, + #[error("artifact validation failed: {0}")] + Validation(String), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ForgejoLabel { + pub name: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ForgejoRepo { + pub name: String, + pub full_name: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ForgejoUser { + pub login: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ForgejoIssue { + pub number: u64, + pub title: String, + pub body: Option, + pub html_url: String, + #[serde(default)] + pub labels: Vec, + #[serde(default)] + pub assignees: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ForgejoIssueEvent { + pub action: String, + pub issue: ForgejoIssue, + pub repository: ForgejoRepo, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ForgejoPullRequest { + pub number: u64, + pub html_url: String, + pub title: String, + pub body: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CreateCommentRequest { + pub body: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UpdateIssueRequest { + pub assignees: Option>, + pub labels: Option>, +} + +#[async_trait] +pub trait ForgejoApi: Send + Sync { + async fn issue_exists(&self, repo: &str, issue_number: u64) -> Result; + async fn create_issue_comment(&self, repo: &str, issue_number: u64, body: &str) -> Result<(), ForgejoError>; + async fn update_issue(&self, repo: &str, issue_number: u64, req: UpdateIssueRequest) -> Result<(), ForgejoError>; + async fn pr_exists_by_url(&self, pr_url: &str) -> Result; + async fn reconcile(&self) -> Result<(), ForgejoError>; +} + +#[derive(Clone)] +pub struct ForgejoClient { + pub base_url: String, + pub token: String, + pub webhook_secret: String, + client: Client, +} + +impl ForgejoClient { + pub fn new(config: &ForgejoConfig) -> Self { + Self { + base_url: config.url.trim_end_matches('/').to_string(), + token: config.token.clone(), + webhook_secret: config.webhook_secret.clone(), + client: Client::new(), + } + } + + pub fn verify_webhook_signature(&self, body: &[u8], signature: &str) -> Result<(), ForgejoError> { + verify_webhook_signature(&self.webhook_secret, body, signature) + } +} + +#[async_trait] +impl ForgejoApi for ForgejoClient { + async fn issue_exists(&self, repo: &str, issue_number: u64) -> Result { + let url = format!("{}/api/v1/repos/{}/issues/{}", self.base_url, repo, issue_number); + let res = self + .client + .get(url) + .bearer_auth(&self.token) + .send() + .await?; + Ok(res.status().is_success()) + } + + async fn create_issue_comment(&self, repo: &str, issue_number: u64, body: &str) -> Result<(), ForgejoError> { + let url = format!("{}/api/v1/repos/{}/issues/{}/comments", self.base_url, repo, issue_number); + self.client + .post(url) + .bearer_auth(&self.token) + .json(&CreateCommentRequest { body: body.to_string() }) + .send() + .await? + .error_for_status()?; + Ok(()) + } + + async fn update_issue(&self, repo: &str, issue_number: u64, req: UpdateIssueRequest) -> Result<(), ForgejoError> { + let url = format!("{}/api/v1/repos/{}/issues/{}", self.base_url, repo, issue_number); + self.client + .patch(url) + .bearer_auth(&self.token) + .json(&req) + .send() + .await? + .error_for_status()?; + Ok(()) + } + + async fn pr_exists_by_url(&self, pr_url: &str) -> Result { + let res = self.client.get(pr_url).bearer_auth(&self.token).send().await?; + Ok(res.status().is_success()) + } + + async fn reconcile(&self) -> Result<(), ForgejoError> { + Ok(()) + } +} + +pub fn verify_webhook_signature(secret: &str, body: &[u8], signature: &str) -> Result<(), ForgejoError> { + let provided = signature.trim(); + let provided = provided.strip_prefix("sha256=").unwrap_or(provided); + + let mut mac = HmacSha256::new_from_slice(secret.as_bytes()) + .map_err(|_| ForgejoError::InvalidSignature)?; + mac.update(body); + let expected = hex::encode(mac.finalize().into_bytes()); + + if expected == provided { + Ok(()) + } else { + Err(ForgejoError::InvalidSignature) + } +} + +pub fn parse_issue_event(body: &[u8]) -> Result { + Ok(serde_json::from_slice(body)?) +} + +pub fn issue_event_to_task(event: &ForgejoIssueEvent, default_max_retries: u32, default_timeout_seconds: u64) -> Option { + let labels: Vec = event.issue.labels.iter().map(|l| l.name.clone()).collect(); + let task_type = infer_task_type(&labels)?; + let priority = infer_priority(&labels); + + Some(Task { + task_id: format!("{}#{}", event.repository.full_name, event.issue.number), + source: format!("forgejo:{}#{}", event.repository.full_name, event.issue.number), + task_type, + priority, + status: TaskStatus::Created, + assigned_agent_id: None, + requirements: event.issue.body.clone().unwrap_or_default(), + labels, + created_at: chrono::Utc::now(), + assigned_at: None, + started_at: None, + completed_at: None, + retry_count: 0, + max_retries: default_max_retries, + timeout_seconds: default_timeout_seconds, + }) +} + +pub fn infer_task_type(labels: &[String]) -> Option { + for label in labels { + if let Some(value) = label.strip_prefix("agent:") { + return Some(value.to_string()); + } + } + None +} + +pub fn infer_priority(labels: &[String]) -> Priority { + if labels.iter().any(|l| l == "priority:urgent") { + Priority::Urgent + } else if labels.iter().any(|l| l == "priority:high") { + Priority::High + } else if labels.iter().any(|l| l == "priority:low") { + Priority::Low + } else { + Priority::Normal + } +} + +pub fn status_labels_for_task(status: &TaskStatus, existing_labels: &[String]) -> Vec { + let mut labels: Vec = existing_labels + .iter() + .filter(|label| !label.starts_with("status:")) + .cloned() + .collect(); + + let status_label = match status { + TaskStatus::Created => "status:todo", + TaskStatus::Assigned | TaskStatus::Running => "status:doing", + TaskStatus::Completed => "status:done", + TaskStatus::Failed | TaskStatus::AgentLost | TaskStatus::Cancelled => "status:todo", + }; + labels.push(status_label.to_string()); + labels +} + +pub fn format_receipt_comment(receipt: &Receipt) -> String { + let emoji = match receipt.status { + ReceiptStatus::Completed => "✅", + ReceiptStatus::Failed => "❌", + ReceiptStatus::Partial => "🟡", + }; + + let mut body = format!( + "{} **Receipt**\n\n- Task: `{}`\n- Agent: `{}`\n- Status: `{}`\n- Duration: {}s\n- Summary: {}\n", + emoji, + receipt.task_id, + receipt.agent_id, + match receipt.status { + ReceiptStatus::Completed => "completed", + ReceiptStatus::Failed => "failed", + ReceiptStatus::Partial => "partial", + }, + receipt.duration_seconds, + receipt.summary + ); + + if !receipt.artifacts.is_empty() { + body.push_str("- Artifacts:\n"); + for artifact in &receipt.artifacts { + let target = artifact + .url + .as_ref() + .or(artifact.path.as_ref()) + .cloned() + .unwrap_or_else(|| "".into()); + body.push_str(&format!(" - {:?}: {}\n", artifact.artifact_type, target)); + } + } + + if let Some(error) = &receipt.error { + body.push_str(&format!("- Error: {}\n", error)); + } + + body +} + +pub async fn validate_receipt_artifacts( + client: &dyn ForgejoApi, + receipt: &Receipt, +) -> Result<(), ForgejoError> { + for artifact in &receipt.artifacts { + validate_artifact(client, artifact).await?; + } + Ok(()) +} + +async fn validate_artifact(client: &dyn ForgejoApi, artifact: &Artifact) -> Result<(), ForgejoError> { + match artifact.artifact_type { + crate::core::models::ArtifactType::Pr => { + let url = artifact + .url + .as_deref() + .ok_or_else(|| ForgejoError::Validation("missing PR url".into()))?; + if client.pr_exists_by_url(url).await? { + Ok(()) + } else { + Err(ForgejoError::Validation(format!("PR not found: {url}"))) + } + } + _ => Ok(()), + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn verifies_valid_hmac_signature() { + let body = br#"{"hello":"world"}"#; + let secret = "top-secret"; + + let mut mac = HmacSha256::new_from_slice(secret.as_bytes()).unwrap(); + mac.update(body); + let sig = format!("sha256={}", hex::encode(mac.finalize().into_bytes())); + + verify_webhook_signature(secret, body, &sig).unwrap(); + } + + #[test] + fn converts_issue_event_to_task() { + let event = ForgejoIssueEvent { + action: "opened".into(), + issue: ForgejoIssue { + number: 42, + title: "Implement thing".into(), + body: Some("Need agent to do it".into()), + html_url: "https://git.example/repo/issues/42".into(), + labels: vec![ + ForgejoLabel { name: "agent:code".into() }, + ForgejoLabel { name: "priority:high".into() }, + ], + assignees: vec![], + }, + repository: ForgejoRepo { + name: "repo".into(), + full_name: "org/repo".into(), + }, + }; + + let task = issue_event_to_task(&event, 2, 1800).unwrap(); + assert_eq!(task.task_id, "org/repo#42"); + assert_eq!(task.source, "forgejo:org/repo#42"); + assert_eq!(task.task_type, "code"); + assert_eq!(task.priority, Priority::High); + assert_eq!(task.status, TaskStatus::Created); + } +} diff --git a/src/integrations/mod.rs b/src/integrations/mod.rs new file mode 100644 index 0000000..607ec12 --- /dev/null +++ b/src/integrations/mod.rs @@ -0,0 +1 @@ +pub mod forgejo; diff --git a/src/lib.rs b/src/lib.rs index 220e131..609e245 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,4 @@ pub mod api; pub mod core; pub mod config; +pub mod integrations; diff --git a/src/main.rs b/src/main.rs index c2879ee..4a8f6b6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,7 @@ mod api; mod config; mod core; +mod integrations; use clap::Parser; @@ -81,6 +82,8 @@ async fn main() { )); tokio::spawn(async move { heartbeat_checker.run().await }); + let app_state = api::AppState::new(config.clone(), store.clone()); + let app = axum::Router::new() .route("/healthz", axum::routing::get(|| async { "ok" })) .route("/api/v1/agents/register", axum::routing::post(api::register_agent)) @@ -92,7 +95,7 @@ async fn main() { "/api/v1/webhooks/forgejo", axum::routing::post(api::forgejo_webhook), ) - .with_state(store.clone()); + .with_state(app_state); let listener = tokio::net::TcpListener::bind(format!( "{}:{}",