feat: Agent Adapter Interface (Task 7)
- AgentAdapter trait: register, heartbeat, execute, submit_receipt, deregister - AdapterRunner: lifecycle management (start with health check, heartbeat loop, graceful stop) - AdapterInstanceConfig: per-adapter config (type, work_dir, model, capabilities, env, connection) - Config integration: adapters field in Config + config.example.toml - 3 tests: config extraction, runner lifecycle, fake execute 22/22 tests pass.
This commit is contained in:
parent
f60f028f96
commit
1dacd17231
4 changed files with 315 additions and 1 deletions
307
src/adapters/mod.rs
Normal file
307
src/adapters/mod.rs
Normal file
|
|
@ -0,0 +1,307 @@
|
|||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::watch;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
use crate::api::{DeregisterRequest, HeartbeatRequest, RegisterAgentRequest};
|
||||
use crate::config::Config;
|
||||
use crate::core::models::{Receipt, Task};
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub enum AdapterKind {
|
||||
ClaudeCode,
|
||||
CodexCli,
|
||||
OpenClaw,
|
||||
Acp,
|
||||
Shell,
|
||||
Other(String),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub struct AdapterInstanceConfig {
|
||||
pub agent_id: String,
|
||||
pub adapter: AdapterKind,
|
||||
pub work_dir: PathBuf,
|
||||
#[serde(default)]
|
||||
pub model: Option<String>,
|
||||
#[serde(default)]
|
||||
pub max_concurrency: u32,
|
||||
#[serde(default)]
|
||||
pub capabilities: Vec<String>,
|
||||
#[serde(default)]
|
||||
pub env: HashMap<String, String>,
|
||||
#[serde(default)]
|
||||
pub connection: AdapterConnectionConfig,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
|
||||
pub struct AdapterConnectionConfig {
|
||||
#[serde(default)]
|
||||
pub base_url: Option<String>,
|
||||
#[serde(default)]
|
||||
pub access_token: Option<String>,
|
||||
#[serde(default)]
|
||||
pub command: Option<String>,
|
||||
#[serde(default)]
|
||||
pub args: Vec<String>,
|
||||
#[serde(default)]
|
||||
pub metadata: HashMap<String, String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub struct AdapterHealth {
|
||||
pub ok: bool,
|
||||
pub detail: String,
|
||||
}
|
||||
|
||||
impl AdapterHealth {
|
||||
pub fn healthy(detail: impl Into<String>) -> Self {
|
||||
Self {
|
||||
ok: true,
|
||||
detail: detail.into(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn unhealthy(detail: impl Into<String>) -> Self {
|
||||
Self {
|
||||
ok: false,
|
||||
detail: detail.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum AdapterError {
|
||||
#[error("adapter health check failed: {0}")]
|
||||
HealthCheckFailed(String),
|
||||
#[error("adapter lifecycle error: {0}")]
|
||||
Lifecycle(String),
|
||||
#[error("adapter execution error: {0}")]
|
||||
Execution(String),
|
||||
#[error("adapter join error: {0}")]
|
||||
Join(#[from] tokio::task::JoinError),
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait AgentAdapter: Send + Sync {
|
||||
async fn health_check(&self) -> Result<AdapterHealth, AdapterError>;
|
||||
async fn register(&self) -> Result<RegisterAgentRequest, AdapterError>;
|
||||
async fn heartbeat(&self) -> Result<HeartbeatRequest, AdapterError>;
|
||||
async fn execute(&self, task: Task) -> Result<Receipt, AdapterError>;
|
||||
async fn submit_receipt(&self, receipt: Receipt) -> Result<(), AdapterError>;
|
||||
async fn deregister(&self) -> Result<DeregisterRequest, AdapterError>;
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
|
||||
pub struct AdapterConfigFile {
|
||||
#[serde(default)]
|
||||
pub adapters: Vec<AdapterInstanceConfig>,
|
||||
}
|
||||
|
||||
impl AdapterConfigFile {
|
||||
pub fn from_config(config: &Config) -> Self {
|
||||
Self {
|
||||
adapters: config.adapters.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct AdapterRunner<A: AgentAdapter> {
|
||||
adapter: Arc<A>,
|
||||
heartbeat_interval: Duration,
|
||||
heartbeat_task: Option<JoinHandle<Result<(), AdapterError>>>,
|
||||
shutdown_tx: Option<watch::Sender<bool>>,
|
||||
}
|
||||
|
||||
impl<A: AgentAdapter + 'static> AdapterRunner<A> {
|
||||
pub fn new(adapter: Arc<A>, heartbeat_interval: Duration) -> Self {
|
||||
Self {
|
||||
adapter,
|
||||
heartbeat_interval,
|
||||
heartbeat_task: None,
|
||||
shutdown_tx: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn start(&mut self) -> Result<(), AdapterError> {
|
||||
let health = self.adapter.health_check().await?;
|
||||
if !health.ok {
|
||||
return Err(AdapterError::HealthCheckFailed(health.detail));
|
||||
}
|
||||
|
||||
self.adapter.register().await?;
|
||||
|
||||
let (shutdown_tx, mut shutdown_rx) = watch::channel(false);
|
||||
let adapter = self.adapter.clone();
|
||||
let interval_duration = self.heartbeat_interval;
|
||||
let task = tokio::spawn(async move {
|
||||
let mut interval = tokio::time::interval(interval_duration);
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = interval.tick() => {
|
||||
adapter.heartbeat().await?;
|
||||
}
|
||||
changed = shutdown_rx.changed() => {
|
||||
if changed.is_err() || *shutdown_rx.borrow() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
});
|
||||
|
||||
self.shutdown_tx = Some(shutdown_tx);
|
||||
self.heartbeat_task = Some(task);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn stop(&mut self) -> Result<(), AdapterError> {
|
||||
if let Some(tx) = self.shutdown_tx.take() {
|
||||
let _ = tx.send(true);
|
||||
}
|
||||
|
||||
if let Some(task) = self.heartbeat_task.take() {
|
||||
task.await??;
|
||||
}
|
||||
|
||||
self.adapter.deregister().await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use chrono::Utc;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
|
||||
use crate::core::models::{Priority, ReceiptStatus, TaskStatus};
|
||||
|
||||
#[derive(Default)]
|
||||
struct FakeAdapter {
|
||||
register_calls: AtomicUsize,
|
||||
heartbeat_calls: AtomicUsize,
|
||||
deregister_calls: AtomicUsize,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl AgentAdapter for FakeAdapter {
|
||||
async fn health_check(&self) -> Result<AdapterHealth, AdapterError> {
|
||||
Ok(AdapterHealth::healthy("ok"))
|
||||
}
|
||||
|
||||
async fn register(&self) -> Result<RegisterAgentRequest, AdapterError> {
|
||||
self.register_calls.fetch_add(1, Ordering::SeqCst);
|
||||
Ok(RegisterAgentRequest {
|
||||
agent_id: "worker-01".into(),
|
||||
agent_type: crate::core::models::AgentType::CodexCli,
|
||||
hostname: "host-01".into(),
|
||||
capabilities: vec!["code:rust".into()],
|
||||
max_concurrency: 1,
|
||||
metadata: HashMap::new(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn heartbeat(&self) -> Result<HeartbeatRequest, AdapterError> {
|
||||
self.heartbeat_calls.fetch_add(1, Ordering::SeqCst);
|
||||
Ok(HeartbeatRequest {
|
||||
agent_id: "worker-01".into(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn execute(&self, task: Task) -> Result<Receipt, AdapterError> {
|
||||
Ok(Receipt {
|
||||
task_id: task.task_id,
|
||||
agent_id: "worker-01".into(),
|
||||
status: ReceiptStatus::Completed,
|
||||
duration_seconds: 1,
|
||||
summary: "done".into(),
|
||||
artifacts: vec![],
|
||||
error: None,
|
||||
})
|
||||
}
|
||||
|
||||
async fn submit_receipt(&self, _receipt: Receipt) -> Result<(), AdapterError> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn deregister(&self) -> Result<DeregisterRequest, AdapterError> {
|
||||
self.deregister_calls.fetch_add(1, Ordering::SeqCst);
|
||||
Ok(DeregisterRequest {
|
||||
agent_id: "worker-01".into(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn config_file_extracts_adapters() {
|
||||
let mut config = Config::default();
|
||||
config.adapters = vec![AdapterInstanceConfig {
|
||||
agent_id: "worker-01".into(),
|
||||
adapter: AdapterKind::CodexCli,
|
||||
work_dir: PathBuf::from("/tmp/repo"),
|
||||
model: Some("gpt-5".into()),
|
||||
max_concurrency: 2,
|
||||
capabilities: vec!["code:rust".into()],
|
||||
env: HashMap::from([("RUST_LOG".into(), "info".into())]),
|
||||
connection: AdapterConnectionConfig {
|
||||
command: Some("codex".into()),
|
||||
args: vec!["exec".into(), "--json".into()],
|
||||
..Default::default()
|
||||
},
|
||||
}];
|
||||
|
||||
let file = AdapterConfigFile::from_config(&config);
|
||||
assert_eq!(file.adapters.len(), 1);
|
||||
assert_eq!(file.adapters[0].agent_id, "worker-01");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn runner_registers_heartbeats_and_stops() {
|
||||
let adapter = Arc::new(FakeAdapter::default());
|
||||
let mut runner = AdapterRunner::new(adapter.clone(), Duration::from_millis(10));
|
||||
|
||||
runner.start().await.unwrap();
|
||||
tokio::time::sleep(Duration::from_millis(35)).await;
|
||||
runner.stop().await.unwrap();
|
||||
|
||||
assert_eq!(adapter.register_calls.load(Ordering::SeqCst), 1);
|
||||
assert!(adapter.heartbeat_calls.load(Ordering::SeqCst) >= 1);
|
||||
assert_eq!(adapter.deregister_calls.load(Ordering::SeqCst), 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn fake_execute_returns_receipt_shape() {
|
||||
let adapter = FakeAdapter::default();
|
||||
let receipt = adapter
|
||||
.execute(Task {
|
||||
task_id: "task-1".into(),
|
||||
source: "forgejo:org/repo#1".into(),
|
||||
task_type: "code".into(),
|
||||
priority: Priority::Normal,
|
||||
status: TaskStatus::Assigned,
|
||||
assigned_agent_id: Some("worker-01".into()),
|
||||
requirements: "ship it".into(),
|
||||
labels: vec![],
|
||||
created_at: Utc::now(),
|
||||
assigned_at: Some(Utc::now()),
|
||||
started_at: None,
|
||||
completed_at: None,
|
||||
retry_count: 0,
|
||||
max_retries: 2,
|
||||
timeout_seconds: 60,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(receipt.task_id, "task-1");
|
||||
assert_eq!(receipt.status, ReceiptStatus::Completed);
|
||||
}
|
||||
}
|
||||
|
|
@ -1,11 +1,15 @@
|
|||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::adapters::AdapterInstanceConfig;
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct Config {
|
||||
pub server: ServerConfig,
|
||||
pub forgejo: ForgejoConfig,
|
||||
pub matrix: MatrixConfig,
|
||||
pub orchestrator: OrchestratorConfig,
|
||||
#[serde(default)]
|
||||
pub adapters: Vec<AdapterInstanceConfig>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
|
|
@ -63,6 +67,7 @@ impl Default for Config {
|
|||
task_timeout_secs: 1800,
|
||||
default_max_retries: 2,
|
||||
},
|
||||
adapters: vec![],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
pub mod adapters;
|
||||
pub mod api;
|
||||
pub mod core;
|
||||
pub mod config;
|
||||
pub mod core;
|
||||
pub mod integrations;
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
mod adapters;
|
||||
mod api;
|
||||
mod config;
|
||||
mod core;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue