op_log.py
python
| 1 | """Append-only operation log for Muse live collaboration. |
| 2 | |
| 3 | The op log is the bridge between real-time collaborative editing and the |
| 4 | immutable commit DAG. During a live session, operations are appended to |
| 5 | the log as they occur. At commit time the log is collapsed into a |
| 6 | :class:`~muse.domain.StructuredDelta` and stored with the commit record. |
| 7 | |
| 8 | Design principles |
| 9 | ----------------- |
| 10 | - **Append-only** — entries are never modified or deleted; the file grows |
| 11 | monotonically. Compaction happens through checkpoints (see below). |
| 12 | - **Lamport-clocked** — every entry carries a logical Lamport timestamp |
| 13 | that imposes a total order across concurrent actors without wall-clock |
| 14 | coordination. |
| 15 | - **Causally linked** — ``parent_op_ids`` lets any entry declare the ops it |
| 16 | depends on, enabling causal replay and CRDT join operations downstream. |
| 17 | - **Domain-neutral** — the log stores :class:`~muse.domain.DomainOp` values |
| 18 | unchanged; the core engine has no opinion about what those ops mean. |
| 19 | - **Checkpoint / compaction** — when a live session crystallises into a Muse |
| 20 | commit, a checkpoint record is written that marks the current snapshot. |
| 21 | Subsequent reads return only ops that arrived after the checkpoint. |
| 22 | |
| 23 | Layout:: |
| 24 | |
| 25 | .muse/op_log/<session_id>/ |
| 26 | ops.jsonl — one JSON line per OpEntry (append-only) |
| 27 | checkpoint.json — most recent checkpoint (snapshot_id + lamport_ts) |
| 28 | |
| 29 | Relationship to the commit DAG |
| 30 | ------------------------------- |
| 31 | The op log does **not** replace the commit DAG. It is a staging area: |
| 32 | |
| 33 | live edits → OpLog.append() → ops.jsonl |
| 34 | session end → OpLog.checkpoint(snapshot_id) → commit record |
| 35 | commit record → normal Muse commit DAG |
| 36 | |
| 37 | Replaying the log from a checkpoint reproduces the snapshot deterministically, |
| 38 | giving the same guarantee as re-running ``git apply`` from a patch file. |
| 39 | |
| 40 | Usage:: |
| 41 | |
| 42 | from muse.core.op_log import OpLog, make_op_entry |
| 43 | |
| 44 | log = OpLog(repo_root, session_id="session-abc") |
| 45 | entry = make_op_entry( |
| 46 | actor_id="counterpoint-bot", |
| 47 | domain="midi", |
| 48 | domain_op=my_insert_op, |
| 49 | lamport_ts=log.next_lamport_ts(), |
| 50 | ) |
| 51 | log.append(entry) |
| 52 | |
| 53 | delta = log.to_structured_delta("midi") # collapse for commit |
| 54 | ckpt = log.checkpoint(snapshot_id) # crystallise |
| 55 | """ |
| 56 | |
| 57 | import datetime |
| 58 | import json |
| 59 | import logging |
| 60 | import pathlib |
| 61 | import uuid as _uuid_mod |
| 62 | from typing import TypedDict |
| 63 | |
| 64 | from muse.domain import DomainOp, StructuredDelta |
| 65 | |
| 66 | logger = logging.getLogger(__name__) |
| 67 | |
| 68 | _OP_LOG_DIR = ".muse/op_log" |
| 69 | |
| 70 | |
| 71 | # --------------------------------------------------------------------------- |
| 72 | # Wire-format TypedDicts |
| 73 | # --------------------------------------------------------------------------- |
| 74 | |
| 75 | |
class OpEntry(TypedDict):
    """One record in the append-only op log.

    Keys:
        op_id: Stable UUID4 string for this entry. Consumers use it to
            deduplicate on replay; CRDT join uses it to establish causal
            identity.
        actor_id: The agent or human identity that produced this op.
        lamport_ts: Logical Lamport timestamp, monotonically increasing
            within a session. The authoritative total order when
            wall-clock times are unavailable or unreliable.
        parent_op_ids: Causal parents — op IDs this entry depends on. An
            empty list marks a root entry with no explicit causal
            dependency. Used by CRDT merge and causal replay.
        domain: Tag matching the :class:`~muse.domain.MuseDomainPlugin`
            that produced this op (e.g. ``"midi"``, ``"code"``).
        domain_op: The actual typed domain operation, stored verbatim.
        created_at: ISO 8601 UTC wall-clock timestamp of the append.
            Informational only — order by ``lamport_ts``.
        intent_id: Links this op to a coordination intent (from
            :mod:`muse.core.coordination`). Empty string if not
            applicable.
        reservation_id: Links this op to a coordination reservation.
            Empty string if not applicable.
    """

    op_id: str
    actor_id: str
    lamport_ts: int
    parent_op_ids: list[str]
    domain: str
    domain_op: DomainOp
    created_at: str
    intent_id: str
    reservation_id: str
| 117 | |
| 118 | |
class OpLogCheckpoint(TypedDict):
    """Snapshot marker recording the op log state at commit time.

    :meth:`OpLog.checkpoint` writes one of these when a live session
    crystallises into a Muse commit; subsequent
    :meth:`OpLog.replay_since_checkpoint` calls return only ops that
    arrived after it.

    Keys:
        session_id: The session this checkpoint belongs to.
        snapshot_id: Commit snapshot ID that this checkpoint
            materialises. Every op with a Lamport timestamp at or below
            ``lamport_ts`` is captured by that snapshot.
        lamport_ts: Lamport timestamp of the last op included in this
            checkpoint.
        op_count: Number of op entries in the log at checkpoint time.
        created_at: ISO 8601 UTC timestamp.
    """

    session_id: str
    snapshot_id: str
    lamport_ts: int
    op_count: int
    created_at: str
| 144 | |
| 145 | |
| 146 | # --------------------------------------------------------------------------- |
| 147 | # Factory |
| 148 | # --------------------------------------------------------------------------- |
| 149 | |
| 150 | |
def make_op_entry(
    actor_id: str,
    domain: str,
    domain_op: DomainOp,
    lamport_ts: int,
    *,
    parent_op_ids: list[str] | None = None,
    intent_id: str = "",
    reservation_id: str = "",
) -> OpEntry:
    """Create a new :class:`OpEntry` with a fresh UUID op_id.

    Args:
        actor_id: Agent or human identity string.
        domain: Domain tag (e.g. ``"midi"``).
        domain_op: The typed domain operation to log.
        lamport_ts: Logical Lamport timestamp for this entry.
        parent_op_ids: Causal dependencies. Defaults to empty list.
        intent_id: Optional coordination intent linkage.
        reservation_id: Optional coordination reservation linkage.

    Returns:
        A fully populated :class:`OpEntry`.
    """
    # Copy the caller's parent list so later caller-side mutation cannot
    # leak into the logged entry.
    parents: list[str] = list(parent_op_ids) if parent_op_ids else []
    stamped_at = datetime.datetime.now(datetime.timezone.utc).isoformat()
    fresh_id = str(_uuid_mod.uuid4())
    return OpEntry(
        op_id=fresh_id,
        actor_id=actor_id,
        lamport_ts=lamport_ts,
        parent_op_ids=parents,
        domain=domain,
        domain_op=domain_op,
        created_at=stamped_at,
        intent_id=intent_id,
        reservation_id=reservation_id,
    )
| 186 | |
| 187 | |
| 188 | # --------------------------------------------------------------------------- |
| 189 | # OpLog |
| 190 | # --------------------------------------------------------------------------- |
| 191 | |
| 192 | |
class OpLog:
    """Append-only operation log for a single live collaboration session.

    Each session gets its own directory under ``.muse/op_log/<session_id>/``.
    The log file is JSON-lines: one :class:`OpEntry` per line. The checkpoint
    file is a single JSON object written atomically (temp file + rename) when
    a session is committed.

    Args:
        repo_root: Repository root (the directory containing ``.muse/``).
        session_id: Stable identifier for this collaboration session. Use a
            UUID, a branch name, or any stable string. The session
            directory is created on first :meth:`append`.
    """

    def __init__(self, repo_root: pathlib.Path, session_id: str) -> None:
        self._repo_root = repo_root
        self._session_id = session_id
        self._session_dir = repo_root / _OP_LOG_DIR / session_id
        self._ops_path = self._session_dir / "ops.jsonl"
        self._checkpoint_path = self._session_dir / "checkpoint.json"
        # Logical clock state. ``_lamport_loaded`` distinguishes "counter not
        # yet initialised from the on-disk log" from "log is genuinely empty"
        # (both would read as 0 with a bare sentinel).
        self._lamport: int = 0
        self._lamport_loaded: bool = False

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _ensure_dir(self) -> None:
        """Create the session directory (and parents) if it does not exist."""
        self._session_dir.mkdir(parents=True, exist_ok=True)

    def _load_lamport(self) -> int:
        """Return the highest lamport_ts seen in the log so far.

        Blank and corrupt lines are skipped: the log is append-only, so a
        torn final line must not prevent a session from reopening.
        """
        if not self._ops_path.exists():
            return 0
        highest = 0
        with self._ops_path.open(encoding="utf-8") as fh:
            for line in fh:
                line = line.strip()
                if not line:
                    continue
                try:
                    entry: OpEntry = json.loads(line)
                    highest = max(highest, entry.get("lamport_ts", 0))
                except json.JSONDecodeError:
                    continue
        return highest

    def _ensure_lamport_loaded(self) -> None:
        """Lazily initialise the Lamport counter from the on-disk log.

        Scans at most once per :class:`OpLog` instance so that a reopened
        session continues from where it left off.
        """
        if not self._lamport_loaded:
            self._lamport = max(self._lamport, self._load_lamport())
            self._lamport_loaded = True

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def next_lamport_ts(self) -> int:
        """Return the next Lamport timestamp to use, advancing the counter.

        The counter is initialised lazily from the highest value found in the
        log on first use (so that a reopened session continues from where it
        left off).

        Returns:
            Monotonically increasing integer.
        """
        self._ensure_lamport_loaded()
        self._lamport += 1
        return self._lamport

    def append(self, entry: OpEntry) -> None:
        """Append *entry* to the op log.

        The entry is serialised as a single JSON line and flushed to disk.
        This is the only write operation on the log file; entries are never
        modified or deleted.

        Also applies the Lamport "receive" rule: the local counter is
        advanced to at least ``entry["lamport_ts"]``, so timestamps handed
        out later by :meth:`next_lamport_ts` always stay ahead of ops
        received from remote actors. (Previously the clock never advanced
        here, which could produce duplicate timestamps after appending a
        remote entry with a higher ts.)

        Args:
            entry: A fully populated :class:`OpEntry`.
        """
        self._ensure_lamport_loaded()
        # Lamport receive rule: never fall behind an observed timestamp.
        self._lamport = max(self._lamport, entry["lamport_ts"])
        self._ensure_dir()
        line = json.dumps(entry, separators=(",", ":")) + "\n"
        with self._ops_path.open("a", encoding="utf-8") as fh:
            fh.write(line)
        logger.debug(
            "✅ OpLog append: actor=%r domain=%r ts=%d",
            entry["actor_id"],
            entry["domain"],
            entry["lamport_ts"],
        )

    def read_all(self) -> list[OpEntry]:
        """Return all entries in the log, in append order.

        Corrupt lines are skipped with a warning rather than aborting the
        read — one torn line must not make the whole session unreadable.

        Returns:
            List of :class:`OpEntry` dicts, oldest first.
        """
        if not self._ops_path.exists():
            return []
        entries: list[OpEntry] = []
        with self._ops_path.open(encoding="utf-8") as fh:
            for line in fh:
                line = line.strip()
                if not line:
                    continue
                try:
                    entries.append(json.loads(line))
                except json.JSONDecodeError as exc:
                    logger.warning("⚠️ Corrupt op log line in %s: %s", self._ops_path, exc)
        return entries

    def replay_since_checkpoint(self) -> list[OpEntry]:
        """Return entries that arrived after the last checkpoint.

        If no checkpoint exists, returns all entries (equivalent to
        :meth:`read_all`).

        Returns:
            List of :class:`OpEntry` dicts since last checkpoint, oldest first.
        """
        checkpoint = self.read_checkpoint()
        all_entries = self.read_all()
        if checkpoint is None:
            return all_entries
        cutoff = checkpoint["lamport_ts"]
        return [e for e in all_entries if e["lamport_ts"] > cutoff]

    def to_structured_delta(self, domain: str) -> StructuredDelta:
        """Collapse all entries since the last checkpoint into a StructuredDelta.

        Ops are ordered by Lamport timestamp. Ops from domains other than
        *domain* are filtered out (a session may carry cross-domain ops from
        coordinated agents; each domain collapses its own slice).

        Args:
            domain: Domain tag to filter by (e.g. ``"midi"``).

        Returns:
            A :class:`~muse.domain.StructuredDelta` with the ordered op list
            and a simple count summary.
        """
        entries = self.replay_since_checkpoint()
        entries.sort(key=lambda e: e["lamport_ts"])
        ops = [e["domain_op"] for e in entries if e["domain"] == domain]

        # Summary is a count per op kind, e.g. "3 insert, 1 delete".
        counts: dict[str, int] = {}
        for op in ops:
            kind = op.get("op", "unknown")
            counts[kind] = counts.get(kind, 0) + 1
        parts = [f"{v} {k}" for k, v in sorted(counts.items())]
        summary = ", ".join(parts) if parts else "no ops"

        return StructuredDelta(domain=domain, ops=ops, summary=summary)

    def checkpoint(self, snapshot_id: str) -> OpLogCheckpoint:
        """Write a checkpoint recording that all current ops are in *snapshot_id*.

        After a checkpoint, :meth:`replay_since_checkpoint` will only return
        ops that arrive after this call. The op log file itself is never
        truncated — the checkpoint is a logical marker.

        The checkpoint file is written atomically: the JSON is written to a
        temp file in the same directory, then renamed over the target, so a
        concurrent reader never observes a half-written checkpoint.

        Args:
            snapshot_id: The Muse snapshot ID that captured all ops to date.

        Returns:
            The written :class:`OpLogCheckpoint`.
        """
        all_entries = self.read_all()
        highest_ts = max((e["lamport_ts"] for e in all_entries), default=0)
        ckpt = OpLogCheckpoint(
            session_id=self._session_id,
            snapshot_id=snapshot_id,
            lamport_ts=highest_ts,
            op_count=len(all_entries),
            created_at=datetime.datetime.now(datetime.timezone.utc).isoformat(),
        )
        self._ensure_dir()
        # Path.replace() maps to os.replace(): atomic when source and target
        # are on the same filesystem — guaranteed here (same directory).
        tmp_path = self._checkpoint_path.with_name("checkpoint.json.tmp")
        tmp_path.write_text(json.dumps(ckpt, indent=2) + "\n", encoding="utf-8")
        tmp_path.replace(self._checkpoint_path)
        logger.info(
            "✅ OpLog checkpoint: session=%r snapshot=%s ts=%d ops=%d",
            self._session_id,
            snapshot_id[:8],
            highest_ts,
            len(all_entries),
        )
        return ckpt

    def read_checkpoint(self) -> OpLogCheckpoint | None:
        """Load the most recent checkpoint, or ``None`` if none exists.

        A corrupt checkpoint file is logged and treated as absent rather
        than raising, so replay degrades to "return everything".
        """
        if not self._checkpoint_path.exists():
            return None
        try:
            raw: OpLogCheckpoint = json.loads(
                self._checkpoint_path.read_text(encoding="utf-8")
            )
            return raw
        except (json.JSONDecodeError, KeyError) as exc:
            logger.warning("⚠️ Corrupt checkpoint file %s: %s", self._checkpoint_path, exc)
            return None

    def session_id(self) -> str:
        """Return the session ID for this log."""
        return self._session_id
| 391 | |
| 392 | |
| 393 | # --------------------------------------------------------------------------- |
| 394 | # Session listing |
| 395 | # --------------------------------------------------------------------------- |
| 396 | |
| 397 | |
def list_sessions(repo_root: pathlib.Path) -> list[str]:
    """Return all session IDs that have op log directories under *repo_root*.

    Args:
        repo_root: Repository root.

    Returns:
        Sorted list of session ID strings; empty if no op log root exists.
    """
    sessions_root = repo_root / _OP_LOG_DIR
    if not sessions_root.exists():
        return []
    # Only directories are sessions; stray files in the op log root are ignored.
    names = [child.name for child in sessions_root.iterdir() if child.is_dir()]
    names.sort()
    return names