cgcardona / muse public
_op_log_integration.py python
118 lines 3.7 KB
bda49bdb feat: redesign .museignore as TOML with domain-scoped sections (#100) Gabriel Cardona <cgcardona@gmail.com> 1d ago
1 """Op-log integration for the code domain.
2
3 When the code plugin computes a structured delta (during ``muse commit``),
4 the individual symbol-level operations can be recorded in the append-only op
5 log for real-time replay. This enables:
6
7 - Multiple agents to watch a live op stream as code changes are committed.
8 - Post-hoc causal ordering of concurrent edits.
9 - Replay of changes at any point in history with Lamport-clock ordering.
10
11 Usage
12 -----
13 Call :func:`record_delta_ops` after the code plugin computes its delta::
14
15 from muse.plugins.code._op_log_integration import record_delta_ops
16
17 delta = plugin.diff(base, target, repo_root=root)
18 record_delta_ops(root, delta, session_id="my-session", actor_id="agent-x")
19
20 The ops are then visible via ``muse.core.op_log.OpLog(root, session_id).read_all()``.
21
22 Public API
23 ----------
24 - :func:`record_delta_ops` — write a ``StructuredDelta``'s ops to the op log.
25 - :func:`open_code_session` — open (or create) a named op log session.
26 """
27
28 from __future__ import annotations
29
30 import logging
31 import pathlib
32
33 from muse.core.op_log import OpLog, make_op_entry
34 from muse.domain import DomainOp, StructuredDelta
35
36 logger = logging.getLogger(__name__)
37
38 _DOMAIN = "code"
39
40
41 def open_code_session(
42 repo_root: pathlib.Path,
43 session_id: str,
44 ) -> OpLog:
45 """Open (or create) an op log session for code domain recording.
46
47 Args:
48 repo_root: Repository root.
49 session_id: Stable session identifier (e.g. branch name or UUID).
50
51 Returns:
52 An :class:`~muse.core.op_log.OpLog` instance ready for appending.
53 """
54 return OpLog(repo_root, session_id)
55
56
57 def record_delta_ops(
58 repo_root: pathlib.Path,
59 delta: StructuredDelta,
60 *,
61 session_id: str,
62 actor_id: str,
63 parent_op_ids: list[str] | None = None,
64 ) -> list[str]:
65 """Write a ``StructuredDelta``'s ops into the append-only op log.
66
67 Each top-level op in *delta* becomes one :class:`~muse.core.op_log.OpEntry`.
68 Child ops (from ``PatchOp.child_ops``) are also appended, each with the
69 parent op's ID as their causal parent.
70
71 Args:
72 repo_root: Repository root.
73 delta: The structured delta to record.
74 session_id: Session to append to (created if it doesn't exist).
75 actor_id: Agent or human who produced the delta.
76 parent_op_ids: Causal parent op IDs (from a prior checkpoint).
77
78 Returns:
79 List of op IDs appended to the log (for use as causal parents
80 in subsequent calls).
81 """
82 log = OpLog(repo_root, session_id)
83 appended_ids: list[str] = []
84 causal_parents = list(parent_op_ids or [])
85
86 # Use the log's current size as the starting Lamport timestamp.
87 lamport = len(log.read_all())
88
89 for op in delta.get("ops", []):
90 entry = make_op_entry(
91 actor_id=actor_id,
92 domain=_DOMAIN,
93 domain_op=op,
94 lamport_ts=lamport,
95 parent_op_ids=causal_parents,
96 )
97 log.append(entry)
98 op_id: str = entry["op_id"]
99 appended_ids.append(op_id)
100 lamport += 1
101
102 # Record child ops (symbol-level changes within a file — PatchOp only).
103 child_ops: list[DomainOp] = []
104 if op.get("op") == "patch" and op["op"] == "patch":
105 child_ops = op["child_ops"]
106 for child_op in child_ops:
107 child_entry = make_op_entry(
108 actor_id=actor_id,
109 domain=_DOMAIN,
110 domain_op=child_op,
111 lamport_ts=lamport,
112 parent_op_ids=[op_id], # causal child of the file op
113 )
114 log.append(child_entry)
115 appended_ids.append(child_entry["op_id"])
116 lamport += 1
117
118 return appended_ids