ELNOR REPO READER TEXT MIRROR
Original path: Current Specs/DOC12/DOC12_IMPLEMENTATION_APPENDIX_R1.md
Source repo: /Users/OpenClaw1/Elnor/Elnor Specs
Git branch: main
Git commit: dbaa25962edc11ab30e8d4ca1715f9ae5bf77331
Generated: 2026-06-09T01:23:58.539Z

---

# DOC12 Implementation Appendix (code-facing wiring details)

Version: R1

Status: implementation guidance (non-owner, non-storage; do not override owner docs)

## Purpose

This appendix exists to prevent “spec drift” during implementation.

- DOC12 (main) is the normative architecture and contract surface.
- This appendix is **code-facing**: it describes how to wire Q → EC → Gateway for rooms without inventing APIs, corrupting state, or creating fake multi-agent theater.

If anything in this appendix conflicts with an owner doc’s storage/behavior ownership (DOC1/DOC4/DOC6/DOC7/DOC8/DOC9/DOC10/DOC11), the owner doc wins and this appendix must be adjusted.

## Strict build order (do not reorder)

1. **DOC4**: implement a visible configured agent registry (room/panel/forum eligibility + metadata). Q “Add Agent” picker must be driven by this.
2. **DOC11**: add room-aware dispatch (`type: room_turn_dispatch`) + session status read + session abort, with correlated reverse telemetry.
3. **DOC10**: add room operations to the Mode x Operation Authority Matrix, and add room correlation fields to route traces.
4. **DOC12** (rooms): implement EC room coordinator + Q room UI/proxies + SSE event stream.
5. Only after the above: ACP room participation and external projection hardening.

## Naming and packages (suggested)

- `packages/contracts`: all Zod schemas from DOC12 must live here and be re-exported.
- `apps/ec-service`: room coordinator, durable writers, dispatch scheduler.
- `apps/q-backend`: proxy layer; no durable state writes.
- `apps/q-frontend`: UI only; reads from proxies + SSE.

## Durable storage and concurrency (EC)

### Room durable files (must exist)

Per DOC12 §3.1.
### Snapshot versioning

All snapshot JSON files must include `_version` and use optimistic concurrency.

- When EC applies a mutation, it must:
  1) acquire per-room mutex,
  2) read current snapshot(s),
  3) validate `expected_room_version` when present,
  4) append command record to `commands.jsonl`,
  5) update derived snapshots (`room_state.json`, `participants_current.json`, `policy_current.json`, etc.), bumping `_version`.

### Atomic write pattern (required)

Implement snapshot writes as:

1. write temp file `*.tmp`
2. `fsync`
3. atomic rename to final path

### Per-room mutex

Use:

- an in-process `AsyncLock` keyed by `room_id`, plus
- an optional lock file (only for multi-process deployments).

The lock file is not durable state. It only prevents corruption.

### Command log + idempotency

Every mutating call must arrive at EC as a `RoomCommandEnvelopeSchema`.

EC must implement:

- `idempotency_index.json`: mapping `idempotency_key -> RoomCommandResultSchema` (bounded via compaction), and
- `commands.jsonl`: append-only history for audit/debug.

If the idempotency index grows too large, implement compaction:

- keep the most recent N keys,
- roll older keys into a compacted JSONL index,
- emit `room.command.idempotency_replay` for replays.

## EC module boundaries (suggested)

These are boundaries, not sacred paths.
- `rooms/room-coordinator.ts`
  - validates envelopes
  - owns all room durable mutations
  - emits telemetry events
- `rooms/room-state.ts`
  - read helpers for snapshots
  - atomic write helpers
  - lock helper `withRoomLock(room_id, fn)`
- `rooms/room-dispatch.ts`
  - turns queue/plan scheduling
  - binds sessions (via DOC11)
  - calls `room_turn_dispatch`
  - handles abort/cancel
- `rooms/room-telemetry.ts`
  - emits room telemetry
  - emits SSE stream events
- `rooms/room-cost.ts`
  - appends `LlmUsageSample` records (or ingests them)
  - recomputes `cost_aggregate_current.json`
- `rooms/room-summary.ts`
  - bounded summary updates
  - bridge actions: post summary to forum/panel (calls DOC6-owned writers)

## Q backend proxy routes (REST → EC envelopes)

Q backend must expose simple routes for the frontend and translate each into a `RoomCommandEnvelopeSchema` for EC.

### Canonical route set (minimum)

- `POST /api/rooms` → `room_create`
- `GET /api/rooms/:roomId` → read-only
- `GET /api/rooms/:roomId/messages` → read-only
- `GET /api/rooms/:roomId/events/stream` → SSE
- `POST /api/rooms/:roomId/messages` → `room_post_message`
- `POST /api/rooms/:roomId/participants` → `room_add_participant`
- `POST /api/rooms/:roomId/participants/:participantId/mute` → `room_mute_participant`
- `POST /api/rooms/:roomId/participants/:participantId/unmute` → `room_unmute_participant`
- `DELETE /api/rooms/:roomId/participants/:participantId` → `room_remove_participant`
- `POST /api/rooms/:roomId/stop` → `room_stop_all`
- `POST /api/rooms/:roomId/participants/:participantId/stop` → `room_stop_participant`
- `POST /api/rooms/:roomId/context` → `room_attach_context`
- `DELETE /api/rooms/:roomId/context/:attachmentId` → `room_detach_context`
- `PATCH /api/rooms/:roomId/config` → `room_update_config`
- `POST /api/rooms/:roomId/summary/export` → `room_export_summary`
- `POST /api/rooms/:roomId/summary/post-to-forum` → `room_post_summary_to_forum`
- `POST /api/rooms/:roomId/summary/post-to-panel` →
`room_post_summary_to_panel`

### Proxy rule (non-negotiable)

No frontend action is allowed to mutate room truth locally. Every mutating action must follow this path:

Q frontend → Q backend proxy → EC command → durable write → telemetry + SSE → UI refresh.

### Example: proxy handler (TypeScript sketch)

```ts
import { randomUUID } from "node:crypto";
import { z } from "zod";
import { RoomCommandEnvelopeSchema, RoomPostMessageRequestSchema } from "@elnor/contracts";

// `app` (Express) and `ecClient` are assumed to be in scope.

// HTTP body shape; room_id comes from the route, not the body.
const PostRoomMessageHttpSchema = z.object({
  text: z.string().max(50000),
  mentioned_participant_ids: z.array(z.string().max(160)).max(12).default([]),
  attachment_ids: z.array(z.string().max(160)).max(32).default([]),
  request_rounds: z.number().int().min(0).max(6).default(0),
  request_summary_at_end: z.boolean().default(false),
});

app.post("/api/rooms/:roomId/messages", async (req, res) => {
  // parse() throws on invalid input; map that to a 4xx in error middleware.
  const body = PostRoomMessageHttpSchema.parse(req.body);
  const payload = RoomPostMessageRequestSchema.parse({ room_id: req.params.roomId, ...body });

  const versionHeader = req.header("If-Match-Version");

  const envelope = RoomCommandEnvelopeSchema.parse({
    schema_version: 1,
    command_type: "room_post_message",
    // A generated fallback key is not stable across client retries;
    // clients should send Idempotency-Key for real idempotency.
    idempotency_key: req.header("Idempotency-Key") ?? randomUUID(),
    operation_id: randomUUID(),
    issued_at: new Date().toISOString(),
    expected_room_version: versionHeader ? Number(versionHeader) : undefined,
    actor: { actor_type: "user", user_id: req.user?.id },
    payload,
  });

  // The proxy never writes durable state; EC owns the mutation.
  const result = await ecClient.sendRoomCommand(envelope);
  res.status(result.status === "ok" ? 200 : 400).json(result);
});
```

## SSE room event stream (Q backend)

Rooms need push updates to avoid drift and to show streaming turn state.

### Endpoint shape

`GET /api/rooms/:roomId/events/stream`

### Event format

- `event: room.event`
- `data: { RoomEventStreamEventSchema }`

### Recommended semantics

- Send a heartbeat event every 15–30s to keep connections alive.
- Support `Last-Event-ID` for resuming.
- If resume is not possible, require Q frontend to refetch room state and messages.
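The resume rule above (replay after `Last-Event-ID`, otherwise force a refetch) can be sketched as a bounded in-memory replay buffer. This is a minimal illustration, not a DOC12 contract: `RoomEventReplayBuffer`, `BufferedEvent`, and `eventsAfter` are hypothetical names, and only the `event_id` field is assumed from the event schema.

```ts
// Hypothetical bounded replay buffer for SSE resume; names are illustrative only.
interface BufferedEvent {
  event_id: string;
  payload: unknown;
}

class RoomEventReplayBuffer {
  private events: BufferedEvent[] = [];
  private readonly capacity: number;

  constructor(capacity: number) {
    this.capacity = capacity;
  }

  // Record an event, evicting the oldest one once capacity is exceeded.
  push(evt: BufferedEvent): void {
    this.events.push(evt);
    if (this.events.length > this.capacity) {
      this.events.shift();
    }
  }

  // Returns the events that follow lastEventId.
  // Returns [] for a fresh connection (no Last-Event-ID header).
  // Returns null when the id is no longer buffered: resume is not possible
  // and the client must refetch room state and messages.
  eventsAfter(lastEventId: string | undefined): BufferedEvent[] | null {
    if (lastEventId === undefined) return [];
    const idx = this.events.findIndex((e) => e.event_id === lastEventId);
    if (idx === -1) return null;
    return this.events.slice(idx + 1);
  }
}
```

On connect, a stream handler could call `eventsAfter(req.header("Last-Event-ID"))`, write any returned events before subscribing to live ones, and on `null` send an event that tells the frontend to refetch.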
### SSE sketch

```ts
// `app` (Express) and `roomEventBus` are assumed to be in scope.
// This sketch does not replay missed events for Last-Event-ID.
app.get("/api/rooms/:roomId/events/stream", async (req, res) => {
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");
  // Send headers immediately so the client sees the stream open.
  res.flushHeaders();

  const roomId = req.params.roomId;

  // Forward each room event as one SSE frame; `id` enables Last-Event-ID.
  const unsubscribe = roomEventBus.subscribe(roomId, (evt) => {
    res.write(`event: room.event\n`);
    res.write(`id: ${evt.event_id}\n`);
    res.write(`data: ${JSON.stringify(evt)}\n\n`);
  });

  // Heartbeat every 20s keeps idle connections alive.
  const heartbeat = setInterval(() => {
    res.write(`event: heartbeat\n`);
    res.write(`data: {"t":"${new Date().toISOString()}"}\n\n`);
  }, 20000);

  // Release the timer and the subscription when the client disconnects.
  req.on("close", () => {
    clearInterval(heartbeat);
    unsubscribe();
  });
});
```

## EC → Gateway room dispatch (DOC11 seam)

### Required request

EC must send `GatewayRoomTurnDispatchRequestSchema` with `type: "room_turn_dispatch"`.

### Required reverse event correlation

Every Gateway reverse event attributable to a room turn must include:

- `room_id`, `room_trace_id` (if present), `room_turn_id`, `participant_id`
- `route_trace_id`
- `gateway_session_key`

### Abort propagation

Room stop actions must reach real execution. DOC11 must expose:

- session abort command/endpoint accepting `{gateway_session_key, reason, correlation_fields}`
- reverse event indicating abort accepted + final terminal state

## Room turn scheduling: do not block HTTP

`room_post_message` must return quickly.

Recommended pattern:

- EC persists message + queued turns inside a lock.
- EC schedules dispatch on an internal queue/worker.
- Q observes progress via SSE stream events.

## Workspace/tool collision rule

If `RoomWorkspacePolicy.workspace_mode = shared_locked` and a participant is write-capable, enforce a write mutex.

Practical minimum:

- default sequential dispatch
- disallow parallel dispatch unless sandboxed or read-only

## Usage/cost tracking (cross-suite)

DOC12 defines `LlmUsageSampleSchema` as the base unit.

Implementation guidance:

- Gateway should report token counts per response if possible.
- EC should emit `LlmUsageSample` records into `room_cost_events.jsonl` (or a general usage bus) and update `cost_aggregate_current.json`.
- Q should render:
  - total tokens (and cost if available)
  - per-participant tokens
  - budget-block reasons when present

The same schema can later be used for panels/forums/tasks/repair runs.

## Acceptance test checklist (minimum)

1. “Add Agent” picker is driven by DOC4 registry and persists participant roster change durably.
2. Each visible participant binds to a distinct room session (no main-session fallback).
3. Mention-only mode activates only mentioned participants.
4. Mute/unmute/remove affect scheduling, keep history, and affect in-flight turns via abort.
5. Stop all propagates abort to Gateway, and queued turns are cancelled.
6. Global STOP blocks new dispatch (`GLOBAL_STOP_ACTIVE`).
7. SSE stream delivers roster/message/turn updates; UI stays consistent without polling.
8. Retention policy blocks memory promotion at DOC1 candidate creation; block is auditable.
9. Usage/cost samples correlate to room and appear in `cost_aggregate_current.json`.
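
Checklist item 9 can be checked against a small recomputation helper. The following is a minimal sketch under a simplified sample shape: `UsageSample`, `CostAggregate`, `aggregateUsage`, and the field names `input_tokens` and `output_tokens` are hypothetical stand-ins, since the real fields are defined by `LlmUsageSampleSchema` in DOC12 (main).

```ts
// Hypothetical sample shape; the real fields come from LlmUsageSampleSchema.
interface UsageSample {
  room_id: string;
  participant_id: string;
  input_tokens: number;
  output_tokens: number;
}

// Hypothetical aggregate shape mirroring what Q is expected to render.
interface CostAggregate {
  total_tokens: number;
  per_participant_tokens: Record<string, number>;
}

// Recompute the aggregate for one room from its usage samples.
// Samples that belong to other rooms are ignored.
function aggregateUsage(samples: UsageSample[], roomId: string): CostAggregate {
  const aggregate: CostAggregate = { total_tokens: 0, per_participant_tokens: {} };
  for (const sample of samples) {
    if (sample.room_id !== roomId) continue;
    const tokens = sample.input_tokens + sample.output_tokens;
    aggregate.total_tokens += tokens;
    const previous = aggregate.per_participant_tokens[sample.participant_id] ?? 0;
    aggregate.per_participant_tokens[sample.participant_id] = previous + tokens;
  }
  return aggregate;
}
```

Recomputing from the append-only samples, rather than incrementing in place, keeps the aggregate reproducible from `room_cost_events.jsonl`; an incremental update is an equally valid design if the file grows large.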