AoE2 · LLM Arena

Event Broker Architecture — Log-First Live Streaming

Status: SHIPPED. Phases 0 through C complete (2026-05). Frozen historical design spec; for current state see Chapter 15: Event Broker. Decisions extracted as ADR 0001 and ADR 0002.
Author: Marton (planning session 2026-05-21)
Supersedes: ad-hoc LiveRunRegistry + BroadcastingSink (Phase 9 design)


1. Context

1.1 What prompted this

Phase 9 introduced operator-driven forks: POST /forks snapshots a parent run, optionally mutates the world, then spawns an async N-turn replay. The replay holds a read-write DuckDB connection to the child run’s .duckdb file for the entire game loop. Meanwhile, the frontend immediately opens GET /events?run_id=<child> to live-tail the replay.

The first SSE call crashes:

_duckdb.ConnectionException: Connection Error: Can't open a connection to same
database file with a different configuration than existing connections

DuckDB enforces an in-process invariant: every connection to a given file within one process must use the same configuration, including access mode. The replay holds the child file read-write, while the SSE handler’s _resolve_run_optional scans logs/arena/*/*.duckdb and opens each file with read_only=True to find the run, so it collides on the active child file.

1.2 Why a small patch is the wrong fix

Two tactical options were considered (drop read_only=True, or route the live path past the scan by tracking db_path in the registry). Both work, both ship in half a day, both encode the same load-bearing assumption: “readers open the writer’s DuckDB file.” That assumption is fine in a single-process FastAPI app and instantly false the day any of these happen:

Each of those is a forced rewrite of the SSE layer under deadline pressure if we bake in the file-coupling assumption now.

1.3 Intended outcome

A log-first architecture where the event log is the single source of truth for live consumers and DuckDB is one of N materializations. The bug disappears as a side effect; the race-window dedupe code disappears as a side effect; cold reads stay untouched; and the day we go distributed, the change is swapping one broker implementation for another, not rewriting the SSE handler.


2. North Star

The event log is the source of truth. DuckDB and SSE clients are two materializations that consume it. Neither knows about the other.

Every design question below resolves against that sentence: does the answer keep producers and consumers coupled only through the log?


3. Conceptual Architecture

                            ┌──────────────────────┐
                            │       Producer       │
                            │ (synth_game_loop,    │
                            │  fork snapshot,      │
                            │  mutation events)    │
                            └──────────┬───────────┘
                                       │ publish(event)
                                       ▼
                            ┌──────────────────────┐
                            │     EventBroker      │
                            │  (Protocol; in-proc  │
                            │   impl today; Redis/ │
                            │   NATS swappable)    │
                            └─┬─────────────────┬──┘
                              │                 │
                              │ stream(         │ stream(
                              │  run_id,        │  run_id,
                              │  from_seq=0)    │  from_seq=0)
                              ▼                 ▼
                  ┌──────────────────┐  ┌──────────────────┐
                  │ DuckDBPersister  │  │  SSE Handler     │
                  │ (consumer)       │  │  (consumer)      │
                  └──────────────────┘  └──────────────────┘

Two consumers, symmetric. Neither opens the other’s files. The producer never knows who is listening or whether persistence has caught up.


4. Core Abstractions

All new code lives in evaluation/event_broker.py (broker is a domain concern, not a web concern — the web layer depends on evaluation, never the reverse).

4.1 EventEnvelope — the unit of streaming

# evaluation/event_broker.py
from dataclasses import dataclass
from typing import NewType
from evaluation.event_log import Event   # frozen dataclass; do not modify

RunId = NewType("RunId", str)
Seq   = NewType("Seq", int)   # monotonic & UNIQUE per run, assigned by broker

@dataclass(frozen=True, slots=True)
class EventEnvelope:
    run_id: RunId
    seq:    Seq
    event:  Event

Why a wrapper, not a field on Event: Event is a frozen domain dataclass owned by evaluation/event_log.py and used by all producers (race, ranking, synth_game_loop, fork). Wrapping isolates the broker’s “I assigned this sequence number” concern from the domain. It also means no DuckDB schema change — seq is a runtime concept that lives only in the broker and the SSE wire format if needed.

Why a separate Seq distinct from t: confirmed by exploration — multiple events share the same t within a turn (~8 events per turn: turn_start, observation, llm_prompt, llm_response, action, action_result, world_mutation, metric). Today’s _live_event_stream filters live-tailed events with event.t <= max_t_seen, which silently drops legitimate same-turn events. Seq makes ordering total and dedupe unambiguous.
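
The same-turn drop can be reproduced in a few lines. The sketch below is illustrative only: `Ev`, `dedupe_by_t`, and `dedupe_by_seq` are hypothetical names, not code from the repository, and the filter is a simplified imitation of the `event.t <= max_t_seen` check described above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Ev:
    """Hypothetical stand-in for an enveloped event: turn `t`, broker-assigned `seq`."""
    t: int
    seq: int
    kind: str


def dedupe_by_t(replayed: list[Ev], live: list[Ev]) -> list[Ev]:
    """Imitates the t-based filter: drop live events with t <= max_t_seen."""
    max_t_seen = max((e.t for e in replayed), default=-1)
    return replayed + [e for e in live if e.t > max_t_seen]


def dedupe_by_seq(replayed: list[Ev], live: list[Ev]) -> list[Ev]:
    """Same merge, keyed on the per-run unique sequence number instead."""
    max_seq_seen = max((e.seq for e in replayed), default=0)
    return replayed + [e for e in live if e.seq > max_seq_seen]


# Turn 3 is half-written when the subscriber attaches: two events were
# already replayed, and the live tail delivers one overlap plus two new
# events that all carry the same t.
replayed = [Ev(3, 1, "turn_start"), Ev(3, 2, "observation")]
live = [Ev(3, 2, "observation"), Ev(3, 3, "llm_prompt"), Ev(3, 4, "llm_response")]

by_t = [e.kind for e in dedupe_by_t(replayed, live)]
by_seq = [e.kind for e in dedupe_by_seq(replayed, live)]
```

Filtering on `t` loses `llm_prompt` and `llm_response` because they share the turn number of events already seen; filtering on `seq` removes only the true duplicate.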

4.2 EventBroker — the interface

from typing import Protocol, AsyncIterator

class EventBroker(Protocol):
    """Pub/sub with replay-from-offset semantics.

    Producers call open_run → publish*N → close_run.
    Consumers call stream(run_id, from_seq) — one call, replay+tail.
    """

    def open_run(self,  run_id: RunId) -> None: ...
    def close_run(self, run_id: RunId) -> None: ...
    def is_open(self,   run_id: RunId) -> bool: ...

    async def publish(self, run_id: RunId, event: Event) -> Seq: ...

    def stream(
        self,
        run_id: RunId,
        from_seq: Seq = Seq(0),
    ) -> AsyncIterator[EventEnvelope]: ...

Design choices and the principles behind them:

| Choice | Principle |
| --- | --- |
| Protocol, not ABC | Structural typing; test doubles are plain classes: no inheritance, no super(), no @abstractmethod boilerplate |
| Explicit open_run/close_run | Lifecycle is contract, not implicit-on-first-publish; ownership testable |
| stream does replay + tail in one call | Kills the subscribe-then-read-then-dedupe-by-t dance; one concept not three |
| publish returns Seq | Producers get a receipt: useful for at-least-once durability later, immediately useful for tests |
| RunId/Seq NewTypes | Wrong-type-as-arg becomes a mypy error, not a runtime bug |
| No subscribe/unsubscribe API | Async generator’s finally is the natural unregister; one concept not two |
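
To make the "Protocol, not ABC" row concrete, here is a minimal sketch of a test double that satisfies a Protocol purely by shape. `Publisher`, `RecordingPublisher`, and `emit_turn` are hypothetical names invented for this illustration; the real interface is the `EventBroker` Protocol above.

```python
import asyncio
from typing import NewType, Protocol

RunId = NewType("RunId", str)
Seq = NewType("Seq", int)


class Publisher(Protocol):
    """Hypothetical slice of the broker interface, reduced to one method."""

    async def publish(self, run_id: RunId, event: object) -> Seq: ...


class RecordingPublisher:
    """Test double: no base class, it merely has the right method shape."""

    def __init__(self) -> None:
        self.published: list[tuple[RunId, object]] = []

    async def publish(self, run_id: RunId, event: object) -> Seq:
        self.published.append((run_id, event))
        return Seq(len(self.published))


async def emit_turn(pub: Publisher, run_id: RunId) -> list[Seq]:
    """Code under test depends only on the Protocol, not on a concrete broker."""
    return [await pub.publish(run_id, kind) for kind in ("turn_start", "action")]


double = RecordingPublisher()
receipts = asyncio.run(emit_turn(double, RunId("r1")))
```

A type checker accepts `RecordingPublisher` wherever `Publisher` is expected, and the returned `Seq` receipts give the test something to assert on without reaching into the double.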

4.3 InProcessEventBroker — Phase 1 implementation

import asyncio
from collections import defaultdict
from itertools import count

class InProcessEventBroker(EventBroker):
    """Single-process, asyncio-native broker.

    Retains every event for the lifetime of the run (open_run → close_run).
    Late subscribers replay from the retained buffer, then tail live.

    Memory: O(events_per_run × concurrent_runs). For arena replays
    (hundreds of events per run), negligible.
    """

    def __init__(self) -> None:
        self._buffers: dict[RunId, list[EventEnvelope]] = {}
        self._seq:     dict[RunId, count]               = {}
        self._open:    set[RunId]                       = set()
        self._waiters: dict[RunId, list[asyncio.Event]] = defaultdict(list)

    def open_run(self, run_id: RunId) -> None:
        if run_id in self._open:
            raise ValueError(f"run {run_id!r} already open")
        self._buffers[run_id] = []
        self._seq[run_id]     = count(1)
        self._open.add(run_id)

    def close_run(self, run_id: RunId) -> None:
        self._open.discard(run_id)
        for w in self._waiters.pop(run_id, ()):
            w.set()
        # Buffer kept; reclaimed via explicit reap() if added later.

    def is_open(self, run_id: RunId) -> bool:
        return run_id in self._open

    async def publish(self, run_id: RunId, event: Event) -> Seq:
        if run_id not in self._open:
            raise RuntimeError(f"run {run_id!r} is not open")
        seq = Seq(next(self._seq[run_id]))
        self._buffers[run_id].append(EventEnvelope(run_id, seq, event))
        for w in self._waiters[run_id]:
            w.set()
        return seq

    async def stream(
        self,
        run_id: RunId,
        from_seq: Seq = Seq(0),
    ) -> AsyncIterator[EventEnvelope]:
        cursor = max(0, int(from_seq) - 1)
        while True:
            buf = self._buffers.get(run_id, [])
            while cursor < len(buf):
                yield buf[cursor]
                cursor += 1
            if not self.is_open(run_id):
                return
            waiter = asyncio.Event()
            self._waiters[run_id].append(waiter)
            try:
                await waiter.wait()
            finally:
                self._waiters[run_id] = [
                    w for w in self._waiters[run_id] if w is not waiter
                ]
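
The replay-then-tail behaviour is easiest to see from the consumer side. The following is a condensed, hypothetical restatement of the broker for a single run (`MiniBroker` is not a repository class, and `publish` is synchronous here only to keep the sketch short); it follows the same cursor-and-waiter loop as the listing above.

```python
import asyncio
from typing import AsyncIterator


class MiniBroker:
    """Condensed single-run sketch: retained buffer plus per-subscriber waiters."""

    def __init__(self) -> None:
        self.buf: list[tuple[int, str]] = []
        self.open = True
        self.waiters: list[asyncio.Event] = []

    def publish(self, event: str) -> int:
        seq = len(self.buf) + 1
        self.buf.append((seq, event))
        self._wake()
        return seq

    def close(self) -> None:
        self.open = False
        self._wake()

    def _wake(self) -> None:
        for w in self.waiters:
            w.set()

    async def stream(self, from_seq: int = 0) -> AsyncIterator[tuple[int, str]]:
        cursor = max(0, from_seq - 1)
        while True:
            while cursor < len(self.buf):      # replay, then drain new arrivals
                yield self.buf[cursor]
                cursor += 1
            if not self.open:                  # closed and fully drained
                return
            waiter = asyncio.Event()
            self.waiters.append(waiter)
            try:
                await waiter.wait()
            finally:
                self.waiters.remove(waiter)


async def demo() -> list[tuple[int, str]]:
    broker = MiniBroker()
    broker.publish("turn_start")       # published before anyone subscribes
    broker.publish("observation")

    async def late_subscriber() -> list[tuple[int, str]]:
        return [env async for env in broker.stream(from_seq=0)]

    task = asyncio.create_task(late_subscriber())
    await asyncio.sleep(0)             # let the subscriber replay and park
    broker.publish("action")           # arrives over the live tail
    broker.close()                     # ends the stream once drained
    return await task


seen = asyncio.run(demo())
```

The late subscriber receives the two retained events first, then the live one, with contiguous sequence numbers and no dedupe step.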

Clean-code properties:

4.4 BrokerEventSink — adapter at the producer boundary

@dataclass(frozen=True, slots=True)
class BrokerEventSink:
    """EventSink adapter: sync emit() bridges to async publish().

    The producer (synth_game_loop, fork) uses the existing sync EventSink
    Protocol. This adapter schedules publish onto the broker's loop.
    """
    broker: EventBroker
    run_id: RunId
    loop:   asyncio.AbstractEventLoop

    def emit(self, event: Event) -> None:
        # FIFO per loop preserves order across same-thread emits.
        self.loop.call_soon_threadsafe(
            asyncio.create_task,
            self.broker.publish(self.run_id, event),
        )
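
The sync-to-async hand-off can be exercised in isolation. This is a sketch under assumptions, not the repository's adapter: `publish`, `schedule`, and `emit` are hypothetical, and unlike the listing above it keeps a reference to each task, which the asyncio documentation recommends so a task is not garbage-collected mid-flight.

```python
import asyncio
import threading


async def publish(log: list[int], item: int) -> None:
    """Hypothetical async publish; appends on the event-loop thread."""
    log.append(item)


async def main() -> list[int]:
    loop = asyncio.get_running_loop()
    log: list[int] = []
    tasks: set[asyncio.Task] = set()

    def schedule(item: int) -> None:
        # Runs on the loop thread; hold a reference until the task is done.
        task = loop.create_task(publish(log, item))
        tasks.add(task)
        task.add_done_callback(tasks.discard)

    def emit(item: int) -> None:
        # Sync entry point, safe to call from a non-loop thread.
        loop.call_soon_threadsafe(schedule, item)

    producer = threading.Thread(target=lambda: [emit(i) for i in range(5)])
    producer.start()
    await asyncio.to_thread(producer.join)   # wait without blocking the loop
    while len(log) < 5:                      # let the scheduled publishes run
        await asyncio.sleep(0)
    return log


order = asyncio.run(main())
```

Callbacks queued with `call_soon_threadsafe` run in FIFO order, so emits from a single producer thread reach the log in the order they were made.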

4.5 DuckDBPersister — DuckDB becomes a consumer

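# evaluation/duckdb_persister.py
from pathlib import Path

import duckdb

from evaluation.event_broker import EventBroker, RunId, Seq
from evaluation.event_log import DuckDBEventSink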
async def persist_to_duckdb(
    broker: EventBroker,
    run_id: RunId,
    db_path: Path,
) -> None:
    """Subscribe from seq=0, write every event to DuckDB until close.

    Spawned as a background task alongside the producer. The persister
    is the SOLE owner of the DuckDB file — no other process or task
    opens it.
    """
    with duckdb.connect(str(db_path)) as conn:
        sink = DuckDBEventSink(conn)   # existing class; takes a conn
        async for env in broker.stream(run_id, from_seq=Seq(0)):
            sink.emit(env.event)

This is the structural inversion that fixes everything: nobody but the persister opens the writer’s DuckDB file. The SSE handler doesn’t touch DuckDB for live runs at all. The DuckDB collision bug is gone by construction.
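
The persister-as-consumer shape can be sketched without DuckDB. In the example below, the standard-library `sqlite3` module stands in for DuckDB purely so the sketch runs anywhere, and `fake_stream` is a hypothetical substitute for `broker.stream(run_id, from_seq=Seq(0))`.

```python
import asyncio
import sqlite3
from typing import AsyncIterator


async def fake_stream() -> AsyncIterator[tuple[int, str]]:
    """Hypothetical stand-in for the broker's stream of (seq, event) pairs."""
    for seq, kind in enumerate(["turn_start", "observation", "action"], start=1):
        await asyncio.sleep(0)       # simulate events arriving over time
        yield seq, kind


async def persist(conn: sqlite3.Connection) -> int:
    """Sole writer: drain the stream into the table, return rows written."""
    conn.execute("CREATE TABLE events (seq INTEGER, kind TEXT)")
    written = 0
    async for seq, kind in fake_stream():
        conn.execute("INSERT INTO events VALUES (?, ?)", (seq, kind))
        written += 1
    conn.commit()
    return written


conn = sqlite3.connect(":memory:")   # sqlite3 is a stand-in, not the real store
rows_written = asyncio.run(persist(conn))
stored = conn.execute("SELECT seq, kind FROM events ORDER BY seq").fetchall()
```

The writer holds the only connection and learns about events solely through the stream, which is the property the design relies on.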

4.6 Cold-run reader (post-finalize)

After close_run, is_open(run_id) returns False, so the SSE handler no longer routes the run to the broker. Cold-run readers go to DuckDB with read_only=True (safe, because no in-process writer remains). To preserve the EventEnvelope contract end to end:

# evaluation/event_log.py (or new evaluation/cold_stream.py)
from pathlib import Path
from typing import Iterator

import duckdb
def stream_cold(db_path: Path, run_id: RunId) -> Iterator[EventEnvelope]:
    """Read finalized events from DuckDB, assigning seq from row order."""
    with duckdb.connect(str(db_path), read_only=True) as conn:
        rows = conn.execute(
            "SELECT t, payload_json FROM events "
            "WHERE run_id=? ORDER BY t, rowid",   # rowid: same-t tiebreak
            [run_id],
        ).fetchall()
    for i, (_t, payload_json) in enumerate(rows, start=1):
        yield EventEnvelope(
            run_id=run_id,
            seq=Seq(i),
            event=_event_from_payload(payload_json),  # see verification §10
        )

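ORDER BY t, rowid is doing real work: it fixes the same-turn ordering ambiguity the current code papers over. Provided the persister inserted rows in publish order and t never decreases within a run, the cold seq values match the ones the broker assigned live, so cold and live consumers share an identical contract.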
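
The tiebreak is easy to check on a toy table. The sketch below uses the standard-library `sqlite3` module as a stand-in for DuckDB (both expose a `rowid` pseudo-column); the table layout and event kinds are made up for the illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")   # stand-in for the finalized DuckDB file
conn.execute("CREATE TABLE events (run_id TEXT, t INTEGER, kind TEXT)")
conn.executemany(
    "INSERT INTO events VALUES (?, ?, ?)",
    [
        ("r1", 0, "turn_start"),
        ("r1", 0, "observation"),
        ("r1", 0, "action"),
        ("r1", 1, "turn_start"),
        ("r2", 0, "turn_start"),     # another run; must be filtered out
    ],
)

rows = conn.execute(
    "SELECT t, kind FROM events WHERE run_id = ? ORDER BY t, rowid",
    ["r1"],
).fetchall()

# Seq is assigned from row order, starting at 1, mirroring stream_cold.
envelopes = [(seq, t, kind) for seq, (t, kind) in enumerate(rows, start=1)]
```

The three events that share `t = 0` come back in insertion order, so each receives a distinct, stable sequence number.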


5. The new SSE handler

# arena/web/server.py
@app.get("/events")
async def events(
    run_id: str           = Query(..., min_length=1),
    broker: EventBroker   = Depends(get_broker),
) -> StreamingResponse:
    return StreamingResponse(
        _sse(broker, RunId(run_id)),
        media_type="text/event-stream",
    )

async def _sse(broker: EventBroker, run_id: RunId) -> AsyncIterator[str]:
    if broker.is_open(run_id):
        async for env in broker.stream(run_id, from_seq=Seq(0)):
            yield f"data: {env.event.payload.model_dump_json()}\n\n"
    else:
        db_path = await asyncio.to_thread(_resolve_cold_db, run_id, _logs_root())
        for env in stream_cold(db_path, run_id):
            yield f"data: {env.event.payload.model_dump_json()}\n\n"

Before vs after:

| Concern | Before | After |
| --- | --- | --- |
| LOC in live-stream handler | ~30 | 2 |
| DuckDB opens per live request | 1 + N scan opens | 0 |
| Race-window dedupe | manual t <= filter (latently buggy) | none; Seq is unique + monotonic |
| Concept count | Subscription + queue + register + subscribe + unsubscribe | one: stream |
| Process-local assumption | hard-coded in handler | isolated to broker impl |
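
If seq is ever exposed on the wire, the SSE `id:` field is the natural carrier, since browsers send the last seen id back in the `Last-Event-ID` header on reconnect. The handler above does not do this; the sketch below is an assumption about how it could look, and `sse_frame` and `resume_from` are hypothetical helpers.

```python
import json
from typing import Optional


def sse_frame(seq: int, payload: dict) -> str:
    """Format one Server-Sent Events message; `id:` carries the sequence number."""
    data = json.dumps(payload, separators=(",", ":"))
    return f"id: {seq}\ndata: {data}\n\n"


def resume_from(last_event_id: Optional[str]) -> int:
    """Map a Last-Event-ID header value to the next from_seq (inclusive)."""
    if last_event_id is None or not last_event_id.isdigit():
        return 0                      # no usable id: replay from the start
    return int(last_event_id) + 1     # resume just past the last seen event


frame = sse_frame(7, {"kind": "action", "t": 3})
```

Because from_seq is inclusive in the broker's stream, resuming at the last seen id plus one avoids redelivering that event.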

6. File / module layout

New files:

Modified files:

Deleted:


7. Reused existing functions / utilities

| Existing | Where it lives | How the plan reuses it |
| --- | --- | --- |
| Event dataclass | evaluation/event_log.py:194 | Wrapped, never modified |
| DuckDBEventSink | evaluation/event_log.py:241 | Used by the persister; lifecycle (caller-owned connection) preserved |
| SCHEMA_VERSION, _CREATE_TABLE_SQL | evaluation/event_log.py | Untouched; no schema migration |
| fork() (snapshot logic) | evaluation/fork.py:73 | Unchanged; its emitted Event flows through the broker |
| synth_game_loop + build_synth_invoke | arena/invoke.py, etc. | Unchanged; the replay’s sink swaps from BroadcastingSink to BrokerEventSink |
| FastAPI Depends injection | already used for registry/fork_tasks in server.py | Same pattern for get_broker; zero new infra |
| Pydantic v2 model_dump_json on Payload | evaluation/event_log.py | SSE serialization stays identical to today’s live.py:98 shape |

8. Migration phases

Each phase ships independently, stays reversible, and leaves the system green.

Phase 0 — Prep (~0.5 day)

Phase 1 — Add the broker behind a feature flag (~2 days)

Phase 2 — Cutover & delete (~1 day)

Phase 2.5 — Roll non-fork producers onto the broker (~1 day, optional)

Today arena/__main__.py, arena/race.py, arena/ranking.py all write directly to DuckDB via DuckDBEventSink. They’re not live-streamed today (no SSE consumer). Rolling them onto the broker:

Defer until someone wants live UI for non-fork runs.

Phase 3 — Hardening (~1 day, opt-in)

Phase C — Distributed swap (shipped, opt-in)

RedisStreamsBroker(EventBroker) implements the full Protocol against a Redis stream per run. Selected at process startup via ARENA_BROKER_BACKEND=redis (default remains inprocess); the only callsite change is evaluation/broker_factory.make_broker() reading the env. Cold-path readers continue using DuckDB; live-path readers transparently get cross-process semantics — a producer in process A and an SSE handler in process B share state through Redis keys.
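
A startup-time selector of this kind might look like the sketch below. Only the variable name `ARENA_BROKER_BACKEND` and its `inprocess`/`redis` values come from the text; the optional `env` parameter, the registry dict, and the empty placeholder classes are assumptions made so the example is self-contained.

```python
import os
from typing import Mapping, Optional


class InProcessEventBroker:      # placeholder for the real class
    pass


class RedisStreamsBroker:        # placeholder; the real one would need a Redis client
    pass


_BACKENDS = {"inprocess": InProcessEventBroker, "redis": RedisStreamsBroker}


def make_broker(env: Optional[Mapping[str, str]] = None):
    """Pick a broker implementation from ARENA_BROKER_BACKEND (default: inprocess)."""
    env = os.environ if env is None else env
    name = env.get("ARENA_BROKER_BACKEND", "inprocess").strip().lower()
    try:
        return _BACKENDS[name]()
    except KeyError:
        # Fail fast at startup rather than silently falling back.
        raise ValueError(f"unknown ARENA_BROKER_BACKEND: {name!r}") from None
```

Passing the environment in as a mapping keeps the selector testable without mutating `os.environ`; an unknown backend name raises at startup instead of defaulting quietly.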

Key design decisions:

NATS JetStream was the alternative considered and explicitly deferred: Redis was already in docker-compose.yml for Langfuse, so Phase C added zero infra. Backend choice can be revisited as a Phase D follow-up.


9. Testing strategy

Three layers, three failure modes:

| Layer | Proves | Doesn’t prove |
| --- | --- | --- |
| Unit (broker) | Pub/sub semantics, ordering, lifecycle, replay correctness | Wiring |
| Contract (parametrized) | Any EventBroker impl satisfies the same property tests | Performance |
| Integration | create_fork + persister + SSE work together; no DB collision | Cross-process behavior |

Contract test pattern — the key investment for Phase C swap-ability:

@pytest.mark.parametrize("broker_factory", [
    InProcessEventBroker,
    # RedisStreamsBroker,    # when implemented
])
@pytest.mark.asyncio
async def test_broker_contract(broker_factory):
    broker = broker_factory()
    broker.open_run(RunId("r1"))
    for i in range(5):
        await broker.publish(RunId("r1"), _make_event(t=i))
    broker.close_run(RunId("r1"))

    seqs = [env.seq async for env in broker.stream(RunId("r1"), Seq(0))]
    assert seqs == [Seq(1), Seq(2), Seq(3), Seq(4), Seq(5)]

Property test (hypothesis): given any sequence of publish/stream operations interleaved, every stream consumer must observe events 1..N in order, exactly once, with no gaps. This is the invariant that defines correctness — if it holds, the implementation details below it don’t matter.
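
The invariant itself reduces to a small checker that any test, property-based or not, can call on the sequence numbers a consumer observed. `violations` is a hypothetical helper, and it assumes the consumer subscribed with from_seq=0 so the first expected value is 1.

```python
def violations(seqs: list[int]) -> list[str]:
    """Describe how a consumer's view departs from 'in order, exactly once, no gaps'."""
    problems = []
    # Pair each seq with its predecessor; 0 acts as the sentinel before seq 1.
    for prev, cur in zip([0] + seqs, seqs):
        if cur == prev:
            problems.append(f"duplicate {cur}")
        elif cur < prev:
            problems.append(f"out of order: {cur} after {prev}")
        elif cur > prev + 1:
            problems.append(f"gap: {prev + 1}..{cur - 1} missing")
    return problems
```

An empty result means the consumer saw 1..N contiguously; a property test would generate interleavings and assert that every consumer's result is empty.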


10. Verification — how to confirm the plan worked

Local end-to-end smoke (after Phase 2):

  1. just arena-infra-up — start backing services.
  2. Start backend with ANTHROPIC_API_KEY set; start frontend (npm run dev in arena/web/ui).
  3. In the UI: pick a finalized parent run, scrub timeline to a turn boundary, hit fork.
  4. Observe in DevTools Network tab:
    • POST /forks → 200 with child_run_id.
    • GET /events?run_id=<child> → 200 stays open, SSE events arrive within ~1 sec.
  5. Backend logs: no ConnectionException, no stack traces. Persister log line per turn.
  6. After replay completes (~30 sec), open a fresh tab and request /events?run_id=<child> — should fall through to the cold path and replay the full event tape from DuckDB.

Automated:

Distributed verification (Phase C — shipped):


11. Open verification items (resolve at Phase 0 kickoff)

Things confirmed by exploration on 2026-05-21:

Things to verify on first read in Phase 0:

Resolved during Phase C (2026-05-23):


12. Effort estimate

| Phase | Effort | Status |
| --- | --- | --- |
| 0: Prep & regression test | 0.5 day | ✅ shipped |
| 1: Broker + persister + tests | 1.5–2 days | ✅ shipped (d408401) |
| 2: Cutover & deletions | 1 day | ✅ shipped (74eb7c6) |
| 2.5: Non-fork producers (optional) | 1 day | ✅ shipped (60da488) |
| 3: Hardening (optional) | 1 day | ✅ shipped (6b53a0c) |
| C: Distributed swap (Redis Streams) | 1 day | ✅ shipped |
| Total to fix the bug correctly | ~5 working days | |

For reference: the tactical patch (“option B”) was ~0.5 day, at the cost of locking in the file-coupling assumption.


13. Cross-walk: Python & clean-code principles

| Principle | Where it shows up |
| --- | --- |
| Depend on abstractions (DIP) | EventBroker Protocol; Depends(get_broker) |
| Single Responsibility | Broker, persister, SSE handler each do one thing |
| Open/Closed | New broker impls require zero consumer changes |
| Composition over inheritance | BrokerEventSink has-a broker; doesn’t subclass it |
| Immutability where possible | EventEnvelope frozen+slotted; events append-only |
| Explicit lifecycle | open_run / close_run instead of implicit-on-first-publish |
| One concept, not two | stream replaces subscribe + queue + iterate |
| Make illegal states unrepresentable | RunId, Seq NewTypes; broker rejects publish-before-open |
| Type the boundaries | Protocol, NewType, AsyncIterator[EventEnvelope] |
| Tests are documentation | Contract test is the broker spec |
| Delete code, don’t accumulate it | Phase 2 step explicitly enumerates deletions |