项目架构 oritera/Cairn: A AI general-purpose state-space search engine, validated first on autonomous penetration testing. 总的来看,这个架构是把整个任务的执行过程定了一个起点和终点,看成一个有向无环图 灵感来源于黑板架构,不同的agent的工作都源于这个共享的黑板,它们之间不直接通信,像现在的多智能题的架构的A2A通信,但是可以根据工作中的发现或者进展来改变共享环境的内容。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 ┌──────────────────────────────────┐ │ Cairn Server │ │ Facts + Intents + Hints │ └─────────────────┬────────────────┘ │ Read / Write API │ ┌─────────────────┴────────────────┐ │ Dispatcher │ │ 调度任务 / 管理容器 / 写协议 │ └──────────┬───────────────┬───────┘ │ │ ┌───────┴────┐ ┌──────┴────────┐ │ Container │ │ Container │ │ (Project A)│ │ (Project B) │ │ Workers │ │ Workers │ └────────────┘ └───────────────┘
claudecode的多智能体架构 第一类是subagent,一个你派去执行专门任务的子agent。 官方文档写的是当一个辅助任务会用搜索结果、日志或文件内容充斥您的主对话,而您不会再次引用这些内容时,请使用一个 subagent,用superpower写代码的时候也会遇见让你选择subagent执行或者单session,在这个终端里面依次执行任务。 Subagent的特点是在自己的 context window 中运行,不和主agent共享上下文,就是不和主agent通信,sub执行任务的结果会返回给主agent,直接注入主agent的上下文
第二类是Agent teams,是直接开几个不同的cc实例,独立工作,每个都在自己的 context window 中,并直接相互通信,生成时,队友加载与常规会话相同的项目 context:CLAUDE.md、MCP servers 和 skills。
Cairn Server 1 2 3 4 5 6 7 8 9 10 11 12 cairn/src/cairn/server/ ├── __init__.py ├── app.py # FastAPI 应用入口 ├── db.py # 数据库层 ├── models.py # Pydantic 数据模型 ├── services.py # 主要逻辑 └── routers/ ├── projects.py # 项目相关 API ├── intents.py # Intent 相关 API ├── hints.py # Hint 相关 API ├── export.py # 导出功能 └── settings.py # 全局设置
路径cairn/src/cairn/server/
Fact facts是节点,代表一个agent发现的信息或者解题线索,只有描述文本,没有状态标记。只增不改,永久保留,每个 Project 有两个特殊 Fact,在 Project 创建时写入
intent Intent是边,连接不同的fact,这个边是要被worker认领去执行,执行完成的标志就是产出了结论Fact
1 2 3 4 5 6 7 8 9 10 class Intent (BaseModel ): id : str from_: list [str ] = Field(alias="from" ) to: str | None = None description: str creator: str worker: str | None = None last_heartbeat_at: str | None = None created_at: str concluded_at: str | None = None
intent有一个heartbeat来保证系统的正常运行,一个完整的生命周期是从一个有效的fact开始,创建,定时发送一个心跳,使用一个后台进程,超时未收到心跳就就释放 worker ,但是这个心跳包的发送不是在server层,而是在Dispatcher 实现的。 from 字段是一个列表,支持多个源 Fact,一条边可以同时从多个源节点出发
1 2 3 f002 ──┐ ├──(i003)──→ f006 f004 ──┘
具体的实现还有一些问题,看一下源码确认 三个表,fact和intent就是单存他们各自的元数据 intent_sources 表存储 Intent 和 Fact 的 关联关系 (哪个 Intent 从哪些 Fact 出发)
1 2 3 4 5 6 7 CREATE TABLE IF NOT EXISTS intent_sources ( intent_id TEXT NOT NULL , project_id TEXT NOT NULL , fact_id TEXT NOT NULL , # 一个 Intent 对应多个 fact_id PRIMARY KEY (intent_id, project_id, fact_id), FOREIGN KEY (intent_id, project_id) REFERENCES intents(id, project_id) ON DELETE CASCADE );
一个 Intent 可以在 intent_sources 表中有多条记录,每条记录代表一个源 Fact,通过 intent_id 关联,实现”一个 Intent 从多个 Fact 出发,关于如何处理fact能联合,是Dispatcher层的通过 reason 任务来决定。
app.py 作为FastAPI 应用入口,lifespan 管理应用生命周期,fastapi的路由在app.py注册,具体逻辑写在routers的同名py文件里面 Project是一个有明确起点和终点的问题实例,包含完整的图数据,model.py的具体代码 model.py定义了图的核心元素还有管理类定义
1 2 3 4 5 class ProjectDetail (BaseModel ): project: ProjectMeta facts: list [Fact] intents: list [Intent] hints: list [Hint]
project project有三种状态
1 status: Literal ["active" , "stopped" , "completed" ]
active和completed好理解,stopped是可恢复到active,代码中, stopped 会清空 两类 worker, reason的和intent的,其他的整个图都完整保留,恢复的时候重新按正常流程调度,目的是避免脏状态 。如果不停止时清空,某个 worker 已经不在运行了,但 lease 还锁着它,其他的worker也不能进去
1 2 3 4 5 6 7 8 9 if body.status == "stopped" : conn.execute( "UPDATE intents SET worker = NULL WHERE project_id = ? AND concluded_at IS NULL" , (project_id,), ) clear_project_reason(conn, project_id)
其他全部数据都保留,所有的fact和intents都在,新的 worker 可以 继续 处理这些未完成的 intents 恢复的时候就直接改status
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 class UpdateProjectStatusRequest (BaseModel ): status: Literal ["active" , "stopped" ] @router.post("/projects/{project_id}/intents/{intent_id}/heartbeat" ) def heartbeat (project_id: str , intent_id: str , body: HeartbeatRequest ): with get_conn() as conn: check_project_active(conn, project_id) get_claimable_open_intent_or_404(conn, project_id, intent_id, body.worker) now = utcnow() conn.execute( "UPDATE intents SET worker = ?, last_heartbeat_at = ? WHERE id = ? AND project_id = ?" , (body.worker, now, intent_id, project_id), )
Reason 架构中有两个心跳机制,一个是Intent 心跳,用来认领worker的控制,作用在单个探索任务,任务的结果就是输出新的fact,Reason心跳控制的范围是一个大任务的状态,任务的结果是提出新的intent或者是完成goal,理解为规划层。 Project.reason,单个project同一时刻只能有一个 reason,就是一张图,可以多个 Intent 并发执行 intent的explore是独立的任务,reason是决策不能并行
1 2 3 4 current_worker = row["reason_worker" ] if current_worker is not None and current_worker != body.worker: raise HTTPException(409 , f"Project reason is currently claimed by {current_worker} " )
Facts+ Intents+ Hints
1 2 3 4 5 6 7 8 9 10 11 12 class ProjectReason (BaseModel ): worker: str trigger: str started_at: str last_heartbeat_at: str class ProjectMeta (BaseModel ): id : str title: str status: Literal ["active" , "stopped" , "completed" ] created_at: str reason: ProjectReason | None = None
触发reason的条件,explore 完成,添加 fact,收到新 hint,最后一个 intent 完成(因为open intent被清空了) 不把intent数量增加当作reason触发的条件,会导致无限循环,因为新增的intent是上一轮的产物
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 changes: list [str ] = [] if len (project.facts) > checkpoint.fact_count: changes.append(f"facts:{checkpoint.fact_count} ->{len (project.facts)} " ) if len (project.hints) > checkpoint.hint_count: changes.append(f"hints:{checkpoint.hint_count} ->{len (project.hints)} " ) if checkpoint.open_intent_count > 0 and open_intent_count == 0 : changes.append(f"open_intents:{checkpoint.open_intent_count} ->0" ) if not changes: return None return "," .join(changes)
ReasonCheckpoint机制,在models.py定义了
1 2 3 4 5 @dataclass(slots=True ) class ReasonCheckpoint : fact_count: int hint_count: int open_intent_count: int
存储在 DispatcherLoop 实例的内存中,字典的 key 是 project_id,Checkpoint就用来记录上面说的那几个触发reason的条件的变化,检测到值的变化符合条件就触发reason
hint 1 2 3 4 5 class Hint (BaseModel ): id : str content: str creator: str created_at: str
这个hint设置的是在全部状态都能被添加,他不属于图谱中,Hint 不是 Fact,不会产生 Intent,也不连接到图结构中,是可以人为外部添加的只给Reason worker来提供信息,通过下面的graph_yaml 变量传入到 reason prompt 中 ,AI 在分析时会读取 YAML 中的 hints 信息 直接用前端从/projects/{project_id}/hints路由写入hints
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 def check_project_hint_writable (conn: sqlite3.Connection, project_id: str ) -> sqlite3.Row: row = get_project_or_404(conn, project_id) if row["status" ] not in ("active" , "stopped" , "completed" ): raise HTTPException(403 , f"Project is {row['status' ]} " ) return row load_prompt(config.runtime.prompt_group, "reason.md" ), { "graph_yaml" : export_yaml.strip(), "fact_ids" : format_fact_ids(allowed_fact_ids), "open_intents" : format_open_intents(open_intents), "max_intents" : str (config.tasks.reason.max_intents), }, )
这个是对模板的一个reason.md进行替换,把整个项目的图状态,比如可使用的fact和没完成的intent全部传过去,像一个快照,注入给reason的那个worker,触发和具体的实现都在Dispatcher层
在project中的任务中的DAG图实际上就是一个yaml文件,直接给 LLM 看 YAML 格式的图结构,它天然能理解这种层次化表示,还能完整记录了 facts -> intents -> facts 的因果关系链 大概是这样,理解成一个序列化快照 ,用于传给 LLM 作为上下文
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 project: title: "渗透测试项目" origin: "目标网站 https://example.com" goal: "获取 flag" hints: - content: "尝试 SQL 注入" creator: "Human" created_at: "2026-03-21 18:05:00" facts: - id: "origin" description: "目标网站 https://example.com" - id: "goal" description: "获取 flag" - id: "f001" description: "网站使用 Nginx 1.18.0" intents: - from: ["origin" ] to: "f001" description: "识别 Web 服务器版本" creator: "agent-A" worker: "claudecode" created_at: "2026-03-21 18:05:00" concluded_at: "2026-03-21 18:06:00" - from: ["f001" ] to: null description: "尝试利用目录遍历" creator: "agent-A" worker: null created_at: "2026-03-21 18:09:00" concluded_at: null
中途添加hint,分两种情况,第一种是创建项目的时候写的,直接就被替换进prompt传送进去 已经在跑的那一次任务,不会自动热更新 prompt,就是只有bootstrap的时候是这样接收的 reason / explore阶段就只能通过上面那个graph_yaml状态的改动来读取,dispatcher是会不断轮询的,dispatcher 下一次再 export_project(project_id) 时,就会触发reason
Dispatcher 上面的server是服务端,这个Dispatcher就是客户端执行器 prompts放的是对应任务不同的生命周期的提示词,比如说bootstrap.md是引导任务提示词 给每个Agent(worker)派发任务都在这一层,Dispatcher 是唯一的协议写入者和控制面;Agent 不 claim、不 heartbeat 在Dispatcher有三种任务类型,Bootstrap,explore和Reason
Bootstrap Intent 的定义 \cairn\src\cairn\dispatcher\scheduler\loop.py Bootstrap Intent 其实任务流程和生命周期都和普通的intent一样,只是语义解释不同,这个定义的意思是第一次探索,一个reason的初次任务:只有 origin 和 goal 两个事实
1 2 3 4 5 6 7 8 def _is_initial_project (self, project: ProjectDetail ) -> bool : fact_ids = {fact.id for fact in project.facts} if fact_ids != {"origin" , "goal" } or len (project.facts) != 2 : return False if not project.intents: return True return all (self ._is_bootstrap_intent(intent) for intent in project.intents)
加载提示词–worker流程 prompting.py, 加载提示词模板,然后用下面的claudecode.py构建好这个命令数组,作为cli参数发过去 三种任务类型都用各自设计的提示词模板,替换数据传入
1 2 3 4 5 6 7 8 9 10 11 12 13 def build_execute (self, worker: WorkerConfig, prompt: str , session: str | None ) -> DriverResult: assert session is not None return DriverResult( argv=[ "claude" , "--session-id" , session, "--dangerously-skip-permissions" , "-p" , "--" , prompt, ], session=session, )
在 Docker 容器中exec 注入执行,worker的执行层有execute阶段和conclude(收尾阶段) 只在第一阶段出现异常(超时或输出解析失败)时才会进入。目的是利用同一个 session 让 Agent 总结当前已经完成的工作,而不是继续探索,这就是适配框架的鲁棒性设计 \dispatcher\runtime中的containers.py容器管理,以一个project为单位创建容器,调度循环loop.py中处理容器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 def build_exec_process ( self, container_name: str , env: dict [str , str ], command: list [str ], timeout_seconds: int | None = None , kill_after_seconds: int = 5 , ) -> ManagedProcess: container = self ._require_container(container_name) argv: list [str ] = [] if timeout_seconds is not None : argv.extend([ "timeout" , "-k" , f"{kill_after_seconds} s" , f"{timeout_seconds} s" , ]) argv.extend(command) return ManagedProcess(container, argv, env)
对容器的进程有两层保护的机制,liunx命令启动的时候加了一个超时参数,第二个是Python 的thread.join(timeout),用docker sdk把进程给kill掉 Docker Exec可直接利用PID把worker运行的那个进程杀掉,不影响其他的
1 2 3 4 5 if timeout_seconds is not None : argv.extend([ "timeout" , "-k" , f"{kill_after_seconds} s" , f"{timeout_seconds} s" ,
两个dockerfile对应两个容器,Cairn的server和Dispatcher,第二个是worker工作环境的容器,里面放了kali和一些安全工具,知识库和 PoC。codex和claude,每一个容器都直接用这个镜像 dockerfile设定了cc和codex的工作目录,mcp和skill,还有基本配置在容器启动时载入,工作目录就是解题空间,poc和知识库都在这个目录。
1 2 3 4 5 6 7 # Dockerfile COPY ./.agents /home/kali/workspace/.agents COPY ./.agents /home/kali/workspace/.claude COPY ./AGENTS.md /home/kali/workspace/AGENTS.md COPY ./AGENTS.md /home/kali/workspace/CLAUDE.md WORKDIR /home/kali/workspace ← 容器启动时的当前目录
session维护机制 针对不同的worker,比如codex和claudecode,要设定对应的session管理方式 \dispatcher\workers的base.py,正常使用时,Claude Code 会自动创建和管理 session,但是直接用自定义的UUID 来 控制 session ID在这个情况更方便,因为 Cairn 需要在不同的时间点精确地恢复 同一个会话 codex会在 stderr 输出 session ID,直接用一个函数提取出来就行了 Driver类声明了自己的type_name,worker在config的时候要写WorkerType,执行任务的时候就直接获取Driver。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 def run_bootstrap_task (..., intent, ... ): 创建 session(内存中的 UUID) session = driver.prepare_session() execute = driver.build_execute(worker, prompt, session) first = run_worker_process(..., execute.argv, ...) session = driver.extract_session(session, first.stderr) return _try_conclude_fallback( ..., intent, session, ... ) return "success"
session 是以 intent 为单位进行管理的,同一个 Intent 任务内 ,execute 和 conclude 两个阶段之间 传递对话上下文的媒介
健康检查机制,Dispatcher启动时检查全部的worker,每个 Intent 任务执行前只检查 当前任务要使用的 worker,具体的检查内容是Base URL ,API Key 和指定的模型可用性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 def run_startup_healthchecks (config, container_manager, *, show_commands=False ): container_name = container_manager.create_startup_container() with ThreadPoolExecutor(max_workers=parallelism) as executor: future_map = { executor.submit( _run_worker_healthcheck, container_manager, container_name, worker, config.runtime.healthcheck_timeout, ): worker.name for worker in workers } container_manager.remove_container(container_name, force=True )
Worker 执行结果输出与记录 worker的执行是在容器中,上面的加载提示词–worker流程 输入 输出读取的原理,使用一个流程处理类,利用好Docker Python SDK 的原生 API,这几个Agent CLI 工具返回的是,Docker SDK 只负责传输原始字节流 CLI工具限制llm返回的格式只有通过bootstrap.md等几个prompt限制,明确要求返回json 各个Driver在执行前构建 execute 命令和初始化 session ID并且注入环境变量,在任务执行后的输出阶段,也要提取session ID,判断是否支持 conclude fallback
然后就到ManagedProcess 缓冲并拼接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 class ManagedProcess : def __init__ (self, container: Container, command: list [str ], env: dict [str , str ] ): self .command = command self .env = env self ._container = container self ._api = container.client.api self ._exec_id: str | None = None self ._reader: threading.Thread | None = None self ._stdout: list [str ] = [] self ._stderr: list [str ] = [] self ._returncode: int | None = None self ._timed_out = False self ._cancel_reason: str | None = None self ._read_error: str | None = None self ._done = threading.Event()
输出解析层是用JSON提取 容器内的Agent 进程 (claude/codex),LLM 可能不完全遵循 只返回 JSON的指令,用一个提取json的脚本,后面就到了通用包装器解包,contracts.py,在目前这个提取输出信息的方法中,bootstrap.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 try : payload = parse_json_output(first.stdout) kind, data = validate_bootstrap_execute_payload(payload) except Exception as exc: LOG.warning( "bootstrap parse failed project=%s intent=%s worker=%s error=%s ... stdout_preview=%s stderr_preview=%s" , project.project.id , intent.id , worker.name, exc, preview(first.stdout), preview(first.stderr), ) return _try_conclude_fallback(...)
如果JSON 提取失败时,触发conclude,在explore和bootstrap任务中都有分别的实现 Timeout超时触发和JSON 解析或验证失败触发,在这个阶段,就是恢复execute阶段执行失败的session,在conclude阶段注入的是bootstrap_conclude.md,大概意思就是强制 LLM 立即停止工作,只输出已确认的事实摘要,写入结果 虽然非 JSON 内容没有被系统提取,但 Agent 通过 session 机制保存着上下文,还可以用session恢复。
Mock Driver mockdriver 用于本地观察 dispatcher 的成功、失败和超时路径,这个driver的Prompt 是结构化 JSON,不调用真正的 AI API,可以直接用Mock Driver来测试模拟实际的调用情况 模拟的就是 stdout/stderr ,返回的 JSON 是否符合协议,真正被替换掉的,只有 worker 这一层
1 server -> dispatcher -> executor(local/container) -> healthcheck->timeout-> 协议写回
这些都还是真跑的,mock启动一个子进程,把mock 行为配置 和 当前任务的 prompt JSON传进去
1 2 3 4 5 6 { "phase" : "reason" , "fact_ids" : {fact_ids}, "open_intents" : {open_intents}, "max_intents" : {max_intents} }
设置dispatch_mock.yaml,在worker的env配置Mock 行为,比如
1 2 3 4 5 MOCK_HEALTHCHECK: '{"delay":[0,2],"outcomes":{"ok":0.9,"fail":0.1}}' MOCK_BOOTSTRAP: '{"delay":[5,10],"outcomes":{"complete":0.0,"fact":0.6,"rejected":0.1,"invalid_json":0.1,"invalid_payload":0.1,"command_fail":0.1}}' MOCK_REASON: '{"delay":[2,6],"outcomes":{"complete":0.1,"intent":0.4,"noop":0.1,"rejected":0.1,"invalid_json":0.1,"invalid_payload":0.1,"command_fail":0.1}}' MOCK_EXPLORE_EXECUTE: '{"delay":[5,10],"outcomes":{"fact":0.6,"rejected":0.1,"invalid_json":0.1,"invalid_payload":0.1,"command_fail":0.1}}' MOCK_EXPLORE_CONCLUDE: '{"delay":[2,6],"outcomes":{"fact":0.6,"rejected":0.1,"invalid_json":0.1,"invalid_payload":0.1,"command_fail":0.1}}'
主调度循环的非阻塞 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 def run (self, once: bool = False ) -> None : try : self .run_startup_healthchecks() while True : try : if not self ._settings_checked: self ._validate_server_settings() self ._settings_checked = True self ._reap_futures() self ._reap_cleanup_futures() summaries = self .client.list_projects() self ._initialize_reason_checkpoints(summaries) self ._refresh_runtime_projects(summaries) self ._cancel_inactive_tasks(summaries) self ._queue_container_cleanups(summaries) self ._dispatch_available(summaries) except requests.RequestException as exc: if once: raise LOG.warning("dispatcher server request failed ..." ) time.sleep(self .config.runtime.interval) continue if once: break time.sleep(self .config.runtime.interval) finally : ...
主循环每 interval 秒执行一次,必须快速完成,不能被任何操作阻塞 分层主任务线程(执行 bootstrap、explore、reason 任务)和清理线程池(专门处理容器清理,独立于主任务)
1 2 3 4 5 6 7 8 def _queue_container_cleanups (self, summaries: list [ProjectSummary] ) -> None : future = self .cleanup_executor.submit( self .container_manager.cleanup_completed, summary.id )
Dispatcher通过dispatch.yaml来进行主要功能的配置,在运行时通过config加载 max_running 是全局并发限制,限制跨项目这个worker能接收几个任务
1 2 3 4 5 6 7 runtime: interval: 3 max_workers: 8 max_running_projects: 3 max_project_workers: 4 healthcheck_timeout: 20 prompt_group: "default"
注册的时候通过 type 字段识别协议,然后合并环境变量到容器里面的worker 假如要注册新的worker,先写一个新的driver /dispatcher/workers/adapters/newdriver.py 在 init .py 中导出新驱动,在注册表中注册驱动 统一接口至少应覆盖这些能力:
build_healthcheck(worker):构造健康检查命令
prepare_session():需要时预先生成 session id
build_execute(worker, prompt, session):构造第一阶段执行命令
extract_session(session, stderr):需要时从 stderr 提取 session id,或继续使用预生成 session
build_conclude(worker, prompt, session):在双阶段 explore 中恢复同一 session 做收尾
supports_conclude():声明该 driver 是否支持双阶段 explore
项目的本意是在无人工介入情况下进行渗透和CTF的解题,但是这是一个完备的系统,可以拆解出可复用的板块,claudecode和codex以容器的形式运行,skill和payload的文档直接注入容器中,没有配任何的外部工具比如rag。不是传统的多agent架构,但是以一个共享的DAG图环境来实现了新的协同方式
重构 重构一个执行模式,可调整为容器执行和本地worker执行,大概的方法是, 配置层决定走哪种模式 调度器、任务层、健康检查层都不再直接依赖 Docker,而是只依赖 ExecutorProtocol 切换模式时,真正变的是配置和工厂返回的执行器实现,上层流程不用改. 把执行后端加上俩种模式的判断
1 2 3 4 5 def create_executor (config: DispatchConfig ) -> ExecutorProtocol: if config.execution_mode == "container" : return ContainerManager(config.container) if config.execution_mode == "local" : return LocalExecutor(config.workspace_root)
准备两种模式yaml的配置 先启动server
1 2 3 4 5 6 7 8 9 10 11 12 uv run cairn serve --host 127.0.0.1 --port 8080 先跑一次 dispatcher startup healthcheck uv run cairn dispatch --config dispatch.yaml --startup-healthcheck-only uv run cairn dispatch --config dispatch-local.yaml --startup-healthcheck-only 启动dispatcher,然后就可以创建 project,这两个顺序随便 uv run cairn dispatch --config dispatch.yaml uv run cairn dispatch --config dispatch-local.yaml sudo chown -R $USER:$USER .
对于容器模式的dispatch-local.yaml ,改一下execution_mode: “container”
1 2 3 4 5 6 cap_add: []是Docker 容器的Linux capability追加权限, 如果 worker 里面要做一些更底层的系统/网络操作,就额外开权限 设置轮询周期 runtime: interval: 3
服务端的UI可以设置timeout。 安装worker
1 2 3 npm config set registry https://registry.npmmirror.com npm install -g @openai/codex npm install -g @anthropic-ai/claude-code
当前实现按“单 Dispatcher 实例”设计和测试;不支持多个 Dispatcher 同时连接同一服务端共同调度
1 关键调度状态都在 Dispatcher 进程内存里:runtime_project_ids、reason_checkpoints、worker_unhealthy_until、worker_rejected_until、project_cursor、_cleanup_pending
这些状态不共享,所以两个 Dispatcher 会各自基于自己的旧快照做决策,容易重复选同一个 project、重复触发 reason/explore、重复做 cleanup 最明显的风险点是 bootstrap:两个 Dispatcher 可能同时看到“initial project”,然后都去 create_intent();没有项目级全局锁
日志更新 保留现有 Timeline(原来的log,改成这个合理点),添加一个execution record 复用dispatcher已经有的执行结果ProcessResult,HealthcheckRun,可以看到完整的stdout和stderr