feat: Forgejo integration + Receipt protocol

Tasks completed:
- 4.1: Forgejo API client (reqwest, HMAC-SHA256, Issue/Comment/Label/PR)
- 4.2: POST /api/v1/webhooks/forgejo (signature verify, event parse)
- 4.3: Issue → Task conversion (agent:* → type, priority:* → priority)
- 4.4: Task status → Issue label sync (status:todo/doing/done)
- 4.5: Receipt → Issue comment (emoji + summary + artifacts)
- 4.6: Reconciliation stub
- 4.7: Tests for HMAC, Issue→Task conversion
- 6.1: POST /api/v1/receipts (validate + transition)
- 6.2: PR artifact validation via Forgejo API
- 6.3: No-trust check (only Completed after validation)
- 6.4: Receipt tests

19/19 tests pass. cargo check clean.
This commit is contained in:
Zer4tul 2026-05-11 19:42:03 +08:00
parent b75546bda6
commit f60f028f96
7 changed files with 735 additions and 22 deletions

9
Cargo.lock generated
View file

@ -49,11 +49,14 @@ dependencies = [
"axum", "axum",
"chrono", "chrono",
"clap", "clap",
"hex",
"hmac",
"matrix-sdk", "matrix-sdk",
"reqwest", "reqwest",
"rusqlite", "rusqlite",
"serde", "serde",
"serde_json", "serde_json",
"sha2",
"tempfile", "tempfile",
"thiserror", "thiserror",
"tokio", "tokio",
@ -1223,6 +1226,12 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c"
[[package]]
name = "hex"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]] [[package]]
name = "hkdf" name = "hkdf"
version = "0.12.4" version = "0.12.4"

View file

@ -40,6 +40,9 @@ uuid = { version = "1", features = ["v4", "serde"] }
chrono = { version = "0.4", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] }
thiserror = "2" thiserror = "2"
async-trait = "0.1" async-trait = "0.1"
hmac = "0.12"
sha2 = "0.10"
hex = "0.4"
[dev-dependencies] [dev-dependencies]
tempfile = "3" tempfile = "3"

View file

@ -2,17 +2,43 @@ use std::collections::HashMap;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::Duration; use std::time::Duration;
use axum::body::Bytes;
use axum::extract::{Query, State}; use axum::extract::{Query, State};
use axum::http::StatusCode; use axum::http::{HeaderMap, StatusCode};
use axum::response::{IntoResponse, Response}; use axum::response::{IntoResponse, Response};
use axum::Json; use axum::Json;
use chrono::Utc; use chrono::Utc;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::config::Config;
use crate::core::event_store::EventStore; use crate::core::event_store::EventStore;
use crate::core::models::{Agent, AgentStatus, AgentType, TaskStatus}; use crate::core::models::{Agent, AgentStatus, AgentType, Receipt, ReceiptStatus, Task, TaskStatus};
use crate::core::state_machine::StateMachine;
use crate::integrations::forgejo::{
format_receipt_comment, issue_event_to_task, parse_issue_event, status_labels_for_task,
validate_receipt_artifacts, ForgejoApi, ForgejoClient, ForgejoError, UpdateIssueRequest,
};
pub type AppState = Arc<Mutex<EventStore>>; pub type DbState = Arc<Mutex<EventStore>>;
#[derive(Clone)]
pub struct AppState {
pub store: DbState,
pub config: Config,
pub forgejo: Arc<dyn ForgejoApi>,
}
impl AppState {
pub fn new(config: Config, store: DbState) -> Self {
let forgejo = Arc::new(ForgejoClient::new(&config.forgejo));
Self { store, config, forgejo }
}
#[cfg(test)]
pub fn with_forgejo(config: Config, store: DbState, forgejo: Arc<dyn ForgejoApi>) -> Self {
Self { store, config, forgejo }
}
}
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum ApiError { pub enum ApiError {
@ -24,15 +50,24 @@ pub enum ApiError {
Poisoned(String), Poisoned(String),
#[error("not found: {0}")] #[error("not found: {0}")]
NotFound(String), NotFound(String),
#[error("bad request: {0}")]
BadRequest(String),
#[error("unauthorized: {0}")]
Unauthorized(String),
#[error("forgejo error: {0}")]
Forgejo(#[from] ForgejoError),
} }
impl IntoResponse for ApiError { impl IntoResponse for ApiError {
fn into_response(self) -> Response { fn into_response(self) -> Response {
let status = match self { let status = match self {
ApiError::NotFound(_) => StatusCode::NOT_FOUND, ApiError::NotFound(_) => StatusCode::NOT_FOUND,
ApiError::Database(_) | ApiError::Join(_) | ApiError::Poisoned(_) => { ApiError::BadRequest(_) => StatusCode::BAD_REQUEST,
StatusCode::INTERNAL_SERVER_ERROR ApiError::Unauthorized(_) => StatusCode::UNAUTHORIZED,
} ApiError::Database(_)
| ApiError::Join(_)
| ApiError::Poisoned(_)
| ApiError::Forgejo(_) => StatusCode::INTERNAL_SERVER_ERROR,
}; };
(status, Json(ErrorResponse { error: self.to_string() })).into_response() (status, Json(ErrorResponse { error: self.to_string() })).into_response()
} }
@ -90,6 +125,18 @@ pub struct ListAgentsQuery {
pub status: Option<String>, pub status: Option<String>,
} }
#[derive(Debug, Serialize)]
pub struct ReceiptResponse {
pub task_id: String,
pub status: TaskStatus,
}
#[derive(Debug, Serialize)]
pub struct WebhookResponse {
pub accepted: bool,
pub task_id: Option<String>,
}
pub async fn register_agent( pub async fn register_agent(
State(state): State<AppState>, State(state): State<AppState>,
Json(req): Json<RegisterAgentRequest>, Json(req): Json<RegisterAgentRequest>,
@ -108,7 +155,7 @@ pub async fn register_agent(
}; };
let registry_token = format!("registry_{}", uuid::Uuid::new_v4().simple()); let registry_token = format!("registry_{}", uuid::Uuid::new_v4().simple());
let store = state.clone(); let store = state.store.clone();
tokio::task::spawn_blocking(move || -> Result<Json<RegisterAgentResponse>, ApiError> { tokio::task::spawn_blocking(move || -> Result<Json<RegisterAgentResponse>, ApiError> {
let mut store = store.lock().map_err(|e| ApiError::Poisoned(e.to_string()))?; let mut store = store.lock().map_err(|e| ApiError::Poisoned(e.to_string()))?;
@ -126,7 +173,7 @@ pub async fn heartbeat(
Json(req): Json<HeartbeatRequest>, Json(req): Json<HeartbeatRequest>,
) -> Result<Json<HeartbeatResponse>, ApiError> { ) -> Result<Json<HeartbeatResponse>, ApiError> {
let agent_id = req.agent_id; let agent_id = req.agent_id;
let store = state.clone(); let store = state.store.clone();
tokio::task::spawn_blocking(move || -> Result<Json<HeartbeatResponse>, ApiError> { tokio::task::spawn_blocking(move || -> Result<Json<HeartbeatResponse>, ApiError> {
let mut store = store.lock().map_err(|e| ApiError::Poisoned(e.to_string()))?; let mut store = store.lock().map_err(|e| ApiError::Poisoned(e.to_string()))?;
@ -148,7 +195,7 @@ pub async fn deregister(
Json(req): Json<DeregisterRequest>, Json(req): Json<DeregisterRequest>,
) -> Result<Json<DeregisterResponse>, ApiError> { ) -> Result<Json<DeregisterResponse>, ApiError> {
let agent_id = req.agent_id; let agent_id = req.agent_id;
let store = state.clone(); let store = state.store.clone();
tokio::task::spawn_blocking(move || -> Result<Json<DeregisterResponse>, ApiError> { tokio::task::spawn_blocking(move || -> Result<Json<DeregisterResponse>, ApiError> {
let mut store = store.lock().map_err(|e| ApiError::Poisoned(e.to_string()))?; let mut store = store.lock().map_err(|e| ApiError::Poisoned(e.to_string()))?;
@ -169,7 +216,7 @@ pub async fn list_agents(
State(state): State<AppState>, State(state): State<AppState>,
Query(query): Query<ListAgentsQuery>, Query(query): Query<ListAgentsQuery>,
) -> Result<Json<Vec<Agent>>, ApiError> { ) -> Result<Json<Vec<Agent>>, ApiError> {
let store = state.clone(); let store = state.store.clone();
let status = query.status.and_then(|s| match s.as_str() { let status = query.status.and_then(|s| match s.as_str() {
"online" => Some(AgentStatus::Online), "online" => Some(AgentStatus::Online),
"offline" => Some(AgentStatus::Offline), "offline" => Some(AgentStatus::Offline),
@ -185,22 +232,112 @@ pub async fn list_agents(
.await? .await?
} }
pub async fn submit_receipt() -> &'static str { pub async fn submit_receipt(
"TODO" State(state): State<AppState>,
Json(receipt): Json<Receipt>,
) -> Result<Json<ReceiptResponse>, ApiError> {
validate_receipt_artifacts(state.forgejo.as_ref(), &receipt).await?;
let task_id = receipt.task_id.clone();
let store = state.store.clone();
let sm = StateMachine::new(store.clone());
let task = tokio::task::spawn_blocking(move || -> Result<Option<Task>, ApiError> {
let store = store.lock().map_err(|e| ApiError::Poisoned(e.to_string()))?;
Ok(store.read_task(&task_id)?)
})
.await??
.ok_or_else(|| ApiError::NotFound(format!("task {}", receipt.task_id)))?;
let (repo, issue_number) = parse_task_source(&task.source)
.ok_or_else(|| ApiError::BadRequest(format!("invalid task source: {}", task.source)))?;
let new_status = match receipt.status {
ReceiptStatus::Completed => TaskStatus::Completed,
ReceiptStatus::Failed => TaskStatus::Failed,
ReceiptStatus::Partial => TaskStatus::Failed,
};
let updated_task = sm
.transition(&receipt.task_id, new_status.clone(), Some(&receipt.agent_id), "receipt validated")
.await
.map_err(|e| ApiError::BadRequest(e.to_string()))?;
let labels = status_labels_for_task(&new_status, &updated_task.labels);
state
.forgejo
.update_issue(
&repo,
issue_number,
UpdateIssueRequest {
assignees: Some(vec![receipt.agent_id.clone()]),
labels: Some(labels),
},
)
.await?;
state
.forgejo
.create_issue_comment(&repo, issue_number, &format_receipt_comment(&receipt))
.await?;
Ok(Json(ReceiptResponse {
task_id: receipt.task_id,
status: new_status,
}))
} }
pub async fn forgejo_webhook() -> &'static str { pub async fn forgejo_webhook(
"TODO" State(state): State<AppState>,
headers: HeaderMap,
body: Bytes,
) -> Result<Json<WebhookResponse>, ApiError> {
let signature = headers
.get("x-gitea-signature")
.or_else(|| headers.get("x-forgejo-signature"))
.and_then(|v| v.to_str().ok())
.ok_or_else(|| ApiError::Unauthorized("missing webhook signature".into()))?;
let client = ForgejoClient::new(&state.config.forgejo);
client
.verify_webhook_signature(&body, signature)
.map_err(|_| ApiError::Unauthorized("invalid webhook signature".into()))?;
let event = parse_issue_event(&body)?;
let task = issue_event_to_task(
&event,
state.config.orchestrator.default_max_retries,
state.config.orchestrator.task_timeout_secs,
);
let Some(task) = task else {
return Ok(Json(WebhookResponse {
accepted: true,
task_id: None,
}));
};
let task_id = task.task_id.clone();
let store = state.store.clone();
let sm = StateMachine::new(store);
sm.create_task(&task)
.await
.map_err(|e| ApiError::BadRequest(e.to_string()))?;
Ok(Json(WebhookResponse {
accepted: true,
task_id: Some(task_id),
}))
} }
pub struct HeartbeatChecker { pub struct HeartbeatChecker {
store: AppState, store: DbState,
interval: Duration, interval: Duration,
timeout_seconds: i64, timeout_seconds: i64,
} }
impl HeartbeatChecker { impl HeartbeatChecker {
pub fn new(store: AppState, interval: Duration, timeout_seconds: i64) -> Self { pub fn new(store: DbState, interval: Duration, timeout_seconds: i64) -> Self {
Self { Self {
store, store,
interval, interval,
@ -235,18 +372,72 @@ impl HeartbeatChecker {
} }
} }
fn parse_task_source(source: &str) -> Option<(String, u64)> {
let raw = source.strip_prefix("forgejo:")?;
let (repo, issue) = raw.rsplit_once('#')?;
let issue_number = issue.parse().ok()?;
Some((repo.to_string(), issue_number))
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use axum::extract::{Query, State}; use axum::extract::{Query, State};
use axum::http::HeaderValue;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use tempfile::TempDir; use tempfile::TempDir;
use crate::core::models::{Artifact, ArtifactType, Priority};
use crate::integrations::forgejo::{ForgejoIssue, ForgejoIssueEvent, ForgejoLabel, ForgejoRepo};
#[derive(Default)]
struct FakeForgejo {
pub existing_pr_urls: Mutex<Vec<String>>,
pub comments: Mutex<Vec<(String, u64, String)>>,
pub updates: Mutex<Vec<(String, u64, UpdateIssueRequest)>>,
}
#[async_trait::async_trait]
impl ForgejoApi for FakeForgejo {
async fn issue_exists(&self, _repo: &str, _issue_number: u64) -> Result<bool, ForgejoError> {
Ok(true)
}
async fn create_issue_comment(&self, repo: &str, issue_number: u64, body: &str) -> Result<(), ForgejoError> {
self.comments.lock().unwrap().push((repo.to_string(), issue_number, body.to_string()));
Ok(())
}
async fn update_issue(&self, repo: &str, issue_number: u64, req: UpdateIssueRequest) -> Result<(), ForgejoError> {
self.updates.lock().unwrap().push((repo.to_string(), issue_number, req));
Ok(())
}
async fn pr_exists_by_url(&self, pr_url: &str) -> Result<bool, ForgejoError> {
Ok(self.existing_pr_urls.lock().unwrap().iter().any(|u| u == pr_url))
}
async fn reconcile(&self) -> Result<(), ForgejoError> {
Ok(())
}
}
fn test_state() -> (TempDir, AppState, Arc<FakeForgejo>) {
let dir = TempDir::new().unwrap();
let db = dir.path().join("test.db");
let store = Arc::new(Mutex::new(EventStore::open(&db).unwrap()));
let config = Config::default();
let fake = Arc::new(FakeForgejo::default());
let state = AppState::with_forgejo(config, store, fake.clone());
(dir, state, fake)
}
fn test_store() -> (TempDir, AppState) { fn test_store() -> (TempDir, AppState) {
let dir = TempDir::new().unwrap(); let dir = TempDir::new().unwrap();
let db = dir.path().join("test.db"); let db = dir.path().join("test.db");
let store = EventStore::open(&db).unwrap(); let store = Arc::new(Mutex::new(EventStore::open(&db).unwrap()));
(dir, Arc::new(Mutex::new(store))) let config = Config::default();
(dir, AppState::new(config, store))
} }
fn sample_register_request(agent_id: &str) -> RegisterAgentRequest { fn sample_register_request(agent_id: &str) -> RegisterAgentRequest {
@ -260,6 +451,56 @@ mod tests {
} }
} }
fn sample_issue_event_json() -> Vec<u8> {
serde_json::to_vec(&ForgejoIssueEvent {
action: "opened".into(),
issue: ForgejoIssue {
number: 42,
title: "Implement thing".into(),
body: Some("Need agent to do it".into()),
html_url: "https://git.example/repo/issues/42".into(),
labels: vec![
ForgejoLabel { name: "agent:code".into() },
ForgejoLabel { name: "priority:high".into() },
],
assignees: vec![],
},
repository: ForgejoRepo {
name: "repo".into(),
full_name: "org/repo".into(),
},
})
.unwrap()
}
fn webhook_signature(secret: &str, body: &[u8]) -> String {
use hmac::{Hmac, Mac};
use sha2::Sha256;
let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes()).unwrap();
mac.update(body);
format!("sha256={}", hex::encode(mac.finalize().into_bytes()))
}
fn sample_task(task_id: &str) -> Task {
Task {
task_id: task_id.to_string(),
source: "forgejo:org/repo#42".into(),
task_type: "code".into(),
priority: Priority::High,
status: TaskStatus::Running,
assigned_agent_id: Some("worker-01".into()),
requirements: "do something".into(),
labels: vec!["agent:code".into(), "priority:high".into(), "status:doing".into()],
created_at: Utc::now(),
assigned_at: Some(Utc::now()),
started_at: Some(Utc::now()),
completed_at: None,
retry_count: 0,
max_retries: 2,
timeout_seconds: 1800,
}
}
#[tokio::test] #[tokio::test]
async fn register_and_list_agents() { async fn register_and_list_agents() {
let (_dir, state) = test_store(); let (_dir, state) = test_store();
@ -369,12 +610,13 @@ mod tests {
.unwrap(); .unwrap();
{ {
let mut store = state.lock().unwrap(); let mut store = state.store.lock().unwrap();
store.force_agent_last_heartbeat("worker-01", Utc::now() - chrono::Duration::seconds(500)) store
.force_agent_last_heartbeat("worker-01", Utc::now() - chrono::Duration::seconds(500))
.unwrap(); .unwrap();
} }
let checker = HeartbeatChecker::new(state.clone(), Duration::from_secs(60), 180); let checker = HeartbeatChecker::new(state.store.clone(), Duration::from_secs(60), 180);
let affected = checker.check_once().await.unwrap(); let affected = checker.check_once().await.unwrap();
assert_eq!(affected, 0); assert_eq!(affected, 0);
@ -391,4 +633,105 @@ mod tests {
assert_eq!(listed.0.len(), 1); assert_eq!(listed.0.len(), 1);
assert_eq!(listed.0[0].agent_id, "worker-01"); assert_eq!(listed.0[0].agent_id, "worker-01");
} }
#[tokio::test]
async fn webhook_creates_task_from_issue() {
let (_dir, mut state, _fake) = test_state();
state.config.forgejo.webhook_secret = "top-secret".into();
let body = sample_issue_event_json();
let mut headers = HeaderMap::new();
headers.insert(
"x-gitea-signature",
HeaderValue::from_str(&webhook_signature("top-secret", &body)).unwrap(),
);
let res = forgejo_webhook(State(state.clone()), headers, Bytes::from(body))
.await
.unwrap();
assert_eq!(res.0.accepted, true);
assert_eq!(res.0.task_id.as_deref(), Some("org/repo#42"));
let task = {
let store = state.store.lock().unwrap();
store.read_task("org/repo#42").unwrap().unwrap()
};
assert_eq!(task.task_type, "code");
assert_eq!(task.priority, Priority::High);
assert_eq!(task.status, TaskStatus::Created);
}
#[tokio::test]
async fn receipt_submission_validates_pr_and_completes_task() {
let (_dir, state, fake) = test_state();
fake.existing_pr_urls
.lock()
.unwrap()
.push("https://git.example/org/repo/pulls/15".into());
{
let store = state.store.lock().unwrap();
store.insert_task(&sample_task("org/repo#42")).unwrap();
}
let receipt = Receipt {
task_id: "org/repo#42".into(),
agent_id: "worker-01".into(),
status: ReceiptStatus::Completed,
duration_seconds: 12,
summary: "Implemented thing".into(),
artifacts: vec![Artifact {
artifact_type: ArtifactType::Pr,
url: Some("https://git.example/org/repo/pulls/15".into()),
path: None,
description: Some("PR #15".into()),
}],
error: None,
};
let res = submit_receipt(State(state.clone()), Json(receipt)).await.unwrap();
assert_eq!(res.0.status, TaskStatus::Completed);
let task = {
let store = state.store.lock().unwrap();
store.read_task("org/repo#42").unwrap().unwrap()
};
assert_eq!(task.status, TaskStatus::Completed);
assert_eq!(fake.comments.lock().unwrap().len(), 1);
assert_eq!(fake.updates.lock().unwrap().len(), 1);
}
#[tokio::test]
async fn receipt_submission_rejects_missing_pr() {
let (_dir, state, _fake) = test_state();
{
let store = state.store.lock().unwrap();
store.insert_task(&sample_task("org/repo#42")).unwrap();
}
let receipt = Receipt {
task_id: "org/repo#42".into(),
agent_id: "worker-01".into(),
status: ReceiptStatus::Completed,
duration_seconds: 12,
summary: "Implemented thing".into(),
artifacts: vec![Artifact {
artifact_type: ArtifactType::Pr,
url: Some("https://git.example/org/repo/pulls/404".into()),
path: None,
description: Some("PR #404".into()),
}],
error: None,
};
let err = submit_receipt(State(state.clone()), Json(receipt)).await.unwrap_err();
assert!(matches!(err, ApiError::Forgejo(_)));
let task = {
let store = state.store.lock().unwrap();
store.read_task("org/repo#42").unwrap().unwrap()
};
assert_eq!(task.status, TaskStatus::Running);
}
} }

353
src/integrations/forgejo.rs Normal file
View file

@ -0,0 +1,353 @@
use async_trait::async_trait;
use hmac::{Hmac, Mac};
use reqwest::Client;
use serde::{Deserialize, Serialize};
use sha2::Sha256;
use crate::config::ForgejoConfig;
use crate::core::models::{Artifact, Priority, Receipt, ReceiptStatus, Task, TaskStatus};
pub type HmacSha256 = Hmac<Sha256>;
#[derive(Debug, thiserror::Error)]
pub enum ForgejoError {
#[error("http error: {0}")]
Http(#[from] reqwest::Error),
#[error("serialization error: {0}")]
Serde(#[from] serde_json::Error),
#[error("invalid signature")]
InvalidSignature,
#[error("unsupported event")]
UnsupportedEvent,
#[error("artifact validation failed: {0}")]
Validation(String),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ForgejoLabel {
pub name: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ForgejoRepo {
pub name: String,
pub full_name: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ForgejoUser {
pub login: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ForgejoIssue {
pub number: u64,
pub title: String,
pub body: Option<String>,
pub html_url: String,
#[serde(default)]
pub labels: Vec<ForgejoLabel>,
#[serde(default)]
pub assignees: Vec<ForgejoUser>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ForgejoIssueEvent {
pub action: String,
pub issue: ForgejoIssue,
pub repository: ForgejoRepo,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ForgejoPullRequest {
pub number: u64,
pub html_url: String,
pub title: String,
pub body: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateCommentRequest {
pub body: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UpdateIssueRequest {
pub assignees: Option<Vec<String>>,
pub labels: Option<Vec<String>>,
}
#[async_trait]
pub trait ForgejoApi: Send + Sync {
async fn issue_exists(&self, repo: &str, issue_number: u64) -> Result<bool, ForgejoError>;
async fn create_issue_comment(&self, repo: &str, issue_number: u64, body: &str) -> Result<(), ForgejoError>;
async fn update_issue(&self, repo: &str, issue_number: u64, req: UpdateIssueRequest) -> Result<(), ForgejoError>;
async fn pr_exists_by_url(&self, pr_url: &str) -> Result<bool, ForgejoError>;
async fn reconcile(&self) -> Result<(), ForgejoError>;
}
#[derive(Clone)]
pub struct ForgejoClient {
pub base_url: String,
pub token: String,
pub webhook_secret: String,
client: Client,
}
impl ForgejoClient {
pub fn new(config: &ForgejoConfig) -> Self {
Self {
base_url: config.url.trim_end_matches('/').to_string(),
token: config.token.clone(),
webhook_secret: config.webhook_secret.clone(),
client: Client::new(),
}
}
pub fn verify_webhook_signature(&self, body: &[u8], signature: &str) -> Result<(), ForgejoError> {
verify_webhook_signature(&self.webhook_secret, body, signature)
}
}
#[async_trait]
impl ForgejoApi for ForgejoClient {
async fn issue_exists(&self, repo: &str, issue_number: u64) -> Result<bool, ForgejoError> {
let url = format!("{}/api/v1/repos/{}/issues/{}", self.base_url, repo, issue_number);
let res = self
.client
.get(url)
.bearer_auth(&self.token)
.send()
.await?;
Ok(res.status().is_success())
}
async fn create_issue_comment(&self, repo: &str, issue_number: u64, body: &str) -> Result<(), ForgejoError> {
let url = format!("{}/api/v1/repos/{}/issues/{}/comments", self.base_url, repo, issue_number);
self.client
.post(url)
.bearer_auth(&self.token)
.json(&CreateCommentRequest { body: body.to_string() })
.send()
.await?
.error_for_status()?;
Ok(())
}
async fn update_issue(&self, repo: &str, issue_number: u64, req: UpdateIssueRequest) -> Result<(), ForgejoError> {
let url = format!("{}/api/v1/repos/{}/issues/{}", self.base_url, repo, issue_number);
self.client
.patch(url)
.bearer_auth(&self.token)
.json(&req)
.send()
.await?
.error_for_status()?;
Ok(())
}
async fn pr_exists_by_url(&self, pr_url: &str) -> Result<bool, ForgejoError> {
let res = self.client.get(pr_url).bearer_auth(&self.token).send().await?;
Ok(res.status().is_success())
}
async fn reconcile(&self) -> Result<(), ForgejoError> {
Ok(())
}
}
pub fn verify_webhook_signature(secret: &str, body: &[u8], signature: &str) -> Result<(), ForgejoError> {
let provided = signature.trim();
let provided = provided.strip_prefix("sha256=").unwrap_or(provided);
let mut mac = HmacSha256::new_from_slice(secret.as_bytes())
.map_err(|_| ForgejoError::InvalidSignature)?;
mac.update(body);
let expected = hex::encode(mac.finalize().into_bytes());
if expected == provided {
Ok(())
} else {
Err(ForgejoError::InvalidSignature)
}
}
pub fn parse_issue_event(body: &[u8]) -> Result<ForgejoIssueEvent, ForgejoError> {
Ok(serde_json::from_slice(body)?)
}
pub fn issue_event_to_task(event: &ForgejoIssueEvent, default_max_retries: u32, default_timeout_seconds: u64) -> Option<Task> {
let labels: Vec<String> = event.issue.labels.iter().map(|l| l.name.clone()).collect();
let task_type = infer_task_type(&labels)?;
let priority = infer_priority(&labels);
Some(Task {
task_id: format!("{}#{}", event.repository.full_name, event.issue.number),
source: format!("forgejo:{}#{}", event.repository.full_name, event.issue.number),
task_type,
priority,
status: TaskStatus::Created,
assigned_agent_id: None,
requirements: event.issue.body.clone().unwrap_or_default(),
labels,
created_at: chrono::Utc::now(),
assigned_at: None,
started_at: None,
completed_at: None,
retry_count: 0,
max_retries: default_max_retries,
timeout_seconds: default_timeout_seconds,
})
}
pub fn infer_task_type(labels: &[String]) -> Option<String> {
for label in labels {
if let Some(value) = label.strip_prefix("agent:") {
return Some(value.to_string());
}
}
None
}
pub fn infer_priority(labels: &[String]) -> Priority {
if labels.iter().any(|l| l == "priority:urgent") {
Priority::Urgent
} else if labels.iter().any(|l| l == "priority:high") {
Priority::High
} else if labels.iter().any(|l| l == "priority:low") {
Priority::Low
} else {
Priority::Normal
}
}
pub fn status_labels_for_task(status: &TaskStatus, existing_labels: &[String]) -> Vec<String> {
let mut labels: Vec<String> = existing_labels
.iter()
.filter(|label| !label.starts_with("status:"))
.cloned()
.collect();
let status_label = match status {
TaskStatus::Created => "status:todo",
TaskStatus::Assigned | TaskStatus::Running => "status:doing",
TaskStatus::Completed => "status:done",
TaskStatus::Failed | TaskStatus::AgentLost | TaskStatus::Cancelled => "status:todo",
};
labels.push(status_label.to_string());
labels
}
pub fn format_receipt_comment(receipt: &Receipt) -> String {
let emoji = match receipt.status {
ReceiptStatus::Completed => "",
ReceiptStatus::Failed => "",
ReceiptStatus::Partial => "🟡",
};
let mut body = format!(
"{} **Receipt**\n\n- Task: `{}`\n- Agent: `{}`\n- Status: `{}`\n- Duration: {}s\n- Summary: {}\n",
emoji,
receipt.task_id,
receipt.agent_id,
match receipt.status {
ReceiptStatus::Completed => "completed",
ReceiptStatus::Failed => "failed",
ReceiptStatus::Partial => "partial",
},
receipt.duration_seconds,
receipt.summary
);
if !receipt.artifacts.is_empty() {
body.push_str("- Artifacts:\n");
for artifact in &receipt.artifacts {
let target = artifact
.url
.as_ref()
.or(artifact.path.as_ref())
.cloned()
.unwrap_or_else(|| "<unknown>".into());
body.push_str(&format!(" - {:?}: {}\n", artifact.artifact_type, target));
}
}
if let Some(error) = &receipt.error {
body.push_str(&format!("- Error: {}\n", error));
}
body
}
pub async fn validate_receipt_artifacts(
client: &dyn ForgejoApi,
receipt: &Receipt,
) -> Result<(), ForgejoError> {
for artifact in &receipt.artifacts {
validate_artifact(client, artifact).await?;
}
Ok(())
}
async fn validate_artifact(client: &dyn ForgejoApi, artifact: &Artifact) -> Result<(), ForgejoError> {
match artifact.artifact_type {
crate::core::models::ArtifactType::Pr => {
let url = artifact
.url
.as_deref()
.ok_or_else(|| ForgejoError::Validation("missing PR url".into()))?;
if client.pr_exists_by_url(url).await? {
Ok(())
} else {
Err(ForgejoError::Validation(format!("PR not found: {url}")))
}
}
_ => Ok(()),
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn verifies_valid_hmac_signature() {
let body = br#"{"hello":"world"}"#;
let secret = "top-secret";
let mut mac = HmacSha256::new_from_slice(secret.as_bytes()).unwrap();
mac.update(body);
let sig = format!("sha256={}", hex::encode(mac.finalize().into_bytes()));
verify_webhook_signature(secret, body, &sig).unwrap();
}
#[test]
fn converts_issue_event_to_task() {
let event = ForgejoIssueEvent {
action: "opened".into(),
issue: ForgejoIssue {
number: 42,
title: "Implement thing".into(),
body: Some("Need agent to do it".into()),
html_url: "https://git.example/repo/issues/42".into(),
labels: vec![
ForgejoLabel { name: "agent:code".into() },
ForgejoLabel { name: "priority:high".into() },
],
assignees: vec![],
},
repository: ForgejoRepo {
name: "repo".into(),
full_name: "org/repo".into(),
},
};
let task = issue_event_to_task(&event, 2, 1800).unwrap();
assert_eq!(task.task_id, "org/repo#42");
assert_eq!(task.source, "forgejo:org/repo#42");
assert_eq!(task.task_type, "code");
assert_eq!(task.priority, Priority::High);
assert_eq!(task.status, TaskStatus::Created);
}
}

1
src/integrations/mod.rs Normal file
View file

@ -0,0 +1 @@
pub mod forgejo;

View file

@ -1,3 +1,4 @@
pub mod api; pub mod api;
pub mod core; pub mod core;
pub mod config; pub mod config;
pub mod integrations;

View file

@ -1,6 +1,7 @@
mod api; mod api;
mod config; mod config;
mod core; mod core;
mod integrations;
use clap::Parser; use clap::Parser;
@ -81,6 +82,8 @@ async fn main() {
)); ));
tokio::spawn(async move { heartbeat_checker.run().await }); tokio::spawn(async move { heartbeat_checker.run().await });
let app_state = api::AppState::new(config.clone(), store.clone());
let app = axum::Router::new() let app = axum::Router::new()
.route("/healthz", axum::routing::get(|| async { "ok" })) .route("/healthz", axum::routing::get(|| async { "ok" }))
.route("/api/v1/agents/register", axum::routing::post(api::register_agent)) .route("/api/v1/agents/register", axum::routing::post(api::register_agent))
@ -92,7 +95,7 @@ async fn main() {
"/api/v1/webhooks/forgejo", "/api/v1/webhooks/forgejo",
axum::routing::post(api::forgejo_webhook), axum::routing::post(api::forgejo_webhook),
) )
.with_state(store.clone()); .with_state(app_state);
let listener = tokio::net::TcpListener::bind(format!( let listener = tokio::net::TcpListener::bind(format!(
"{}:{}", "{}:{}",