cgcardona / muse public
test_core_merge_engine.py python
321 lines 11.8 KB
53d2d9ce feat(phase-3): operation-level merge engine — OT-based auto-merge for n… Gabriel Cardona <cgcardona@gmail.com> 2d ago
1 """Tests for muse.core.merge_engine — three-way merge logic.
2
3 Extended in Phase 3 to cover the structured (operation-level) merge path via
4 :func:`~muse.core.op_transform.merge_structured` and the
5 :class:`~muse.domain.StructuredMergePlugin` integration.
6 """
7 from __future__ import annotations
8
9 import datetime
10 import json
11 import pathlib
12
13 import pytest
14
15 from muse.core.merge_engine import (
16 MergeState,
17 apply_merge,
18 clear_merge_state,
19 detect_conflicts,
20 diff_snapshots,
21 find_merge_base,
22 read_merge_state,
23 write_merge_state,
24 )
25 from muse.core.op_transform import MergeOpsResult, merge_op_lists, merge_structured
26 from muse.core.store import CommitRecord, write_commit
27 from muse.domain import (
28 DeleteOp,
29 DomainOp,
30 InsertOp,
31 ReplaceOp,
32 SnapshotManifest,
33 StructuredDelta,
34 StructuredMergePlugin,
35 )
36 from muse.plugins.music.plugin import MusicPlugin
37
38
39 @pytest.fixture
40 def repo(tmp_path: pathlib.Path) -> pathlib.Path:
41 muse_dir = tmp_path / ".muse"
42 (muse_dir / "commits").mkdir(parents=True)
43 (muse_dir / "refs" / "heads").mkdir(parents=True)
44 return tmp_path
45
46
47 def _commit(root: pathlib.Path, cid: str, parent: str | None = None, parent2: str | None = None) -> None:
48 write_commit(root, CommitRecord(
49 commit_id=cid,
50 repo_id="r",
51 branch="main",
52 snapshot_id=f"snap-{cid}",
53 message=cid,
54 committed_at=datetime.datetime.now(datetime.timezone.utc),
55 parent_commit_id=parent,
56 parent2_commit_id=parent2,
57 ))
58
59
60 class TestDiffSnapshots:
61 def test_no_change(self) -> None:
62 m = {"a.mid": "h1", "b.mid": "h2"}
63 assert diff_snapshots(m, m) == set()
64
65 def test_added(self) -> None:
66 assert diff_snapshots({}, {"a.mid": "h1"}) == {"a.mid"}
67
68 def test_removed(self) -> None:
69 assert diff_snapshots({"a.mid": "h1"}, {}) == {"a.mid"}
70
71 def test_modified(self) -> None:
72 assert diff_snapshots({"a.mid": "old"}, {"a.mid": "new"}) == {"a.mid"}
73
74
75 class TestDetectConflicts:
76 def test_no_conflict(self) -> None:
77 assert detect_conflicts({"a.mid"}, {"b.mid"}) == set()
78
79 def test_conflict(self) -> None:
80 assert detect_conflicts({"a.mid", "b.mid"}, {"b.mid", "c.mid"}) == {"b.mid"}
81
82 def test_both_empty(self) -> None:
83 assert detect_conflicts(set(), set()) == set()
84
85
86 class TestApplyMerge:
87 def test_clean_merge(self) -> None:
88 base = {"a.mid": "h0", "b.mid": "h0"}
89 ours = {"a.mid": "h_ours", "b.mid": "h0"}
90 theirs = {"a.mid": "h0", "b.mid": "h_theirs"}
91 ours_changed = {"a.mid"}
92 theirs_changed = {"b.mid"}
93 result = apply_merge(base, ours, theirs, ours_changed, theirs_changed, set())
94 assert result == {"a.mid": "h_ours", "b.mid": "h_theirs"}
95
96 def test_conflict_paths_excluded(self) -> None:
97 base = {"a.mid": "h0"}
98 ours = {"a.mid": "h_ours"}
99 theirs = {"a.mid": "h_theirs"}
100 ours_changed = theirs_changed = {"a.mid"}
101 result = apply_merge(base, ours, theirs, ours_changed, theirs_changed, {"a.mid"})
102 assert result == {"a.mid": "h0"} # Falls back to base
103
104 def test_ours_deletion_applied(self) -> None:
105 base = {"a.mid": "h0", "b.mid": "h0"}
106 ours = {"b.mid": "h0"} # a.mid deleted on ours
107 theirs = {"a.mid": "h0", "b.mid": "h0"}
108 result = apply_merge(base, ours, theirs, {"a.mid"}, set(), set())
109 assert "a.mid" not in result
110
111
112 class TestMergeStateIO:
113 def test_write_and_read(self, repo: pathlib.Path) -> None:
114 write_merge_state(
115 repo,
116 base_commit="base",
117 ours_commit="ours",
118 theirs_commit="theirs",
119 conflict_paths=["a.mid", "b.mid"],
120 other_branch="feature/x",
121 )
122 state = read_merge_state(repo)
123 assert state is not None
124 assert state.base_commit == "base"
125 assert state.conflict_paths == ["a.mid", "b.mid"]
126 assert state.other_branch == "feature/x"
127
128 def test_read_no_state(self, repo: pathlib.Path) -> None:
129 assert read_merge_state(repo) is None
130
131 def test_clear(self, repo: pathlib.Path) -> None:
132 write_merge_state(repo, base_commit="b", ours_commit="o", theirs_commit="t", conflict_paths=[])
133 clear_merge_state(repo)
134 assert read_merge_state(repo) is None
135
136
137 class TestFindMergeBase:
138 def test_direct_parent(self, repo: pathlib.Path) -> None:
139 _commit(repo, "root")
140 _commit(repo, "a", parent="root")
141 _commit(repo, "b", parent="root")
142 base = find_merge_base(repo, "a", "b")
143 assert base == "root"
144
145 def test_same_commit(self, repo: pathlib.Path) -> None:
146 _commit(repo, "root")
147 base = find_merge_base(repo, "root", "root")
148 assert base == "root"
149
150 def test_linear_history(self, repo: pathlib.Path) -> None:
151 _commit(repo, "a")
152 _commit(repo, "b", parent="a")
153 _commit(repo, "c", parent="b")
154 base = find_merge_base(repo, "c", "b")
155 assert base == "b"
156
157 def test_no_common_ancestor(self, repo: pathlib.Path) -> None:
158 _commit(repo, "x")
159 _commit(repo, "y")
160 assert find_merge_base(repo, "x", "y") is None
161
162
163 # ===========================================================================
164 # Phase 3 — structured merge engine integration tests
165 # ===========================================================================
166
167
168 def _ins(addr: str, pos: int | None, cid: str) -> InsertOp:
169 return InsertOp(op="insert", address=addr, position=pos, content_id=cid, content_summary=cid)
170
171
172 def _del(addr: str, pos: int | None, cid: str) -> DeleteOp:
173 return DeleteOp(op="delete", address=addr, position=pos, content_id=cid, content_summary=cid)
174
175
176 def _rep(addr: str, old: str, new: str) -> ReplaceOp:
177 return ReplaceOp(
178 op="replace",
179 address=addr,
180 position=None,
181 old_content_id=old,
182 new_content_id=new,
183 old_summary="old",
184 new_summary="new",
185 )
186
187
188 def _delta(ops: list[DomainOp]) -> StructuredDelta:
189 return StructuredDelta(domain="music", ops=ops, summary="test")
190
191
192 class TestMergeStructuredIntegration:
193 """Verify merge_structured delegates correctly to merge_op_lists."""
194
195 def test_clean_non_overlapping_file_ops(self) -> None:
196 ours = _delta([_ins("a.mid", pos=0, cid="a-hash")])
197 theirs = _delta([_ins("b.mid", pos=0, cid="b-hash")])
198 result = merge_structured(_delta([]), ours, theirs)
199 assert result.is_clean is True
200 assert len(result.merged_ops) == 2
201
202 def test_conflicting_same_address_replaces_detected(self) -> None:
203 ours = _delta([_rep("shared.mid", "old", "v-ours")])
204 theirs = _delta([_rep("shared.mid", "old", "v-theirs")])
205 result = merge_structured(_delta([]), ours, theirs)
206 assert result.is_clean is False
207 assert len(result.conflict_ops) == 1
208
209 def test_base_ops_kept_by_both_sides_preserved(self) -> None:
210 shared = _ins("base.mid", pos=0, cid="base-cid")
211 result = merge_structured(
212 _delta([shared]),
213 _delta([shared]),
214 _delta([shared]),
215 )
216 assert result.is_clean is True
217 assert any(_op_key_tuple(op) == _op_key_tuple(shared) for op in result.merged_ops)
218
219 def test_position_adjustment_in_structured_merge(self) -> None:
220 """Non-conflicting note inserts get position-adjusted in structured merge."""
221 ours = _delta([_ins("lead.mid", pos=3, cid="note-A")])
222 theirs = _delta([_ins("lead.mid", pos=7, cid="note-B")])
223 result = merge_structured(_delta([]), ours, theirs)
224 assert result.is_clean is True
225 pos_by_cid = {
226 op["content_id"]: op["position"]
227 for op in result.merged_ops
228 if op["op"] == "insert"
229 }
230 # note-A(3): no theirs ≤ 3 → stays 3
231 assert pos_by_cid["note-A"] == 3
232 # note-B(7): ours A(3) ≤ 7 → 7+1 = 8
233 assert pos_by_cid["note-B"] == 8
234
235
236 def _op_key_tuple(op: DomainOp) -> tuple[str, ...]:
237 """Re-implementation of _op_key for test assertions."""
238 if op["op"] == "insert":
239 return ("insert", op["address"], str(op["position"]), op["content_id"])
240 if op["op"] == "delete":
241 return ("delete", op["address"], str(op["position"]), op["content_id"])
242 if op["op"] == "replace":
243 return ("replace", op["address"], str(op["position"]), op["old_content_id"], op["new_content_id"])
244 return (op["op"], op["address"])
245
246
247 class TestStructuredMergePluginProtocol:
248 """Verify MusicPlugin satisfies the StructuredMergePlugin protocol."""
249
250 def test_music_plugin_isinstance_structured_merge_plugin(self) -> None:
251 plugin = MusicPlugin()
252 assert isinstance(plugin, StructuredMergePlugin)
253
254 def test_merge_ops_non_conflicting_files_is_clean(self) -> None:
255 plugin = MusicPlugin()
256 base = SnapshotManifest(files={}, domain="music")
257 ours_snap = SnapshotManifest(files={"a.mid": "hash-a"}, domain="music")
258 theirs_snap = SnapshotManifest(files={"b.mid": "hash-b"}, domain="music")
259 ours_ops: list[DomainOp] = [_ins("a.mid", pos=None, cid="hash-a")]
260 theirs_ops: list[DomainOp] = [_ins("b.mid", pos=None, cid="hash-b")]
261
262 result = plugin.merge_ops(
263 base, ours_snap, theirs_snap, ours_ops, theirs_ops
264 )
265 assert result.is_clean is True
266 assert "a.mid" in result.merged["files"]
267 assert "b.mid" in result.merged["files"]
268
269 def test_merge_ops_conflicting_same_file_replace_not_clean(self) -> None:
270 plugin = MusicPlugin()
271 base = SnapshotManifest(files={"f.mid": "base-hash"}, domain="music")
272 ours_snap = SnapshotManifest(files={"f.mid": "ours-hash"}, domain="music")
273 theirs_snap = SnapshotManifest(files={"f.mid": "theirs-hash"}, domain="music")
274 ours_ops: list[DomainOp] = [_rep("f.mid", "base-hash", "ours-hash")]
275 theirs_ops: list[DomainOp] = [_rep("f.mid", "base-hash", "theirs-hash")]
276
277 result = plugin.merge_ops(
278 base, ours_snap, theirs_snap, ours_ops, theirs_ops
279 )
280 assert not result.is_clean
281 assert "f.mid" in result.conflicts
282
283 def test_merge_ops_ours_strategy_resolves_conflict(self) -> None:
284 plugin = MusicPlugin()
285 base = SnapshotManifest(files={"f.mid": "base"}, domain="music")
286 ours_snap = SnapshotManifest(files={"f.mid": "ours-v"}, domain="music")
287 theirs_snap = SnapshotManifest(files={"f.mid": "theirs-v"}, domain="music")
288 ours_ops: list[DomainOp] = [_rep("f.mid", "base", "ours-v")]
289 theirs_ops: list[DomainOp] = [_rep("f.mid", "base", "theirs-v")]
290
291 result = plugin.merge_ops(
292 base,
293 ours_snap,
294 theirs_snap,
295 ours_ops,
296 theirs_ops,
297 )
298 # Without .museattributes the conflict stands — verify conflict is reported.
299 assert not result.is_clean
300
301 def test_merge_ops_delete_on_only_one_side_is_clean(self) -> None:
302 plugin = MusicPlugin()
303 base = SnapshotManifest(files={"keep.mid": "k", "remove.mid": "r"}, domain="music")
304 ours_snap = SnapshotManifest(files={"keep.mid": "k"}, domain="music")
305 theirs_snap = SnapshotManifest(files={"keep.mid": "k", "remove.mid": "r"}, domain="music")
306 ours_ops: list[DomainOp] = [_del("remove.mid", pos=None, cid="r")]
307 theirs_ops: list[DomainOp] = []
308
309 result = plugin.merge_ops(
310 base, ours_snap, theirs_snap, ours_ops, theirs_ops
311 )
312 assert result.is_clean is True
313 assert "keep.mid" in result.merged["files"]
314 assert "remove.mid" not in result.merged["files"]
315
316 def test_merge_ops_empty_changes_returns_base(self) -> None:
317 plugin = MusicPlugin()
318 base = SnapshotManifest(files={"f.mid": "h"}, domain="music")
319 result = plugin.merge_ops(base, base, base, [], [])
320 assert result.is_clean is True
321 assert result.merged["files"] == {"f.mid": "h"}