cgcardona / muse public
op_log.py python
410 lines 14.4 KB
04004b82 Rename MusicRGA → MidiRGA and purge all 'music plugin' terminology Gabriel Cardona <gabriel@tellurstori.com> 1d ago
1 """Append-only operation log for Muse live collaboration.
2
3 The op log is the bridge between real-time collaborative editing and the
4 immutable commit DAG. During a live session, operations are appended to
5 the log as they occur. At commit time the log is collapsed into a
6 :class:`~muse.domain.StructuredDelta` and stored with the commit record.
7
8 Design principles
9 -----------------
10 - **Append-only** — entries are never modified or deleted; the file grows
11 monotonically. Compaction happens through checkpoints (see below).
12 - **Lamport-clocked** — every entry carries a logical Lamport timestamp
13 that imposes a total order across concurrent actors without wall-clock
14 coordination.
15 - **Causally linked** — ``parent_op_ids`` lets any entry declare the ops it
16 depends on, enabling causal replay and CRDT join operations downstream.
17 - **Domain-neutral** — the log stores :class:`~muse.domain.DomainOp` values
18 unchanged; the core engine has no opinion about what those ops mean.
19 - **Checkpoint / compaction** — when a live session crystallises into a Muse
20 commit, a checkpoint record is written that marks the current snapshot.
21 Subsequent reads return only ops that arrived after the checkpoint.
22
23 Layout::
24
25 .muse/op_log/<session_id>/
26 ops.jsonl — one JSON line per OpEntry (append-only)
27 checkpoint.json — most recent checkpoint (snapshot_id + lamport_ts)
28
29 Relationship to the commit DAG
30 -------------------------------
31 The op log does **not** replace the commit DAG. It is a staging area:
32
33 live edits → OpLog.append() → ops.jsonl
34 session end → OpLog.checkpoint(snapshot_id) → commit record
35 commit record → normal Muse commit DAG
36
37 Replaying the log from a checkpoint reproduces the snapshot deterministically,
38 giving the same guarantee as re-running ``git apply`` from a patch file.
39
40 Usage::
41
42 from muse.core.op_log import OpLog, make_op_entry
43
44 log = OpLog(repo_root, session_id="session-abc")
45 entry = make_op_entry(
46 actor_id="counterpoint-bot",
47 domain="midi",
48 domain_op=my_insert_op,
49 lamport_ts=log.next_lamport_ts(),
50 )
51 log.append(entry)
52
53 delta = log.to_structured_delta("midi") # collapse for commit
54 ckpt = log.checkpoint(snapshot_id) # crystallise
55 """
56
57 import datetime
58 import json
59 import logging
60 import pathlib
61 import uuid as _uuid_mod
62 from typing import TypedDict
63
64 from muse.domain import DomainOp, StructuredDelta
65
66 logger = logging.getLogger(__name__)
67
68 _OP_LOG_DIR = ".muse/op_log"
69
70
71 # ---------------------------------------------------------------------------
72 # Wire-format TypedDicts
73 # ---------------------------------------------------------------------------
74
75
class OpEntry(TypedDict):
    """One record in the append-only op log (JSON-lines wire format).

    Keys:
        op_id: Stable UUID4 identifying this entry; lets consumers
            deduplicate on replay and gives CRDT join a causal identity.
        actor_id: The agent or human identity that produced this op.
        lamport_ts: Logical Lamport timestamp — monotonically increasing
            within a session and the authoritative ordering key when
            wall-clock times are unavailable or unreliable.
        parent_op_ids: Op IDs this entry causally depends on. An empty
            list marks a root entry with no explicit dependency. Consumed
            by CRDT merge and causal replay.
        domain: Tag of the :class:`~muse.domain.MuseDomainPlugin` that
            produced this op (e.g. ``"midi"``, ``"code"``).
        domain_op: The actual typed domain operation, stored verbatim.
        created_at: ISO 8601 UTC wall-clock timestamp of the append.
            Informational only — order by ``lamport_ts`` instead.
        intent_id: Link to a coordination intent (from
            :mod:`muse.core.coordination`); ``""`` when not applicable.
        reservation_id: Link to a coordination reservation; ``""`` when
            not applicable.
    """

    op_id: str
    actor_id: str
    lamport_ts: int
    parent_op_ids: list[str]
    domain: str
    domain_op: DomainOp
    created_at: str
    intent_id: str
    reservation_id: str
117
118
class OpLogCheckpoint(TypedDict):
    """Snapshot marker recording the op-log state at commit time.

    Produced by :meth:`OpLog.checkpoint` when a live session crystallises
    into a Muse commit. Afterwards :meth:`OpLog.replay_since_checkpoint`
    yields only ops that arrived past this marker.

    Keys:
        session_id: The session this checkpoint belongs to.
        snapshot_id: The commit snapshot ID materialised by this
            checkpoint; every op up to and including ``lamport_ts`` is
            captured by that snapshot.
        lamport_ts: Lamport timestamp of the last op included here.
        op_count: Number of op entries in the log when the checkpoint
            was taken.
        created_at: ISO 8601 UTC timestamp.
    """

    session_id: str
    snapshot_id: str
    lamport_ts: int
    op_count: int
    created_at: str
144
145
146 # ---------------------------------------------------------------------------
147 # Factory
148 # ---------------------------------------------------------------------------
149
150
def make_op_entry(
    actor_id: str,
    domain: str,
    domain_op: DomainOp,
    lamport_ts: int,
    *,
    parent_op_ids: list[str] | None = None,
    intent_id: str = "",
    reservation_id: str = "",
) -> OpEntry:
    """Build a fresh :class:`OpEntry` with a newly minted UUID4 op_id.

    Args:
        actor_id: Agent or human identity string.
        domain: Domain tag (e.g. ``"midi"``).
        domain_op: The typed domain operation to log.
        lamport_ts: Logical Lamport timestamp for this entry.
        parent_op_ids: Causal dependencies. Defaults to empty list.
        intent_id: Optional coordination intent linkage.
        reservation_id: Optional coordination reservation linkage.

    Returns:
        A fully populated :class:`OpEntry`.
    """
    # Copy the parents so later mutation by the caller cannot alter the entry.
    parents = [] if parent_op_ids is None else list(parent_op_ids)
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    return OpEntry(
        op_id=str(_uuid_mod.uuid4()),
        actor_id=actor_id,
        lamport_ts=lamport_ts,
        parent_op_ids=parents,
        domain=domain,
        domain_op=domain_op,
        created_at=now_utc.isoformat(),
        intent_id=intent_id,
        reservation_id=reservation_id,
    )
186
187
188 # ---------------------------------------------------------------------------
189 # OpLog
190 # ---------------------------------------------------------------------------
191
192
class OpLog:
    """Append-only operation log for a single live collaboration session.

    Each session gets its own directory under ``.muse/op_log/<session_id>/``.
    The log file is JSON-lines: one :class:`OpEntry` per line. The checkpoint
    file is a single JSON object written atomically when a session is committed.

    Args:
        repo_root: Repository root (the directory containing ``.muse/``).
        session_id: Stable identifier for this collaboration session. Use a
                    UUID, a branch name, or any stable string. The session
                    directory is created on first :meth:`append`.
    """

    def __init__(self, repo_root: pathlib.Path, session_id: str) -> None:
        self._repo_root = repo_root
        self._session_id = session_id
        self._session_dir = repo_root / _OP_LOG_DIR / session_id
        self._ops_path = self._session_dir / "ops.jsonl"
        self._checkpoint_path = self._session_dir / "checkpoint.json"
        # Local Lamport counter; lazily seeded from the on-disk log the first
        # time a timestamp is requested (see next_lamport_ts).
        self._lamport: int = 0
        self._lamport_loaded: bool = False

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _ensure_dir(self) -> None:
        """Create the session directory (and parents) if it does not exist."""
        self._session_dir.mkdir(parents=True, exist_ok=True)

    def _load_lamport(self) -> int:
        """Return the highest lamport_ts seen in the log so far.

        Blank and corrupt lines are skipped — a torn final line (e.g. after
        a crash mid-append) must not prevent reopening the session.
        """
        if not self._ops_path.exists():
            return 0
        highest = 0
        with self._ops_path.open() as fh:
            for line in fh:
                line = line.strip()
                if not line:
                    continue
                try:
                    entry: OpEntry = json.loads(line)
                    highest = max(highest, entry.get("lamport_ts", 0))
                except json.JSONDecodeError:
                    continue
        return highest

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def next_lamport_ts(self) -> int:
        """Return the next Lamport timestamp to use, advancing the counter.

        The counter is initialised lazily from the highest value found in the
        log on first call (so that a reopened session continues from where it
        left off).

        Returns:
            Monotonically increasing integer.
        """
        if not self._lamport_loaded:
            # Seed from disk exactly once. An explicit flag (rather than a
            # `== 0` check) avoids rescanning the whole file on every call
            # while the log is still empty.
            self._lamport = max(self._lamport, self._load_lamport())
            self._lamport_loaded = True
        self._lamport += 1
        return self._lamport

    def append(self, entry: OpEntry) -> None:
        """Append *entry* to the op log.

        The entry is serialised as a single JSON line and flushed to disk.
        This is the only write operation on the log file; entries are never
        modified or deleted.

        Args:
            entry: A fully populated :class:`OpEntry`.
        """
        self._ensure_dir()
        line = json.dumps(entry, separators=(",", ":")) + "\n"
        with self._ops_path.open("a") as fh:
            fh.write(line)
        # Lamport "receive" rule: appending an op minted elsewhere (e.g. by
        # a remote actor) must advance the local clock past that op's
        # timestamp — otherwise next_lamport_ts() could later issue a value
        # that collides with, or orders before, an op already in the log.
        self._lamport = max(self._lamport, entry["lamport_ts"])
        logger.debug(
            "✅ OpLog append: actor=%r domain=%r ts=%d",
            entry["actor_id"],
            entry["domain"],
            entry["lamport_ts"],
        )

    def read_all(self) -> list[OpEntry]:
        """Return all entries in the log, in append order.

        Corrupt lines are logged and skipped rather than aborting the read.

        Returns:
            List of :class:`OpEntry` dicts, oldest first.
        """
        if not self._ops_path.exists():
            return []
        entries: list[OpEntry] = []
        with self._ops_path.open() as fh:
            for line in fh:
                line = line.strip()
                if not line:
                    continue
                try:
                    entries.append(json.loads(line))
                except json.JSONDecodeError as exc:
                    logger.warning("⚠️ Corrupt op log line in %s: %s", self._ops_path, exc)
        return entries

    def replay_since_checkpoint(self) -> list[OpEntry]:
        """Return entries that arrived after the last checkpoint.

        If no checkpoint exists, returns all entries (equivalent to
        :meth:`read_all`).

        Returns:
            List of :class:`OpEntry` dicts since last checkpoint, oldest first.
        """
        checkpoint = self.read_checkpoint()
        all_entries = self.read_all()
        if checkpoint is None:
            return all_entries
        cutoff = checkpoint["lamport_ts"]
        return [e for e in all_entries if e["lamport_ts"] > cutoff]

    def to_structured_delta(self, domain: str) -> StructuredDelta:
        """Collapse all entries since the last checkpoint into a StructuredDelta.

        Ops are ordered by Lamport timestamp. Ops from domains other than
        *domain* are filtered out (a session may carry cross-domain ops from
        coordinated agents; each domain collapses its own slice).

        Args:
            domain: Domain tag to filter by (e.g. ``"midi"``).

        Returns:
            A :class:`~muse.domain.StructuredDelta` with the ordered op list
            and a simple count summary.
        """
        entries = self.replay_since_checkpoint()
        entries.sort(key=lambda e: e["lamport_ts"])
        ops = [e["domain_op"] for e in entries if e["domain"] == domain]

        # Human-readable summary, e.g. "3 insert, 1 delete".
        counts: dict[str, int] = {}
        for op in ops:
            kind = op.get("op", "unknown")
            counts[kind] = counts.get(kind, 0) + 1
        parts = [f"{v} {k}" for k, v in sorted(counts.items())]
        summary = ", ".join(parts) if parts else "no ops"

        return StructuredDelta(domain=domain, ops=ops, summary=summary)

    def checkpoint(self, snapshot_id: str) -> OpLogCheckpoint:
        """Write a checkpoint recording that all current ops are in *snapshot_id*.

        After a checkpoint, :meth:`replay_since_checkpoint` will only return
        ops that arrive after this call. The op log file itself is never
        truncated — the checkpoint is a logical marker.

        Args:
            snapshot_id: The Muse snapshot ID that captured all ops to date.

        Returns:
            The written :class:`OpLogCheckpoint`.
        """
        all_entries = self.read_all()
        highest_ts = max((e["lamport_ts"] for e in all_entries), default=0)
        ckpt = OpLogCheckpoint(
            session_id=self._session_id,
            snapshot_id=snapshot_id,
            lamport_ts=highest_ts,
            op_count=len(all_entries),
            created_at=datetime.datetime.now(datetime.timezone.utc).isoformat(),
        )
        self._ensure_dir()
        # Write-then-rename so a concurrent reader (or a crash mid-write)
        # never observes a truncated checkpoint. Path.replace maps to
        # os.replace, which is an atomic rename on POSIX for same-directory
        # targets — this makes good on the "written atomically" contract in
        # the class docstring.
        tmp_path = self._checkpoint_path.with_name("checkpoint.json.tmp")
        tmp_path.write_text(json.dumps(ckpt, indent=2) + "\n")
        tmp_path.replace(self._checkpoint_path)
        logger.info(
            "✅ OpLog checkpoint: session=%r snapshot=%s ts=%d ops=%d",
            self._session_id,
            snapshot_id[:8],
            highest_ts,
            len(all_entries),
        )
        return ckpt

    def read_checkpoint(self) -> OpLogCheckpoint | None:
        """Load the most recent checkpoint, or ``None`` if none exists."""
        if not self._checkpoint_path.exists():
            return None
        try:
            raw: OpLogCheckpoint = json.loads(self._checkpoint_path.read_text())
            return raw
        except (json.JSONDecodeError, KeyError) as exc:
            logger.warning("⚠️ Corrupt checkpoint file %s: %s", self._checkpoint_path, exc)
            return None

    def session_id(self) -> str:
        """Return the session ID for this log."""
        return self._session_id
391
392
393 # ---------------------------------------------------------------------------
394 # Session listing
395 # ---------------------------------------------------------------------------
396
397
def list_sessions(repo_root: pathlib.Path) -> list[str]:
    """Return every session ID with an op log directory under *repo_root*.

    Args:
        repo_root: Repository root.

    Returns:
        Session ID strings, sorted lexicographically.
    """
    log_dir = repo_root / _OP_LOG_DIR
    if not log_dir.exists():
        return []
    session_ids = [child.name for child in log_dir.iterdir() if child.is_dir()]
    return sorted(session_ids)