项目架构

oritera/Cairn: A AI general-purpose state-space search engine, validated first on autonomous penetration testing.
总的来看,这个架构是把整个任务的执行过程定了一个起点和终点,看成一个有向无环图
灵感来源于黑板架构,不同的agent的工作都源于这个共享的黑板,它们之间不直接通信,像现在的多智能题的架构的A2A通信,但是可以根据工作中的发现或者进展来改变共享环境的内容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
┌──────────────────────────────────┐
Cairn Server
Facts + Intents + Hints
└─────────────────┬────────────────┘

Read / Write API

┌─────────────────┴────────────────┐
Dispatcher
│ 调度任务 / 管理容器 / 写协议 │
└──────────┬───────────────┬───────┘
│ │
┌───────┴────┐ ┌──────┴────────┐
Container │ │ Container
│ (Project A)│ │ (Project B) │
Workers │ │ Workers
└────────────┘ └───────────────┘

claudecode的多智能体架构

第一类是subagent,一个你派去执行专门任务的子agent。
官方文档写的是当一个辅助任务会用搜索结果、日志或文件内容充斥您的主对话,而您不会再次引用这些内容时,请使用一个 subagent,用superpower写代码的时候也会遇见让你选择subagent执行或者单session,在这个终端里面依次执行任务。
Subagent的特点是在自己的 context window 中运行,不和主agent共享上下文,就是不和主agent通信,sub执行任务的结果会返回给主agent,直接注入主agent的上下文

第二类是Agent teams,是直接开几个不同的cc实例,独立工作,每个都在自己的 context window 中,并直接相互通信,生成时,队友加载与常规会话相同的项目 context:CLAUDE.md、MCP servers 和 skills。
img

Cairn Server

1
2
3
4
5
6
7
8
9
10
11
12
cairn/src/cairn/server/
├── __init__.py
├── app.py # FastAPI 应用入口
├── db.py # 数据库层
├── models.py # Pydantic 数据模型
├── services.py # 主要逻辑
└── routers/
├── projects.py # 项目相关 API
├── intents.py # Intent 相关 API
├── hints.py # Hint 相关 API
├── export.py # 导出功能
└── settings.py # 全局设置

路径cairn/src/cairn/server/

Fact

facts是节点,代表一个agent发现的信息或者解题线索,只有描述文本,没有状态标记。只增不改,永久保留,每个 Project 有两个特殊 Fact,在 Project 创建时写入

intent

Intent是边,连接不同的fact,这个边是要被worker认领去执行,执行完成的标志就是产出了结论Fact

1
2
3
4
5
6
7
8
9
10
class Intent(BaseModel):
id: str # Intent 的唯一标识
from_: list[str] = Field(alias="from") # 源 Fact ID 列表(支持多个)
to: str | None = None # 目标 Fact ID(结论)
description: str # 探索方向的描述
creator: str # 创建者
worker: str | None = None # 当前认领者
last_heartbeat_at: str | None = None # 最后心跳时间
created_at: str # 创建时间
concluded_at: str | None = None # 结论时间

intent有一个heartbeat来保证系统的正常运行,一个完整的生命周期是从一个有效的fact开始,创建,定时发送一个心跳,使用一个后台进程,超时未收到心跳就就释放 worker ,但是这个心跳包的发送不是在server层,而是在Dispatcher 实现的。
from 字段是一个列表,支持多个源 Fact,一条边可以同时从多个源节点出发

1
2
3
f002 ──┐
├──(i003)──→ f006
f004 ──┘

具体的实现还有一些问题,看一下源码确认
三个表,fact和intent就是单存他们各自的元数据
intent_sources 表存储 Intent 和 Fact 的 关联关系 (哪个 Intent 从哪些 Fact 出发)

1
2
3
4
5
6
7
CREATE TABLE IF NOT EXISTS intent_sources (
intent_id TEXT NOT NULL,
project_id TEXT NOT NULL,
fact_id TEXT NOT NULL, # 一个 Intent 对应多个 fact_id
PRIMARY KEY (intent_id, project_id, fact_id),
FOREIGN KEY (intent_id, project_id) REFERENCES intents(id, project_id) ON DELETE CASCADE
);

一个 Intent 可以在 intent_sources 表中有多条记录,每条记录代表一个源 Fact,通过 intent_id 关联,实现”一个 Intent 从多个 Fact 出发,关于如何处理fact能联合,是Dispatcher层的通过 reason 任务来决定。

app.py 作为FastAPI 应用入口,lifespan 管理应用生命周期,fastapi的路由在app.py注册,具体逻辑写在routers的同名py文件里面
Project是一个有明确起点和终点的问题实例,包含完整的图数据,model.py的具体代码
model.py定义了图的核心元素还有管理类定义

1
2
3
4
5
class ProjectDetail(BaseModel):
project: ProjectMeta # 项目元数据
facts: list[Fact] # 图的节点集合
intents: list[Intent] # 图的边集合
hints: list[Hint] # 图外输入(不属于图结构)

project

project有三种状态

1
status: Literal["active", "stopped", "completed"]

active和completed好理解,stopped是可恢复到active,代码中, stopped 会清空 两类 worker,
reason的和intent的,其他的整个图都完整保留,恢复的时候重新按正常流程调度,目的是避免脏状态 。如果不停止时清空,某个 worker 已经不在运行了,但 lease 还锁着它,其他的worker也不能进去

1
2
3
4
5
6
7
8
9
# services.py:173-178
if body.status == "stopped":
# 清空所有未结论的 Intent 的 worker
conn.execute(
"UPDATE intents SET worker = NULL WHERE project_id = ? AND concluded_at IS NULL",
(project_id,),
)
# 清空项目的 reason lease
clear_project_reason(conn, project_id)

其他全部数据都保留,所有的fact和intents都在,新的 worker 可以 继续 处理这些未完成的 intents
恢复的时候就直接改status

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class UpdateProjectStatusRequest(BaseModel):
status: Literal["active", "stopped"]

@router.post("/projects/{project_id}/intents/{intent_id}/heartbeat")
def heartbeat(project_id: str, intent_id: str, body: HeartbeatRequest):
with get_conn() as conn:
# 检查项目必须是 active 状态
check_project_active(conn, project_id)

# 检查 intent 是否可以被认领
get_claimable_open_intent_or_404(conn, project_id, intent_id, body.worker)

now = utcnow()
# 重新设置 worker(认领)
conn.execute(
"UPDATE intents SET worker = ?, last_heartbeat_at = ? WHERE id = ? AND project_id = ?",
(body.worker, now, intent_id, project_id),
)

Reason

架构中有两个心跳机制,一个是Intent 心跳,用来认领worker的控制,作用在单个探索任务,任务的结果就是输出新的fact,Reason心跳控制的范围是一个大任务的状态,任务的结果是提出新的intent或者是完成goal,理解为规划层。
Project.reason,单个project同一时刻只能有一个 reason,就是一张图,可以多个 Intent 并发执行
intent的explore是独立的任务,reason是决策不能并行

1
2
3
4
# claim_project_reason  互斥锁
current_worker = row["reason_worker"]
if current_worker is not None and current_worker != body.worker:
raise HTTPException(409, f"Project reason is currently claimed by {current_worker}")

Facts+ Intents+ Hints

1
2
3
4
5
6
7
8
9
10
11
12
class ProjectReason(BaseModel):
worker: str # 当前执行 reason 的消费者
trigger: str # 本次 reason 的触发原因
started_at: str # 本次 reason 开始时间
last_heartbeat_at: str # 最近一次 reason 心跳时间

class ProjectMeta(BaseModel):
id: str
title: str
status: Literal["active", "stopped", "completed"]
created_at: str
reason: ProjectReason | None = None # reason 字段

触发reason的条件,explore 完成,添加 fact,收到新 hint,最后一个 intent 完成(因为open intent被清空了)
不把intent数量增加当作reason触发的条件,会导致无限循环,因为新增的intent是上一轮的产物

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
changes: list[str] = []

# 条件 1:Fact 数量增加(发现了新事实)
if len(project.facts) > checkpoint.fact_count:
changes.append(f"facts:{checkpoint.fact_count}->{len(project.facts)}")

# 条件 2:Hint 数量增加(收到新提示)
if len(project.hints) > checkpoint.hint_count:
changes.append(f"hints:{checkpoint.hint_count}->{len(project.hints)}")

# 条件 3:从"有 open intents"变为"无 open intents"(所有任务完成)
if checkpoint.open_intent_count > 0 and open_intent_count == 0:
changes.append(f"open_intents:{checkpoint.open_intent_count}->0")

if not changes:
return None # 没有实质性变化,不触发 reason
return ",".join(changes)

ReasonCheckpoint机制,在models.py定义了

1
2
3
4
5
@dataclass(slots=True)
class ReasonCheckpoint:
fact_count: int
hint_count: int
open_intent_count: int

存储在 DispatcherLoop 实例的内存中,字典的 key 是 project_id,Checkpoint就用来记录上面说的那几个触发reason的条件的变化,检测到值的变化符合条件就触发reason

hint

1
2
3
4
5
class Hint(BaseModel):
id: str # 提示的唯一标识
content: str # 提示内容
creator: str # 创建者
created_at: str # 创建时间

这个hint设置的是在全部状态都能被添加,他不属于图谱中,Hint 不是 Fact,不会产生 Intent,也不连接到图结构中,是可以人为外部添加的只给Reason worker来提供信息,通过下面的graph_yaml 变量传入到 reason prompt 中 ,AI 在分析时会读取 YAML 中的 hints 信息
直接用前端从/projects/{project_id}/hints路由写入hints

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def check_project_hint_writable(conn: sqlite3.Connection, project_id: str) -> sqlite3.Row:
row = get_project_or_404(conn, project_id)
# Hint 在 active、stopped、completed 状态下都允许写入
if row["status"] not in ("active", "stopped", "completed"):
raise HTTPException(403, f"Project is {row['status']}")
return row

# reason 上下文准备prompt = render_prompt(
load_prompt(config.runtime.prompt_group, "reason.md"),
{
"graph_yaml": export_yaml.strip(), # 包含 hints 的完整图数据
"fact_ids": format_fact_ids(allowed_fact_ids), # 可用 Fact ID 列表
"open_intents": format_open_intents(open_intents), # 未完成的 Intents
"max_intents": str(config.tasks.reason.max_intents), # 最多创建几个新 intent
},
)

这个是对模板的一个reason.md进行替换,把整个项目的图状态,比如可使用的fact和没完成的intent全部传过去,像一个快照,注入给reason的那个worker,触发和具体的实现都在Dispatcher层

在project中的任务中的DAG图实际上就是一个yaml文件,直接给 LLM 看 YAML 格式的图结构,它天然能理解这种层次化表示,还能完整记录了 facts -> intents -> facts 的因果关系链
大概是这样,理解成一个序列化快照 ,用于传给 LLM 作为上下文

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
project:
title: "渗透测试项目"
origin: "目标网站 https://example.com"
goal: "获取 flag"

hints:
- content: "尝试 SQL 注入"
creator: "Human"
created_at: "2026-03-21 18:05:00"

facts:
- id: "origin"
description: "目标网站 https://example.com"
- id: "goal"
description: "获取 flag"
- id: "f001"
description: "网站使用 Nginx 1.18.0"

intents:
- from: ["origin"]
to: "f001"
description: "识别 Web 服务器版本"
creator: "agent-A"
worker: "claudecode"
created_at: "2026-03-21 18:05:00"
concluded_at: "2026-03-21 18:06:00"

- from: ["f001"]
to: null
description: "尝试利用目录遍历"
creator: "agent-A"
worker: null
created_at: "2026-03-21 18:09:00"
concluded_at: null

中途添加hint,分两种情况,第一种是创建项目的时候写的,直接就被替换进prompt传送进去
已经在跑的那一次任务,不会自动热更新 prompt,就是只有bootstrap的时候是这样接收的
reason / explore阶段就只能通过上面那个graph_yaml状态的改动来读取,dispatcher是会不断轮询的,dispatcher 下一次再 export_project(project_id) 时,就会触发reason

Dispatcher

上面的server是服务端,这个Dispatcher就是客户端执行器
prompts放的是对应任务不同的生命周期的提示词,比如说bootstrap.md是引导任务提示词
给每个Agent(worker)派发任务都在这一层,Dispatcher 是唯一的协议写入者和控制面;Agent 不 claim、不 heartbeat
在Dispatcher有三种任务类型,Bootstrap,explore和Reason

Bootstrap Intent 的定义

\cairn\src\cairn\dispatcher\scheduler\loop.py
Bootstrap Intent 其实任务流程和生命周期都和普通的intent一样,只是语义解释不同,这个定义的意思是第一次探索,一个reason的初次任务:只有 origin 和 goal 两个事实

1
2
3
4
5
6
7
8
def _is_initial_project(self, project: ProjectDetail) -> bool:
# 只有 origin 和 goal 两个事实,且所有 intent 都是 bootstrap 类型
fact_ids = {fact.id for fact in project.facts}
if fact_ids != {"origin", "goal"} or len(project.facts) != 2:
return False
if not project.intents:
return True
return all(self._is_bootstrap_intent(intent) for intent in project.intents)

加载提示词–worker流程

prompting.py, 加载提示词模板,然后用下面的claudecode.py构建好这个命令数组,作为cli参数发过去
三种任务类型都用各自设计的提示词模板,替换数据传入

1
2
3
4
5
6
7
8
9
10
11
12
13
def build_execute(self, worker: WorkerConfig, prompt: str, session: str | None) -> DriverResult:
assert session is not None
return DriverResult(
argv=[
"claude", # Claude Code CLI
"--session-id", session, # 会话 ID,用于保持上下文
"--dangerously-skip-permissions", # 跳过权限确认
"-p", # 表示后面是 prompt
"--", # 参数分隔符
prompt, # ← 这里就是渲染后的完整提示词
],
session=session,
)

在 Docker 容器中exec 注入执行,worker的执行层有execute阶段和conclude(收尾阶段)
只在第一阶段出现异常(超时或输出解析失败)时才会进入。目的是利用同一个 session 让 Agent 总结当前已经完成的工作,而不是继续探索,这就是适配框架的鲁棒性设计
\dispatcher\runtime中的containers.py容器管理,以一个project为单位创建容器,调度循环loop.py中处理容器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# containers.py#L181-L201

def build_exec_process(
self,
container_name: str,
env: dict[str, str], # 环境变量 (API Key 等)
command: list[str], # 如 ["claude", "-p", "--", "<prompt>"]
timeout_seconds: int | None = None,
kill_after_seconds: int = 5,
) -> ManagedProcess:
container = self._require_container(container_name)

argv: list[str] = []
if timeout_seconds is not None:
# 添加 Linux timeout 命令包装
# 300秒超时,超时后等5秒发 SIGKILL
argv.extend([
"timeout",
"-k", f"{kill_after_seconds}s", # -k: 超时后强制kill
f"{timeout_seconds}s", # 超时时间
])

argv.extend(command)
# 最终: ["timeout", "-k", "5s", "300s", "claude", "-p", "--", "..."]

return ManagedProcess(container, argv, env)

对容器的进程有两层保护的机制,liunx命令启动的时候加了一个超时参数,第二个是Python 的thread.join(timeout),用docker sdk把进程给kill掉
Docker Exec可直接利用PID把worker运行的那个进程杀掉,不影响其他的

1
2
3
4
5
if timeout_seconds is not None:
argv.extend([
"timeout", # Linux 的 timeout 命令
"-k", f"{kill_after_seconds}s", # 强制杀死的等待时间
f"{timeout_seconds}s", # 超时时间

两个dockerfile对应两个容器,Cairn的server和Dispatcher,第二个是worker工作环境的容器,里面放了kali和一些安全工具,知识库和 PoC。codex和claude,每一个容器都直接用这个镜像
dockerfile设定了cc和codex的工作目录,mcp和skill,还有基本配置在容器启动时载入,工作目录就是解题空间,poc和知识库都在这个目录。

1
2
3
4
5
6
7
# Dockerfile
COPY ./.agents /home/kali/workspace/.agents
COPY ./.agents /home/kali/workspace/.claude
COPY ./AGENTS.md /home/kali/workspace/AGENTS.md
COPY ./AGENTS.md /home/kali/workspace/CLAUDE.md

WORKDIR /home/kali/workspace ← 容器启动时的当前目录

session维护机制

针对不同的worker,比如codex和claudecode,要设定对应的session管理方式
\dispatcher\workers的base.py,正常使用时,Claude Code 会自动创建和管理 session,但是直接用自定义的UUID 来 控制 session ID在这个情况更方便,因为 Cairn 需要在不同的时间点精确地恢复 同一个会话
codex会在 stderr 输出 session ID,直接用一个函数提取出来就行了
Driver类声明了自己的type_name,worker在config的时候要写WorkerType,执行任务的时候就直接获取Driver。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def run_bootstrap_task(..., intent, ...):
创建 session(内存中的 UUID)
session = driver.prepare_session()
# execute 阶段使用 session
execute = driver.build_execute(worker, prompt, session)
first = run_worker_process(..., execute.argv, ...)
# 提取 session
session = driver.extract_session(session, first.stderr)
# 如果 execute 失败,使用同一个 session 恢复对话
# fallback 到 conclude
return _try_conclude_fallback(
...,
intent,
session, # ← 传递同一个 session
...
)
# 如果 execute 成功,直接返回结果
# session 被丢弃,不再需要
return "success"

session 是以 intent 为单位进行管理的,同一个 Intent 任务内 ,execute 和 conclude 两个阶段之间 传递对话上下文的媒介

健康检查机制,Dispatcher启动时检查全部的worker,每个 Intent 任务执行前只检查 当前任务要使用的 worker,具体的检查内容是Base URL ,API Key 和指定的模型可用性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def run_startup_healthchecks(config, container_manager, *, show_commands=False):
# 创建临时容器
container_name = container_manager.create_startup_container()
# 并行检查所有 worker
with ThreadPoolExecutor(max_workers=parallelism) as executor:
future_map = {
executor.submit(
_run_worker_healthcheck,
container_manager, container_name, worker,
config.runtime.healthcheck_timeout,
): worker.name
for worker in workers
}
# 最后清理临时容器
container_manager.remove_container(container_name, force=True)

Worker 执行结果输出与记录

worker的执行是在容器中,上面的加载提示词–worker流程输入
输出读取的原理,使用一个流程处理类,利用好Docker Python SDK 的原生 API,这几个Agent CLI 工具返回的是,Docker SDK 只负责传输原始字节流
CLI工具限制llm返回的格式只有通过bootstrap.md等几个prompt限制,明确要求返回json
各个Driver在执行前构建 execute 命令和初始化 session ID并且注入环境变量,在任务执行后的输出阶段,也要提取session ID,判断是否支持 conclude fallback

然后就到ManagedProcess 缓冲并拼接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class ManagedProcess:  # ← 自定义类
def __init__(self, container: Container, command: list[str], env: dict[str, str]):
self.command = command # 自定义
self.env = env # 自定义
self._container = container # Docker SDK 原生对象
self._api = container.client.api # Docker SDK 原生 API 客户端

self._exec_id: str | None = None # Docker 返回的 exec ID
self._reader: threading.Thread | None = None # 自定义线程
self._stdout: list[str] = [] # 自定义缓冲
self._stderr: list[str] = [] # 自定义缓冲
self._returncode: int | None = None # Docker 返回的退出码
self._timed_out = False # 自定义标志
self._cancel_reason: str | None = None # 自定义
self._read_error: str | None = None # 自定义
self._done = threading.Event() # 自定义同步原语

输出解析层是用JSON提取
容器内的Agent 进程 (claude/codex),LLM 可能不完全遵循 只返回 JSON的指令,用一个提取json的脚本,后面就到了通用包装器解包,contracts.py,在目前这个提取输出信息的方法中,bootstrap.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
try:
payload = parse_json_output(first.stdout)
kind, data = validate_bootstrap_execute_payload(payload)
except Exception as exc:
# 解析失败时
LOG.warning(
"bootstrap parse failed project=%s intent=%s worker=%s error=%s ... stdout_preview=%s stderr_preview=%s",
project.project.id,
intent.id,
worker.name,
exc,
preview(first.stdout), # 只记录日志(截取 1200 字符)
preview(first.stderr),
)
# 触发 conclude fallback,尝试让 Agent 重新输出
return _try_conclude_fallback(...) # 不保留非 JSON 信息

如果JSON 提取失败时,触发conclude,在explore和bootstrap任务中都有分别的实现
Timeout超时触发和JSON 解析或验证失败触发,在这个阶段,就是恢复execute阶段执行失败的session,在conclude阶段注入的是bootstrap_conclude.md,大概意思就是强制 LLM 立即停止工作,只输出已确认的事实摘要,写入结果
虽然非 JSON 内容没有被系统提取,但 Agent 通过 session 机制保存着上下文,还可以用session恢复。

Mock Driver

mockdriver 用于本地观察 dispatcher 的成功、失败和超时路径,这个driver的Prompt 是结构化 JSON,不调用真正的 AI API,可以直接用Mock Driver来测试模拟实际的调用情况
模拟的就是 stdout/stderr ,返回的 JSON 是否符合协议,真正被替换掉的,只有 worker 这一层

1
server -> dispatcher -> executor(local/container) -> healthcheck->timeout-> 协议写回 

这些都还是真跑的,mock启动一个子进程,把mock 行为配置 和 当前任务的 prompt JSON传进去

1
2
3
4
5
6
{
"phase": "reason",
"fact_ids": {fact_ids},
"open_intents": {open_intents},
"max_intents": {max_intents}
}

设置dispatch_mock.yaml,在worker的env配置Mock 行为,比如

1
2
3
4
5
MOCK_HEALTHCHECK: '{"delay":[0,2],"outcomes":{"ok":0.9,"fail":0.1}}'
MOCK_BOOTSTRAP: '{"delay":[5,10],"outcomes":{"complete":0.0,"fact":0.6,"rejected":0.1,"invalid_json":0.1,"invalid_payload":0.1,"command_fail":0.1}}'
MOCK_REASON: '{"delay":[2,6],"outcomes":{"complete":0.1,"intent":0.4,"noop":0.1,"rejected":0.1,"invalid_json":0.1,"invalid_payload":0.1,"command_fail":0.1}}'
MOCK_EXPLORE_EXECUTE: '{"delay":[5,10],"outcomes":{"fact":0.6,"rejected":0.1,"invalid_json":0.1,"invalid_payload":0.1,"command_fail":0.1}}'
MOCK_EXPLORE_CONCLUDE: '{"delay":[2,6],"outcomes":{"fact":0.6,"rejected":0.1,"invalid_json":0.1,"invalid_payload":0.1,"command_fail":0.1}}'

主调度循环的非阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# loop.py 第 71-100 行
def run(self, once: bool = False) -> None:
try:
self.run_startup_healthchecks()
while True: # 主调度循环
try:
if not self._settings_checked:
self._validate_server_settings()
self._settings_checked = True

self._reap_futures() # 1. 收集已完成的任务
self._reap_cleanup_futures() # 2. 收集已完成的清理任务
summaries = self.client.list_projects()
self._initialize_reason_checkpoints(summaries)
self._refresh_runtime_projects(summaries)
self._cancel_inactive_tasks(summaries)
self._queue_container_cleanups(summaries) # 3. 队列化容器清理
self._dispatch_available(summaries) # 4. 调度新任务
except requests.RequestException as exc:
if once:
raise
LOG.warning("dispatcher server request failed ...")
time.sleep(self.config.runtime.interval)
continue

if once:
break
time.sleep(self.config.runtime.interval) # 每隔 interval 秒循环一次
finally:
...

主循环每 interval 秒执行一次,必须快速完成,不能被任何操作阻塞
分层主任务线程(执行 bootstrap、explore、reason 任务)和清理线程池(专门处理容器清理,独立于主任务)

1
2
3
4
5
6
7
8
# 主循环中的清理操作
def _queue_container_cleanups(self, summaries: list[ProjectSummary]) -> None:
# 只是提交任务到线程池,立即返回
future = self.cleanup_executor.submit(
self.container_manager.cleanup_completed,
summary.id
)
# 不等待 future 完成,继续执行后续代码

Dispatcher通过dispatch.yaml来进行主要功能的配置,在运行时通过config加载
max_running 是全局并发限制,限制跨项目这个worker能接收几个任务

1
2
3
4
5
6
7
runtime:
interval: 3 # 主循环间隔 + heartbeat 周期(秒)
max_workers: 8 # 全局最大并发任务数
max_running_projects: 3 # 最大同时运行的项目数
max_project_workers: 4 # 单个项目内最大并发任务数
healthcheck_timeout: 20 # 健康检查超时(秒)
prompt_group: "default" # 使用的 prompt 模板组

注册的时候通过 type 字段识别协议,然后合并环境变量到容器里面的worker
假如要注册新的worker,先写一个新的driver /dispatcher/workers/adapters/newdriver.py
init.py 中导出新驱动,在注册表中注册驱动
统一接口至少应覆盖这些能力:

  • build_healthcheck(worker):构造健康检查命令
  • prepare_session():需要时预先生成 session id
  • build_execute(worker, prompt, session):构造第一阶段执行命令
  • extract_session(session, stderr):需要时从 stderr 提取 session id,或继续使用预生成 session
  • build_conclude(worker, prompt, session):在双阶段 explore 中恢复同一 session 做收尾
  • supports_conclude():声明该 driver 是否支持双阶段 explore

项目的本意是在无人工介入情况下进行渗透和CTF的解题,但是这是一个完备的系统,可以拆解出可复用的板块,claudecode和codex以容器的形式运行,skill和payload的文档直接注入容器中,没有配任何的外部工具比如rag。不是传统的多agent架构,但是以一个共享的DAG图环境来实现了新的协同方式

重构

重构一个执行模式,可调整为容器执行和本地worker执行,大概的方法是, 配置层决定走哪种模式
调度器、任务层、健康检查层都不再直接依赖 Docker,而是只依赖 ExecutorProtocol
切换模式时,真正变的是配置和工厂返回的执行器实现,上层流程不用改.
把执行后端加上俩种模式的判断

1
2
3
4
5
def create_executor(config: DispatchConfig) -> ExecutorProtocol:
if config.execution_mode == "container":
return ContainerManager(config.container)
if config.execution_mode == "local":
return LocalExecutor(config.workspace_root)

准备两种模式yaml的配置
先启动server

1
2
3
4
5
6
7
8
9
10
11
12
uv run cairn serve --host 127.0.0.1 --port 8080

先跑一次 dispatcher startup healthcheck

uv run cairn dispatch --config dispatch.yaml --startup-healthcheck-only
uv run cairn dispatch --config dispatch-local.yaml --startup-healthcheck-only

启动dispatcher,然后就可以创建 project,这两个顺序随便
uv run cairn dispatch --config dispatch.yaml
uv run cairn dispatch --config dispatch-local.yaml

sudo chown -R $USER:$USER .

对于容器模式的dispatch-local.yaml ,改一下execution_mode: “container”

1
2
3
4
5
6
cap_add: []是Docker 容器的Linux capability追加权限,
如果 worker 里面要做一些更底层的系统/网络操作,就额外开权限

设置轮询周期
runtime:
interval: 3

服务端的UI可以设置timeout。
安装worker

1
2
3
npm config set registry https://registry.npmmirror.com
npm install -g @openai/codex
npm install -g @anthropic-ai/claude-code

当前实现按“单 Dispatcher 实例”设计和测试;不支持多个 Dispatcher 同时连接同一服务端共同调度

1
关键调度状态都在 Dispatcher 进程内存里:runtime_project_ids、reason_checkpoints、worker_unhealthy_until、worker_rejected_until、project_cursor、_cleanup_pending

这些状态不共享,所以两个 Dispatcher 会各自基于自己的旧快照做决策,容易重复选同一个 project、重复触发 reason/explore、重复做 cleanup
最明显的风险点是 bootstrap:两个 Dispatcher 可能同时看到“initial project”,然后都去 create_intent();没有项目级全局锁

日志更新

保留现有 Timeline(原来的log,改成这个合理点),添加一个execution record
复用dispatcher已经有的执行结果ProcessResult,HealthcheckRun,可以看到完整的stdout和stderr


访客数 0 访问量 0 运行 0