cgcardona / muse public
test_op_transform.py python
627 lines 24.3 KB
e6786943 feat: upgrade to Python 3.14, drop from __future__ import annotations Gabriel Cardona <cgcardona@gmail.com> 1d ago
1 """Tests for the operation-level merge engine.
2
3 Covers every commutativity rule from the spec table, the OT transform function,
4 and the full three-way ``merge_op_lists`` algorithm. Each test is named after
5 the specific behaviour it verifies so that a failure message is self-documenting.
6 """
7
8 import pytest
9
10 from muse.core.op_transform import (
11 MergeOpsResult,
12 _adjust_insert_positions,
13 _op_key,
14 merge_op_lists,
15 merge_structured,
16 ops_commute,
17 transform,
18 )
19 from muse.domain import (
20 DeleteOp,
21 DomainOp,
22 InsertOp,
23 MoveOp,
24 PatchOp,
25 ReplaceOp,
26 StructuredDelta,
27 )
28
29
30 # ---------------------------------------------------------------------------
31 # Helpers for building typed ops
32 # ---------------------------------------------------------------------------
33
34
35 def _ins(addr: str, pos: int | None, cid: str = "cid-a") -> InsertOp:
36 return InsertOp(op="insert", address=addr, position=pos, content_id=cid, content_summary=cid)
37
38
39 def _del(addr: str, pos: int | None, cid: str = "cid-a") -> DeleteOp:
40 return DeleteOp(op="delete", address=addr, position=pos, content_id=cid, content_summary=cid)
41
42
43 def _mov(addr: str, from_pos: int, to_pos: int, cid: str = "cid-a") -> MoveOp:
44 return MoveOp(op="move", address=addr, from_position=from_pos, to_position=to_pos, content_id=cid)
45
46
47 def _rep(addr: str, old: str, new: str) -> ReplaceOp:
48 return ReplaceOp(
49 op="replace",
50 address=addr,
51 position=None,
52 old_content_id=old,
53 new_content_id=new,
54 old_summary="old",
55 new_summary="new",
56 )
57
58
59 def _patch(addr: str, child_ops: list[DomainOp] | None = None) -> PatchOp:
60 return PatchOp(
61 op="patch",
62 address=addr,
63 child_ops=child_ops or [],
64 child_domain="test",
65 child_summary="test patch",
66 )
67
68
69 def _delta(ops: list[DomainOp], *, domain: str = "midi") -> StructuredDelta:
70 return StructuredDelta(domain=domain, ops=ops, summary="test")
71
72
73 # ===========================================================================
74 # Part 1 — ops_commute: commutativity oracle
75 # ===========================================================================
76
77
78 class TestOpsCommuteInserts:
79 def test_inserts_at_different_positions_commute(self) -> None:
80 a = _ins("f.mid", pos=2)
81 b = _ins("f.mid", pos=5)
82 assert ops_commute(a, b) is True
83
84 def test_inserts_at_same_position_do_not_commute(self) -> None:
85 a = _ins("f.mid", pos=3)
86 b = _ins("f.mid", pos=3)
87 assert ops_commute(a, b) is False
88
89 def test_inserts_with_none_position_commute_unordered(self) -> None:
90 a = _ins("files/", pos=None, cid="aa")
91 b = _ins("files/", pos=None, cid="bb")
92 assert ops_commute(a, b) is True
93
94 def test_inserts_with_none_and_int_position_commute_unordered(self) -> None:
95 # If either side is unordered, treat as commuting.
96 a = _ins("files/", pos=None)
97 b = _ins("files/", pos=3)
98 assert ops_commute(a, b) is True
99
100 def test_inserts_at_different_addresses_commute(self) -> None:
101 a = _ins("a.mid", pos=0)
102 b = _ins("b.mid", pos=0)
103 assert ops_commute(a, b) is True
104
105
106 class TestOpsCommuteDeletes:
107 def test_deletes_at_different_addresses_commute(self) -> None:
108 assert ops_commute(_del("a.mid", 0), _del("b.mid", 0)) is True
109
110 def test_consensus_delete_same_address_commutes(self) -> None:
111 # Both branches deleted the same file — idempotent, not a conflict.
112 a = _del("f.mid", pos=0, cid="same")
113 b = _del("f.mid", pos=0, cid="same")
114 assert ops_commute(a, b) is True
115
116 def test_deletes_at_same_address_different_content_still_commute(self) -> None:
117 # Two deletes always commute — the result is "both deleted something".
118 a = _del("f.mid", pos=1, cid="c1")
119 b = _del("f.mid", pos=2, cid="c2")
120 assert ops_commute(a, b) is True
121
122
123 class TestOpsCommuteReplaces:
124 def test_replaces_at_different_addresses_commute(self) -> None:
125 assert ops_commute(_rep("a.mid", "o", "n"), _rep("b.mid", "o", "n")) is True
126
127 def test_replaces_at_same_address_do_not_commute(self) -> None:
128 assert ops_commute(_rep("f.mid", "old", "v1"), _rep("f.mid", "old", "v2")) is False
129
130
131 class TestOpsCommuteMoves:
132 def test_moves_from_different_positions_commute(self) -> None:
133 assert ops_commute(_mov("f.mid", 2, 5), _mov("f.mid", 7, 1)) is True
134
135 def test_moves_from_same_position_do_not_commute(self) -> None:
136 assert ops_commute(_mov("f.mid", 3, 0), _mov("f.mid", 3, 9)) is False
137
138 def test_move_and_delete_same_position_do_not_commute(self) -> None:
139 move = _mov("f.mid", 5, 9)
140 delete = _del("f.mid", pos=5)
141 assert ops_commute(move, delete) is False
142
143 def test_move_and_delete_different_positions_commute(self) -> None:
144 move = _mov("f.mid", 5, 9)
145 delete = _del("f.mid", pos=2)
146 assert ops_commute(move, delete) is True
147
148 def test_delete_and_move_same_position_is_symmetric(self) -> None:
149 move = _mov("f.mid", 5, 9)
150 delete = _del("f.mid", pos=5)
151 # commute(move, delete) == commute(delete, move)
152 assert ops_commute(delete, move) is False
153
154 def test_delete_with_none_position_and_move_commute(self) -> None:
155 move = _mov("f.mid", 5, 9)
156 delete = _del("files/", pos=None)
157 assert ops_commute(move, delete) is True
158
159
160 class TestOpsCommutePatches:
161 def test_patches_at_different_addresses_commute(self) -> None:
162 a = _patch("a.mid")
163 b = _patch("b.mid")
164 assert ops_commute(a, b) is True
165
166 def test_patch_at_same_address_with_non_conflicting_children_commutes(self) -> None:
167 child_a = _ins("note:0", pos=1)
168 child_b = _ins("note:0", pos=5)
169 a = _patch("f.mid", child_ops=[child_a])
170 b = _patch("f.mid", child_ops=[child_b])
171 assert ops_commute(a, b) is True
172
173 def test_patch_at_same_address_with_conflicting_children_does_not_commute(self) -> None:
174 child_a = _rep("note:0", "old", "v1")
175 child_b = _rep("note:0", "old", "v2")
176 a = _patch("f.mid", child_ops=[child_a])
177 b = _patch("f.mid", child_ops=[child_b])
178 assert ops_commute(a, b) is False
179
180 def test_empty_patch_children_always_commute(self) -> None:
181 a = _patch("f.mid", child_ops=[])
182 b = _patch("f.mid", child_ops=[])
183 assert ops_commute(a, b) is True
184
185
186 class TestOpsCommuteMixedTypes:
187 def test_insert_and_delete_at_different_addresses_commute(self) -> None:
188 assert ops_commute(_ins("a.mid", 0), _del("b.mid", 0)) is True
189
190 def test_insert_and_delete_at_same_address_do_not_commute(self) -> None:
191 assert ops_commute(_ins("f.mid", 2), _del("f.mid", 5)) is False
192
193 def test_delete_and_insert_symmetry(self) -> None:
194 a = _ins("f.mid", 2)
195 b = _del("f.mid", 5)
196 assert ops_commute(a, b) == ops_commute(b, a)
197
198 def test_replace_and_insert_at_different_addresses_commute(self) -> None:
199 assert ops_commute(_rep("a.mid", "o", "n"), _ins("b.mid", 0)) is True
200
201 def test_replace_and_insert_at_same_address_do_not_commute(self) -> None:
202 assert ops_commute(_rep("f.mid", "o", "n"), _ins("f.mid", 0)) is False
203
204 def test_patch_and_replace_at_different_addresses_commute(self) -> None:
205 assert ops_commute(_patch("a.mid"), _rep("b.mid", "o", "n")) is True
206
207 def test_patch_and_replace_at_same_address_do_not_commute(self) -> None:
208 assert ops_commute(_patch("f.mid"), _rep("f.mid", "o", "n")) is False
209
210
211 # ===========================================================================
212 # Part 2 — transform: position adjustment for commuting ops
213 # ===========================================================================
214
215
216 class TestTransform:
217 def test_insert_before_insert_shifts_later_op(self) -> None:
218 # a inserts at pos 2, b inserts at pos 5. a < b, so b' = 6.
219 a = _ins("f.mid", pos=2, cid="a")
220 b = _ins("f.mid", pos=5, cid="b")
221 a_prime, b_prime = transform(a, b)
222 assert a_prime["position"] == 2 # unchanged
223 assert b_prime["position"] == 6 # shifted by a
224
225 def test_insert_after_insert_shifts_earlier_op(self) -> None:
226 # a inserts at pos 7, b inserts at pos 3. a > b, so a' = 8.
227 a = _ins("f.mid", pos=7, cid="a")
228 b = _ins("f.mid", pos=3, cid="b")
229 a_prime, b_prime = transform(a, b)
230 assert a_prime["position"] == 8 # shifted by b
231 assert b_prime["position"] == 3 # unchanged
232
233 def test_transform_preserves_content_id(self) -> None:
234 a = _ins("f.mid", pos=1, cid="note-a")
235 b = _ins("f.mid", pos=10, cid="note-b")
236 a_prime, b_prime = transform(a, b)
237 assert a_prime["content_id"] == "note-a"
238 assert b_prime["content_id"] == "note-b"
239
240 def test_transform_unordered_inserts_identity(self) -> None:
241 # position=None → identity transform (unordered collection).
242 a = _ins("files/", pos=None, cid="a")
243 b = _ins("files/", pos=None, cid="b")
244 a_prime, b_prime = transform(a, b)
245 assert a_prime is a
246 assert b_prime is b
247
248 def test_transform_non_insert_ops_identity(self) -> None:
249 # For all other commuting pairs, transform returns identity.
250 a = _del("a.mid", pos=3)
251 b = _del("b.mid", pos=7)
252 a_prime, b_prime = transform(a, b)
253 assert a_prime is a
254 assert b_prime is b
255
256 def test_transform_replace_ops_identity(self) -> None:
257 a = _rep("a.mid", "o", "n")
258 b = _rep("b.mid", "o", "n")
259 a_prime, b_prime = transform(a, b)
260 assert a_prime is a
261 assert b_prime is b
262
263 def test_transform_diamond_property_two_inserts(self) -> None:
264 """Verify that a ∘ b' == b ∘ a' — the fundamental OT diamond property.
265
266 We simulate applying inserts to a sequence and check the final order
267 matches regardless of which is applied first.
268 """
269 # Start with base list indices [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
270 # a = insert 'X' at position 3; b = insert 'Y' at position 7
271 a = _ins("seq", pos=3, cid="X")
272 b = _ins("seq", pos=7, cid="Y")
273 a_prime, b_prime = transform(a, b)
274
275 # Apply a then b': X at 3, Y at 8 → [0,1,2,X,3,4,5,6,7,Y,8,9]
276 seq = list(range(10))
277 a_pos = a["position"]
278 b_prime_pos = b_prime["position"]
279 assert a_pos is not None and b_prime_pos is not None
280 seq.insert(a_pos, "X")
281 seq.insert(b_prime_pos, "Y")
282 path_ab = seq[:]
283
284 # Apply b then a': Y at 7, X at 3 → [0,1,2,X,3,4,5,6,Y,7,8,9]
285 seq2 = list(range(10))
286 b_pos = b["position"]
287 a_prime_pos = a_prime["position"]
288 assert b_pos is not None and a_prime_pos is not None
289 seq2.insert(b_pos, "Y")
290 seq2.insert(a_prime_pos, "X")
291 path_ba = seq2[:]
292
293 assert path_ab == path_ba
294
295
296 # ===========================================================================
297 # Part 3 — _adjust_insert_positions (counting formula)
298 # ===========================================================================
299
300
301 class TestAdjustInsertPositions:
302 def test_no_other_ops_identity(self) -> None:
303 ops = [_ins("f.mid", pos=5, cid="a")]
304 result = _adjust_insert_positions(ops, [])
305 assert result[0]["position"] == 5
306
307 def test_single_other_before_shifts_position(self) -> None:
308 ops = [_ins("f.mid", pos=5, cid="a")]
309 others = [_ins("f.mid", pos=3, cid="x")]
310 result = _adjust_insert_positions(ops, others)
311 assert result[0]["position"] == 6 # shifted by 1
312
313 def test_other_after_does_not_shift(self) -> None:
314 ops = [_ins("f.mid", pos=3, cid="a")]
315 others = [_ins("f.mid", pos=5, cid="x")]
316 result = _adjust_insert_positions(ops, others)
317 assert result[0]["position"] == 3 # unchanged
318
319 def test_multiple_others_all_before_shifts_by_count(self) -> None:
320 ops = [_ins("f.mid", pos=10, cid="a")]
321 others = [_ins("f.mid", pos=2, cid="x"), _ins("f.mid", pos=7, cid="y")]
322 result = _adjust_insert_positions(ops, others)
323 assert result[0]["position"] == 12 # shifted by 2
324
325 def test_mixed_addresses_does_not_cross_contaminate(self) -> None:
326 ops = [_ins("a.mid", pos=3, cid="a")]
327 others = [_ins("b.mid", pos=1, cid="x")] # different address
328 result = _adjust_insert_positions(ops, others)
329 assert result[0]["position"] == 3 # not shifted
330
331 def test_non_insert_ops_pass_through_unchanged(self) -> None:
332 ops: list[DomainOp] = [_del("f.mid", pos=3, cid="x")]
333 result = _adjust_insert_positions(ops, [_ins("f.mid", pos=1, cid="y")])
334 assert result[0] is ops[0]
335
336 def test_unordered_insert_passes_through(self) -> None:
337 ops = [_ins("files/", pos=None, cid="a")]
338 others = [_ins("files/", pos=None, cid="x")]
339 result = _adjust_insert_positions(ops, others)
340 assert result[0]["position"] is None
341
342 def test_concrete_example_four_note_insertions(self) -> None:
343 """Verify counting formula on the four-note example from the spec."""
344 ours = [_ins("f.mid", pos=5, cid="V"), _ins("f.mid", pos=10, cid="W")]
345 theirs = [_ins("f.mid", pos=3, cid="X"), _ins("f.mid", pos=8, cid="Y")]
346
347 ours_adj = _adjust_insert_positions(ours, theirs)
348 theirs_adj = _adjust_insert_positions(theirs, ours)
349
350 # V(5) shifted by X(3) which is ≤ 5: V → 6
351 assert ours_adj[0]["position"] == 6
352 # W(10) shifted by X(3) and Y(8) both ≤ 10: W → 12
353 assert ours_adj[1]["position"] == 12
354 # X(3) no ours inserts ≤ 3: stays 3
355 assert theirs_adj[0]["position"] == 3
356 # Y(8) shifted by V(5) ≤ 8: Y → 9
357 assert theirs_adj[1]["position"] == 9
358
359
360 # ===========================================================================
361 # Part 4 — merge_op_lists: three-way merge
362 # ===========================================================================
363
364
365 class TestMergeOpLists:
366 def test_empty_inputs_return_empty_result(self) -> None:
367 result = merge_op_lists([], [], [])
368 assert result.merged_ops == []
369 assert result.conflict_ops == []
370 assert result.is_clean is True
371
372 def test_ours_only_additions_pass_through(self) -> None:
373 op = _ins("f.mid", pos=2, cid="x")
374 result = merge_op_lists([], [op], [])
375 assert len(result.merged_ops) == 1
376 assert result.conflict_ops == []
377
378 def test_theirs_only_additions_pass_through(self) -> None:
379 op = _del("f.mid", pos=0)
380 result = merge_op_lists([], [], [op])
381 assert len(result.merged_ops) == 1
382 assert result.conflict_ops == []
383
384 def test_non_conflicting_inserts_both_included(self) -> None:
385 ours_op = _ins("f.mid", pos=2, cid="V")
386 theirs_op = _ins("f.mid", pos=5, cid="W")
387 result = merge_op_lists([], [ours_op], [theirs_op])
388 assert result.is_clean is True
389 positions = {op["position"] for op in result.merged_ops if op["op"] == "insert"}
390 # Ours at 2 stays 2 (no theirs ≤ 2); theirs at 5 → 6 (ours at 2 ≤ 5).
391 assert 2 in positions
392 assert 6 in positions
393
394 def test_same_position_insert_produces_conflict(self) -> None:
395 ours_op = _ins("f.mid", pos=3, cid="A")
396 theirs_op = _ins("f.mid", pos=3, cid="B")
397 result = merge_op_lists([], [ours_op], [theirs_op])
398 assert not result.is_clean
399 assert len(result.conflict_ops) == 1
400 assert result.conflict_ops[0][0]["content_id"] == "A"
401 assert result.conflict_ops[0][1]["content_id"] == "B"
402
403 def test_consensus_addition_included_once(self) -> None:
404 op = _ins("f.mid", pos=4, cid="shared")
405 result = merge_op_lists([], [op], [op])
406 # Consensus: both added the same op → include exactly once.
407 assert len(result.merged_ops) == 1
408 assert result.conflict_ops == []
409
410 def test_base_ops_kept_by_both_sides_included(self) -> None:
411 base_op = _ins("f.mid", pos=0, cid="base")
412 # Both sides still have the base op.
413 result = merge_op_lists([base_op], [base_op], [base_op])
414 assert base_op in result.merged_ops
415
416 def test_base_op_deleted_by_ours_not_in_merged(self) -> None:
417 base_op = _ins("f.mid", pos=0, cid="base")
418 # Ours removed it, theirs kept it.
419 result = merge_op_lists([base_op], [], [base_op])
420 # The base op is NOT in kept (ours removed it) and NOT in ours_new
421 # (it was in base). It remains in theirs, so theirs "kept" it.
422 # Only ops in base AND in both branches end up in kept.
423 assert base_op not in result.merged_ops
424
425 def test_replace_conflict_at_same_address(self) -> None:
426 ours_op = _rep("f.mid", "old", "v-ours")
427 theirs_op = _rep("f.mid", "old", "v-theirs")
428 result = merge_op_lists([], [ours_op], [theirs_op])
429 assert not result.is_clean
430 assert len(result.conflict_ops) == 1
431
432 def test_replace_at_different_addresses_no_conflict(self) -> None:
433 ours_op = _rep("a.mid", "old", "new-a")
434 theirs_op = _rep("b.mid", "old", "new-b")
435 result = merge_op_lists([], [ours_op], [theirs_op])
436 assert result.is_clean
437 assert len(result.merged_ops) == 2
438
439 def test_consensus_delete_included_once(self) -> None:
440 del_op = _del("f.mid", pos=2, cid="gone")
441 result = merge_op_lists([], [del_op], [del_op])
442 assert len(result.merged_ops) == 1
443
444 def test_note_level_multi_insert_positions_adjusted_correctly(self) -> None:
445 """Simulate two musicians adding notes at non-overlapping bars."""
446 ours_ops: list[DomainOp] = [
447 _ins("lead.mid", pos=5, cid="note-A"),
448 _ins("lead.mid", pos=10, cid="note-B"),
449 ]
450 theirs_ops: list[DomainOp] = [
451 _ins("lead.mid", pos=3, cid="note-X"),
452 _ins("lead.mid", pos=8, cid="note-Y"),
453 ]
454 result = merge_op_lists([], ours_ops, theirs_ops)
455 assert result.is_clean is True
456 assert len(result.merged_ops) == 4
457
458 # Expected positions after adjustment (counting formula):
459 # note-A(5) → 5 + count(theirs ≤ 5) = 5 + 1[X(3)] = 6
460 # note-B(10) → 10 + 2[X(3),Y(8)] = 12
461 # note-X(3) → 3 + 0 = 3
462 # note-Y(8) → 8 + 1[A(5)] = 9
463 pos_by_cid = {
464 op["content_id"]: op["position"]
465 for op in result.merged_ops
466 if op["op"] == "insert"
467 }
468 assert pos_by_cid["note-A"] == 6
469 assert pos_by_cid["note-B"] == 12
470 assert pos_by_cid["note-X"] == 3
471 assert pos_by_cid["note-Y"] == 9
472
473 def test_mixed_conflict_and_clean_ops(self) -> None:
474 """A conflict on one file should not contaminate clean ops on others."""
475 conflict_ours = _rep("shared.mid", "old", "v-ours")
476 conflict_theirs = _rep("shared.mid", "old", "v-theirs")
477 clean_ours = _ins("only-ours.mid", pos=0, cid="ours-new-file")
478 clean_theirs = _del("only-theirs.mid", pos=2, cid="their-del")
479
480 result = merge_op_lists(
481 [],
482 [conflict_ours, clean_ours],
483 [conflict_theirs, clean_theirs],
484 )
485 assert len(result.conflict_ops) == 1
486 # Clean ops from both sides should appear in merged.
487 merged_cids = {
488 op.get("content_id", "") or op.get("new_content_id", "")
489 for op in result.merged_ops
490 }
491 assert "ours-new-file" in merged_cids
492 assert "their-del" in merged_cids
493
494 def test_patch_ops_at_different_files_both_included(self) -> None:
495 ours_op = _patch("track-a.mid")
496 theirs_op = _patch("track-b.mid")
497 result = merge_op_lists([], [ours_op], [theirs_op])
498 assert result.is_clean is True
499 assert len(result.merged_ops) == 2
500
501 def test_patch_ops_at_same_file_with_non_conflicting_children(self) -> None:
502 child_a = _ins("note:0", pos=1, cid="note-1")
503 child_b = _ins("note:0", pos=4, cid="note-2")
504 ours_op = _patch("f.mid", child_ops=[child_a])
505 theirs_op = _patch("f.mid", child_ops=[child_b])
506 result = merge_op_lists([], [ours_op], [theirs_op])
507 # PatchOps at same address with commuting children should commute.
508 assert result.is_clean is True
509
510 def test_move_and_delete_conflict_detected(self) -> None:
511 move_op = _mov("f.mid", from_pos=5, to_pos=0)
512 del_op = _del("f.mid", pos=5)
513 result = merge_op_lists([], [move_op], [del_op])
514 assert not result.is_clean
515
516 def test_merge_op_lists_is_deterministic(self) -> None:
517 """Same inputs → same output on every call."""
518 ours = [_ins("f.mid", pos=2, cid="a"), _del("g.mid", pos=0, cid="b")]
519 theirs = [_ins("f.mid", pos=7, cid="c"), _rep("h.mid", "x", "y")]
520 r1 = merge_op_lists([], ours, theirs)
521 r2 = merge_op_lists([], ours, theirs)
522 assert [_op_key(o) for o in r1.merged_ops] == [_op_key(o) for o in r2.merged_ops]
523 assert r1.conflict_ops == r2.conflict_ops
524
525
526 # ===========================================================================
527 # Part 5 — merge_structured: StructuredDelta entry point
528 # ===========================================================================
529
530
531 class TestMergeStructured:
532 def test_empty_deltas_produce_clean_result(self) -> None:
533 base = _delta([])
534 ours = _delta([])
535 theirs = _delta([])
536 result = merge_structured(base, ours, theirs)
537 assert result.is_clean is True
538 assert result.merged_ops == []
539
540 def test_non_conflicting_deltas_auto_merge(self) -> None:
541 op_a = _ins("a.mid", pos=1, cid="A")
542 op_b = _ins("b.mid", pos=2, cid="B")
543 result = merge_structured(_delta([]), _delta([op_a]), _delta([op_b]))
544 assert result.is_clean is True
545 assert len(result.merged_ops) == 2
546
547 def test_conflicting_deltas_reported(self) -> None:
548 op_a = _rep("shared.mid", "old", "v-a")
549 op_b = _rep("shared.mid", "old", "v-b")
550 result = merge_structured(_delta([]), _delta([op_a]), _delta([op_b]))
551 assert not result.is_clean
552 assert len(result.conflict_ops) == 1
553
554 def test_base_ops_respected_by_both_sides(self) -> None:
555 shared = _ins("f.mid", pos=0, cid="shared")
556 result = merge_structured(
557 _delta([shared]),
558 _delta([shared, _ins("f.mid", pos=5, cid="extra-ours")]),
559 _delta([shared]),
560 )
561 assert result.is_clean is True
562 # The 'shared' op is kept; 'extra-ours' is new and passes through.
563 assert len(result.merged_ops) >= 1
564
565
566 # ===========================================================================
567 # Part 6 — MergeOpsResult
568 # ===========================================================================
569
570
571 class TestMergeOpsResult:
572 def test_is_clean_when_no_conflicts(self) -> None:
573 r = MergeOpsResult(merged_ops=[], conflict_ops=[])
574 assert r.is_clean is True
575
576 def test_is_not_clean_when_conflicts_present(self) -> None:
577 a = _ins("f.mid", pos=1)
578 b = _ins("f.mid", pos=1)
579 r = MergeOpsResult(merged_ops=[], conflict_ops=[(a, b)])
580 assert r.is_clean is False
581
582 def test_default_factory_empty_lists(self) -> None:
583 r = MergeOpsResult()
584 assert r.merged_ops == []
585 assert r.conflict_ops == []
586
587
588 # ===========================================================================
589 # Part 7 — _op_key determinism and uniqueness
590 # ===========================================================================
591
592
593 class TestOpKey:
594 def test_insert_key_includes_all_fields(self) -> None:
595 op = _ins("f.mid", pos=3, cid="abc")
596 key = _op_key(op)
597 assert "insert" in key
598 assert "f.mid" in key
599 assert "3" in key
600 assert "abc" in key
601
602 def test_same_op_produces_same_key(self) -> None:
603 op = _del("f.mid", pos=2, cid="xyz")
604 assert _op_key(op) == _op_key(op)
605
606 def test_different_positions_produce_different_keys(self) -> None:
607 a = _ins("f.mid", pos=1, cid="c")
608 b = _ins("f.mid", pos=2, cid="c")
609 assert _op_key(a) != _op_key(b)
610
611 def test_move_key_includes_from_and_to(self) -> None:
612 op = _mov("f.mid", from_pos=3, to_pos=7)
613 key = _op_key(op)
614 assert "3" in key
615 assert "7" in key
616
617 def test_replace_key_includes_old_and_new(self) -> None:
618 op = _rep("f.mid", "old-id", "new-id")
619 key = _op_key(op)
620 assert "old-id" in key
621 assert "new-id" in key
622
623 def test_patch_key_includes_address_and_domain(self) -> None:
624 op = _patch("f.mid")
625 key = _op_key(op)
626 assert "patch" in key
627 assert "f.mid" in key