cgcardona / muse public
test_diff_algorithms.py python
601 lines 23.6 KB
e6786943 feat: upgrade to Python 3.14, drop from __future__ import annotations Gabriel Cardona <cgcardona@gmail.com> 1d ago
1 """Tests for the diff algorithm library.
2
3 Covers all four algorithm modules (lcs, tree_edit, numerical, set_ops) and
4 the schema-driven dispatch in ``muse.core.diff_algorithms``.
5
6 Each algorithm is tested at three levels:
7 1. **Unit** — the core function in isolation.
8 2. **Output shape** — the returned ``StructuredDelta`` is well-formed.
9 3. **Dispatch** — ``diff_by_schema`` routes correctly for each schema kind.
10 """
11
12 import hashlib
13
14 import pytest
15 from typing import Literal
16
17 from muse.core.diff_algorithms import (
18 DiffInput,
19 MapInput,
20 SequenceInput,
21 SetInput,
22 TensorInput,
23 TreeInput,
24 TreeNode,
25 diff_by_schema,
26 snapshot_diff,
27 )
28 from muse.core.diff_algorithms import lcs as lcs_mod
29 from muse.core.diff_algorithms import numerical as numerical_mod
30 from muse.core.diff_algorithms import set_ops as set_ops_mod
31 from muse.core.diff_algorithms import tree_edit as tree_edit_mod
32 from muse.core.schema import (
33 DomainSchema,
34 DimensionSpec,
35 MapSchema,
36 SequenceSchema,
37 SetSchema,
38 TensorSchema,
39 TreeSchema,
40 )
41 from muse.domain import SnapshotManifest, StructuredDelta
42
43
44 # ---------------------------------------------------------------------------
45 # Helpers
46 # ---------------------------------------------------------------------------
47
48
def _cid(s: str) -> str:
    """Hash *s* with SHA-256 and return the hex digest.

    Used throughout these tests as a reproducible stand-in content ID.
    """
    digest = hashlib.sha256(s.encode())
    return digest.hexdigest()
52
53
def _seq_schema(element_type: str = "item") -> SequenceSchema:
    """Build the sequence schema shared by the LCS and dispatch tests.

    Only ``element_type`` varies; identity is ``"by_position"``, the diff
    algorithm is ``"lcs"`` and no alphabet is set.
    """
    schema = SequenceSchema(
        kind="sequence",
        element_type=element_type,
        identity="by_position",
        diff_algorithm="lcs",
        alphabet=None,
    )
    return schema
62
63
def _set_schema(element_type: str = "file") -> SetSchema:
    """Build a content-identified set schema for *element_type*."""
    schema = SetSchema(
        kind="set",
        element_type=element_type,
        identity="by_content",
    )
    return schema
66
67
# Values accepted for the ``mode`` argument of ``_tensor_schema``, which
# forwards it to ``TensorSchema`` as ``diff_mode``.
DiffMode = Literal["sparse", "block", "full"]
69
70
def _tensor_schema(
    mode: DiffMode = "sparse", epsilon: float = 0.0
) -> TensorSchema:
    """Build a rank-1 ``float32`` tensor schema for the numerical diff tests.

    ``mode`` is passed through as ``diff_mode`` and ``epsilon`` as the
    schema's ``epsilon``.
    """
    schema = TensorSchema(
        kind="tensor",
        dtype="float32",
        rank=1,
        epsilon=epsilon,
        diff_mode=mode,
    )
    return schema
81
82
def _tree_schema() -> TreeSchema:
    """Build the tree schema (``"zhang_shasha"`` algorithm) used by the tree tests."""
    schema = TreeSchema(
        kind="tree",
        node_type="node",
        diff_algorithm="zhang_shasha",
    )
    return schema
85
86
def _map_schema() -> MapSchema:
    """Build a key-identified map schema whose values use the default sequence schema."""
    value_schema = _seq_schema()
    return MapSchema(
        kind="map",
        key_type="key",
        value_schema=value_schema,
        identity="by_key",
    )
94
95
def _leaf(label: str) -> TreeNode:
    """Build a childless ``TreeNode`` whose id, label and content hash all come from *label*."""
    content_id = _cid(label)
    return TreeNode(id=label, label=label, content_id=content_id, children=())
98
99
def _node(label: str, *children: TreeNode) -> TreeNode:
    """Build a ``TreeNode`` named *label* with the given children, in order.

    The node's id, label and content hash are all derived from *label*.
    """
    # ``children`` is already a tuple because it is collected from *args.
    content_id = _cid(label)
    return TreeNode(id=label, label=label, content_id=content_id, children=children)
104
105
def _is_valid_delta(d: StructuredDelta) -> bool:
    """Report whether *d* holds a list under ``"ops"`` and a str under ``"summary"``."""
    if not isinstance(d["ops"], list):
        return False
    return isinstance(d["summary"], str)
108
109
110 # ===========================================================================
111 # LCS / Myers tests
112 # ===========================================================================
113
114
115 class TestLCSMyersSES:
116 def test_empty_to_empty_returns_no_steps(self) -> None:
117 steps = lcs_mod.myers_ses([], [])
118 assert steps == []
119
120 def test_empty_to_sequence_all_inserts(self) -> None:
121 steps = lcs_mod.myers_ses([], ["a", "b", "c"])
122 kinds = [s.kind for s in steps]
123 assert kinds == ["insert", "insert", "insert"]
124
125 def test_sequence_to_empty_all_deletes(self) -> None:
126 steps = lcs_mod.myers_ses(["a", "b", "c"], [])
127 kinds = [s.kind for s in steps]
128 assert kinds == ["delete", "delete", "delete"]
129
130 def test_identical_sequences_all_keeps(self) -> None:
131 ids = ["a", "b", "c"]
132 steps = lcs_mod.myers_ses(ids, ids)
133 assert all(s.kind == "keep" for s in steps)
134 assert len(steps) == 3
135
136 def test_single_insert_in_middle(self) -> None:
137 base = ["a", "c"]
138 target = ["a", "b", "c"]
139 steps = lcs_mod.myers_ses(base, target)
140 inserts = [s for s in steps if s.kind == "insert"]
141 assert len(inserts) == 1
142 assert inserts[0].item == "b"
143
144 def test_single_delete_in_middle(self) -> None:
145 base = ["a", "b", "c"]
146 target = ["a", "c"]
147 steps = lcs_mod.myers_ses(base, target)
148 deletes = [s for s in steps if s.kind == "delete"]
149 assert len(deletes) == 1
150 assert deletes[0].item == "b"
151
152 def test_lcs_is_minimal(self) -> None:
153 base = ["a", "b", "c", "d"]
154 target = ["a", "x", "c", "d"]
155 steps = lcs_mod.myers_ses(base, target)
156 keeps = [s for s in steps if s.kind == "keep"]
157 inserts = [s for s in steps if s.kind == "insert"]
158 deletes = [s for s in steps if s.kind == "delete"]
159 assert len(keeps) == 3 # a, c, d are kept
160 assert len(inserts) == 1
161 assert len(deletes) == 1
162
163 def test_step_indices_are_consistent(self) -> None:
164 base = ["x", "y", "z"]
165 target = ["y", "z", "w"]
166 steps = lcs_mod.myers_ses(base, target)
167 for s in steps:
168 if s.kind == "delete":
169 assert s.item == base[s.base_index]
170 elif s.kind == "insert":
171 assert s.item == target[s.target_index]
172
173
class TestLCSDetectMoves:
    """Checks on how ``lcs_mod.detect_moves`` pairs insert ops with delete ops."""

    def test_paired_delete_insert_becomes_move(self) -> None:
        """The same content deleted at 0 and inserted at 3 becomes a single move."""
        from muse.domain import DeleteOp, InsertOp

        shared_cid = _cid("note")
        inserted = InsertOp(
            op="insert", address="", position=3, content_id=shared_cid, content_summary=""
        )
        deleted = DeleteOp(
            op="delete", address="", position=0, content_id=shared_cid, content_summary=""
        )
        moves, leftover_ins, leftover_del = lcs_mod.detect_moves([inserted], [deleted])
        assert len(moves) == 1
        move = moves[0]
        assert move["op"] == "move"
        assert move["from_position"] == 0
        assert move["to_position"] == 3
        assert len(leftover_ins) == 0
        assert len(leftover_del) == 0

    def test_same_position_not_a_move(self) -> None:
        """Matching content at an unchanged position is left as insert + delete."""
        from muse.domain import DeleteOp, InsertOp

        shared_cid = _cid("item")
        inserted = InsertOp(
            op="insert", address="", position=1, content_id=shared_cid, content_summary=""
        )
        deleted = DeleteOp(
            op="delete", address="", position=1, content_id=shared_cid, content_summary=""
        )
        moves, leftover_ins, leftover_del = lcs_mod.detect_moves([inserted], [deleted])
        assert len(moves) == 0
        assert len(leftover_ins) == 1
        assert len(leftover_del) == 1

    def test_no_paired_content_no_moves(self) -> None:
        """Ops with different content IDs are never paired into a move."""
        from muse.domain import DeleteOp, InsertOp

        inserted = InsertOp(
            op="insert", address="", position=0, content_id=_cid("a"), content_summary=""
        )
        deleted = DeleteOp(
            op="delete", address="", position=0, content_id=_cid("b"), content_summary=""
        )
        moves, leftover_ins, leftover_del = lcs_mod.detect_moves([inserted], [deleted])
        assert len(moves) == 0
        assert len(leftover_ins) == 1
        assert len(leftover_del) == 1
209
210
class TestLCSDiff:
    """Checks on the ``StructuredDelta`` produced by ``lcs_mod.diff``."""

    def test_empty_to_sequence_is_all_inserts(self) -> None:
        """Two items added to an empty base give two insert ops."""
        delta = lcs_mod.diff(_seq_schema(), [], ["a", "b"], domain="test")
        inserted = [op for op in delta["ops"] if op["op"] == "insert"]
        assert len(inserted) == 2

    def test_sequence_to_empty_is_all_deletes(self) -> None:
        """Two items removed down to an empty target give two delete ops."""
        delta = lcs_mod.diff(_seq_schema(), ["a", "b"], [], domain="test")
        removed = [op for op in delta["ops"] if op["op"] == "delete"]
        assert len(removed) == 2

    def test_identical_sequences_returns_no_ops(self) -> None:
        """Equal base and target give an empty op list."""
        same = ["a", "b", "c"]
        delta = lcs_mod.diff(_seq_schema(), same, list(same), domain="test")
        assert delta["ops"] == []

    def test_produces_valid_structured_delta(self) -> None:
        """The delta is well-formed and carries the requested domain."""
        delta = lcs_mod.diff(_seq_schema("note"), ["x"], ["x", "y"], domain="midi")
        assert _is_valid_delta(delta)
        assert delta["domain"] == "midi"

    def test_move_detected_from_delete_plus_insert(self) -> None:
        """Rotating the first element to the end yields at least one move op."""
        first, second, third = _cid("a"), _cid("b"), _cid("c")
        delta = lcs_mod.diff(
            _seq_schema(),
            [first, second, third],
            [second, third, first],
            domain="test",
        )
        assert any(op["op"] == "move" for op in delta["ops"])

    def test_summary_is_human_readable(self) -> None:
        """The summary names the element type and says something was added."""
        delta = lcs_mod.diff(_seq_schema("note"), ["a"], ["a", "b"], domain="test")
        summary = delta["summary"]
        assert "note" in summary
        assert "added" in summary
241
242
243 # ===========================================================================
244 # Tree edit tests
245 # ===========================================================================
246
247
class TestTreeEditDiff:
    """Checks on the ``StructuredDelta`` produced by ``tree_edit_mod.diff``."""

    def test_identical_trees_returns_no_ops(self) -> None:
        """Diffing a tree against itself gives no ops."""
        root = _node("root", _leaf("A"), _leaf("B"))
        delta = tree_edit_mod.diff(_tree_schema(), root, root, domain="test")
        assert delta["ops"] == []

    def test_leaf_relabel_is_replace(self) -> None:
        """A child that keeps its id/label but changes content gives one replace op."""
        # Built by hand: ``_leaf`` always derives content_id from the label,
        # and this node needs the same id/label with a different hash.
        changed_leaf = TreeNode(
            id="A", label="A", content_id=_cid("A_new"), children=()
        )
        base_root = _node("root", _leaf("A"))
        target_root = _node("root", changed_leaf)
        delta = tree_edit_mod.diff(
            _tree_schema(), base_root, target_root, domain="test"
        )
        replace_ops = [op for op in delta["ops"] if op["op"] == "replace"]
        assert len(replace_ops) == 1

    def test_node_insert(self) -> None:
        """Adding a second child produces at least one insert op."""
        base = _node("root", _leaf("A"))
        target = _node("root", _leaf("A"), _leaf("B"))
        delta = tree_edit_mod.diff(_tree_schema(), base, target, domain="test")
        insert_ops = [op for op in delta["ops"] if op["op"] == "insert"]
        assert len(insert_ops) >= 1

    def test_node_delete(self) -> None:
        """Removing a child produces at least one delete op."""
        base = _node("root", _leaf("A"), _leaf("B"))
        target = _node("root", _leaf("A"))
        delta = tree_edit_mod.diff(_tree_schema(), base, target, domain="test")
        delete_ops = [op for op in delta["ops"] if op["op"] == "delete"]
        assert len(delete_ops) >= 1

    def test_subtree_move(self) -> None:
        """Swapping two siblings is reported as a move or as delete/insert."""
        leaf_a = _leaf("A")
        leaf_b = _leaf("B")
        base = _node("root", leaf_a, leaf_b)
        # Move: leaf_b before leaf_a
        target = _node("root", leaf_b, leaf_a)
        delta = tree_edit_mod.diff(_tree_schema(), base, target, domain="test")
        # Either representation is accepted: a move, or a delete/insert pair.
        op_kinds = {op["op"] for op in delta["ops"]}
        assert op_kinds & {"move", "insert", "delete"}

    def test_produces_valid_structured_delta(self) -> None:
        """The delta is well-formed and carries the requested domain."""
        base = _node("root", _leaf("X"))
        target = _node("root", _leaf("Y"))
        delta = tree_edit_mod.diff(_tree_schema(), base, target, domain="midi")
        assert _is_valid_delta(delta)
        assert delta["domain"] == "midi"

    def test_summary_is_human_readable(self) -> None:
        """A changed tree gets a non-empty string summary."""
        base = _node("root", _leaf("A"))
        target = _node("root", _leaf("A"), _leaf("B"))
        delta = tree_edit_mod.diff(_tree_schema(), base, target, domain="test")
        assert isinstance(delta["summary"], str)
        assert len(delta["summary"]) > 0
304
305
306 # ===========================================================================
307 # Numerical diff tests
308 # ===========================================================================
309
310
class TestNumericalDiff:
    """Checks on ``numerical_mod.diff`` across epsilon values and diff modes."""

    def test_within_epsilon_returns_no_ops(self) -> None:
        """A 0.4 change under ``epsilon=1.0`` is not reported."""
        delta = numerical_mod.diff(
            _tensor_schema(epsilon=1.0),
            [1.0, 2.0, 3.0],
            [1.4, 2.0, 3.0],
            domain="test",
        )
        assert delta["ops"] == []

    def test_outside_epsilon_returns_replace(self) -> None:
        """A change larger than epsilon gives a single replace op."""
        delta = numerical_mod.diff(
            _tensor_schema(epsilon=0.1),
            [1.0, 2.0, 3.0],
            [1.0, 5.0, 3.0],
            domain="test",
        )
        ops = delta["ops"]
        assert len(ops) == 1
        assert ops[0]["op"] == "replace"

    def test_identical_arrays_returns_no_ops(self) -> None:
        """Equal arrays give no ops."""
        delta = numerical_mod.diff(
            _tensor_schema(), [1.0, 2.0], [1.0, 2.0], domain="test"
        )
        assert delta["ops"] == []

    def test_sparse_mode_one_op_per_element(self) -> None:
        """Sparse mode emits one replace op per changed element."""
        delta = numerical_mod.diff(
            _tensor_schema(mode="sparse", epsilon=0.0),
            [1.0, 2.0, 3.0],
            [9.0, 2.0, 9.0],
            domain="test",
        )
        ops = delta["ops"]
        assert len(ops) == 2  # positions 0 and 2
        assert all(op["op"] == "replace" for op in ops)

    def test_block_mode_groups_adjacent(self) -> None:
        """Block mode merges adjacent changed elements into one op per run."""
        delta = numerical_mod.diff(
            _tensor_schema(mode="block", epsilon=0.0),
            [1.0, 2.0, 3.0, 4.0, 5.0],
            [9.0, 9.0, 3.0, 9.0, 9.0],
            domain="test",
        )
        # Changes at 0,1 and 3,4 -> two blocks
        assert len(delta["ops"]) == 2

    def test_full_mode_single_op(self) -> None:
        """Full mode reports one replace op for a changed array."""
        delta = numerical_mod.diff(
            _tensor_schema(mode="full", epsilon=0.0),
            [1.0, 2.0, 3.0],
            [1.0, 99.0, 3.0],
            domain="test",
        )
        ops = delta["ops"]
        assert len(ops) == 1
        assert ops[0]["op"] == "replace"

    def test_length_mismatch_returns_single_replace(self) -> None:
        """Arrays of different length give one replace op."""
        delta = numerical_mod.diff(
            _tensor_schema(), [1.0, 2.0], [1.0, 2.0, 3.0], domain="test"
        )
        ops = delta["ops"]
        assert len(ops) == 1
        assert ops[0]["op"] == "replace"

    def test_produces_valid_structured_delta(self) -> None:
        """The delta is well-formed and carries the requested domain."""
        delta = numerical_mod.diff(
            _tensor_schema(epsilon=0.5), [0.0, 1.0], [0.0, 2.0], domain="midi"
        )
        assert _is_valid_delta(delta)
        assert delta["domain"] == "midi"
364
365
366 # ===========================================================================
367 # Set ops tests
368 # ===========================================================================
369
370
class TestSetOpsDiff:
    """Checks on the ``StructuredDelta`` produced by ``set_ops_mod.diff``."""

    def test_add_returns_insert(self) -> None:
        """One element added to an empty set gives a single insert op."""
        schema = _set_schema()
        base: frozenset[str] = frozenset()
        target = frozenset({_cid("file_a")})
        delta = set_ops_mod.diff(schema, base, target, domain="test")
        assert len(delta["ops"]) == 1
        assert delta["ops"][0]["op"] == "insert"

    def test_remove_returns_delete(self) -> None:
        """Removing the only element gives a single delete op."""
        schema = _set_schema()
        cid = _cid("file_a")
        base = frozenset({cid})
        target: frozenset[str] = frozenset()
        delta = set_ops_mod.diff(schema, base, target, domain="test")
        assert len(delta["ops"]) == 1
        assert delta["ops"][0]["op"] == "delete"

    def test_no_change_returns_empty(self) -> None:
        """Diffing a set against itself gives no ops."""
        schema = _set_schema()
        cids = frozenset({_cid("a"), _cid("b")})
        delta = set_ops_mod.diff(schema, cids, cids, domain="test")
        assert delta["ops"] == []

    def test_all_ops_have_none_position(self) -> None:
        """Every op emitted for a set diff has ``position`` set to ``None``."""
        schema = _set_schema()
        base: frozenset[str] = frozenset()
        target = frozenset({_cid("x"), _cid("y")})
        delta = set_ops_mod.diff(schema, base, target, domain="test")
        # Guard against a vacuous pass: with no ops the loop below would
        # assert nothing at all.
        assert delta["ops"], "expected at least one op for a non-empty addition"
        for op in delta["ops"]:
            assert op["position"] is None

    def test_produces_valid_structured_delta(self) -> None:
        """The delta is well-formed, tagged with the domain, and names the element type."""
        schema = _set_schema("audio_file")
        base = frozenset({_cid("drums"), _cid("bass")})
        target = frozenset({_cid("drums"), _cid("guitar")})
        delta = set_ops_mod.diff(schema, base, target, domain="midi")
        assert _is_valid_delta(delta)
        assert delta["domain"] == "midi"
        assert "audio_file" in delta["summary"]
411
412
413 # ===========================================================================
414 # Schema dispatch (diff_by_schema) tests
415 # ===========================================================================
416
417
class TestDiffBySchema:
    """Checks that ``diff_by_schema`` handles each schema kind and rejects mismatches."""

    def test_dispatch_sequence_schema_calls_lcs(self) -> None:
        """Appending one item under a sequence schema gives one insert op."""
        seq_schema = _seq_schema("note")
        before: DiffInput = SequenceInput(kind="sequence", items=["a"])
        after: DiffInput = SequenceInput(kind="sequence", items=["a", "b"])
        delta = diff_by_schema(seq_schema, before, after, domain="test")
        assert delta["domain"] == "test"
        inserted = [op for op in delta["ops"] if op["op"] == "insert"]
        assert len(inserted) == 1

    def test_dispatch_set_schema_calls_set_ops(self) -> None:
        """Emptying a one-element set under a set schema gives one delete op."""
        set_schema = _set_schema("file")
        only_member = _cid("a")
        before: DiffInput = SetInput(kind="set", items=frozenset({only_member}))
        after: DiffInput = SetInput(kind="set", items=frozenset())
        delta = diff_by_schema(set_schema, before, after, domain="test")
        removed = [op for op in delta["ops"] if op["op"] == "delete"]
        assert len(removed) == 1

    def test_dispatch_tensor_schema_calls_numerical(self) -> None:
        """Changing one value under a tensor schema gives one replace op."""
        tensor_schema = _tensor_schema(epsilon=0.0)
        before: DiffInput = TensorInput(kind="tensor", values=[1.0, 2.0])
        after: DiffInput = TensorInput(kind="tensor", values=[1.0, 9.0])
        delta = diff_by_schema(tensor_schema, before, after, domain="test")
        replaced = [op for op in delta["ops"] if op["op"] == "replace"]
        assert len(replaced) == 1

    def test_dispatch_tree_schema_calls_tree_edit(self) -> None:
        """A tree schema with tree inputs gives a well-formed delta."""
        tree_schema = _tree_schema()
        before: DiffInput = TreeInput(kind="tree", root=_node("root", _leaf("A")))
        after: DiffInput = TreeInput(
            kind="tree", root=_node("root", _leaf("A"), _leaf("B"))
        )
        delta = diff_by_schema(tree_schema, before, after, domain="test")
        assert _is_valid_delta(delta)

    def test_dispatch_map_schema_recurses(self) -> None:
        """A map diff reports an added key as insert and a changed value as replace."""
        map_schema = _map_schema()
        old_value, new_value = _cid("va"), _cid("vb")
        before: DiffInput = MapInput(kind="map", entries={"key1": old_value})
        after: DiffInput = MapInput(
            kind="map", entries={"key1": new_value, "key2": old_value}
        )
        delta = diff_by_schema(map_schema, before, after, domain="test")
        assert _is_valid_delta(delta)
        # key2 added -> insert op; key1 changed -> replace op
        kinds_seen = [op["op"] for op in delta["ops"]]
        assert "insert" in kinds_seen
        assert "replace" in kinds_seen

    def test_type_error_on_mismatched_schema_and_input(self) -> None:
        """Passing a ``SetInput`` to a sequence schema raises ``TypeError``."""
        seq_schema = _seq_schema()
        mismatched: DiffInput = SetInput(kind="set", items=frozenset())
        with pytest.raises(TypeError, match="sequence schema requires SequenceInput"):
            diff_by_schema(seq_schema, mismatched, mismatched, domain="test")

    def test_identical_sequence_produces_no_ops(self) -> None:
        """Two sequence inputs wrapping the same items give no ops."""
        seq_schema = _seq_schema()
        shared_items = ["a", "b", "c"]
        before: DiffInput = SequenceInput(kind="sequence", items=shared_items)
        after: DiffInput = SequenceInput(kind="sequence", items=shared_items)
        delta = diff_by_schema(seq_schema, before, after, domain="test")
        assert delta["ops"] == []

    def test_map_add_key_is_insert(self) -> None:
        """Adding a key to an empty map puts an insert op first."""
        map_schema = _map_schema()
        before: DiffInput = MapInput(kind="map", entries={})
        after: DiffInput = MapInput(kind="map", entries={"chr1": _cid("seq")})
        delta = diff_by_schema(map_schema, before, after, domain="genomics")
        first_op = delta["ops"][0]
        assert first_op["op"] == "insert"

    def test_map_remove_key_is_delete(self) -> None:
        """Removing the only key from a map puts a delete op first."""
        map_schema = _map_schema()
        before: DiffInput = MapInput(kind="map", entries={"chr1": _cid("seq")})
        after: DiffInput = MapInput(kind="map", entries={})
        delta = diff_by_schema(map_schema, before, after, domain="genomics")
        first_op = delta["ops"][0]
        assert first_op["op"] == "delete"

    def test_map_unchanged_returns_no_ops(self) -> None:
        """Two map inputs wrapping the same entries give no ops."""
        map_schema = _map_schema()
        shared_entries = {"k1": _cid("v1"), "k2": _cid("v2")}
        before: DiffInput = MapInput(kind="map", entries=shared_entries)
        after: DiffInput = MapInput(kind="map", entries=shared_entries)
        delta = diff_by_schema(map_schema, before, after, domain="test")
        assert delta["ops"] == []
501
502
503 # ---------------------------------------------------------------------------
504 # snapshot_diff — schema-driven auto-diff for SnapshotManifests
505 # ---------------------------------------------------------------------------
506
507
def _minimal_schema(domain: str) -> DomainSchema:
    """Build the smallest ``DomainSchema`` the snapshot_diff tests need.

    It has no dimensions, a content-identified file set as its top level,
    ``"three_way"`` merge mode and schema version 1.
    """
    file_set = SetSchema(kind="set", element_type="file", identity="by_content")
    return DomainSchema(
        domain=domain,
        description="Test domain",
        dimensions=[],
        top_level=file_set,
        merge_mode="three_way",
        schema_version=1,
    )
518
519
class TestSnapshotDiff:
    """Checks on the file-level delta ``snapshot_diff`` builds from two manifests."""

    @staticmethod
    def _manifest(files: dict[str, str], domain: str = "mydomain") -> SnapshotManifest:
        """Assemble a manifest literal from a path-to-content-ID map and a domain tag."""
        return {"files": files, "domain": domain}

    def test_added_file_is_insert_op(self) -> None:
        """A file present only in the target gives one insert op addressed by its path."""
        schema = _minimal_schema("mydomain")
        before = self._manifest({})
        after = self._manifest({"data.txt": _cid("hello")})
        delta = snapshot_diff(schema, before, after)
        assert len(delta["ops"]) == 1
        only_op = delta["ops"][0]
        assert only_op["op"] == "insert"
        assert only_op["address"] == "data.txt"

    def test_removed_file_is_delete_op(self) -> None:
        """A file present only in the base gives one delete op."""
        schema = _minimal_schema("mydomain")
        before = self._manifest({"data.txt": _cid("hello")})
        after = self._manifest({})
        delta = snapshot_diff(schema, before, after)
        assert len(delta["ops"]) == 1
        assert delta["ops"][0]["op"] == "delete"

    def test_modified_file_is_replace_op(self) -> None:
        """The same path with a different content ID gives one replace op."""
        schema = _minimal_schema("mydomain")
        before = self._manifest({"data.txt": _cid("v1")})
        after = self._manifest({"data.txt": _cid("v2")})
        delta = snapshot_diff(schema, before, after)
        assert len(delta["ops"]) == 1
        assert delta["ops"][0]["op"] == "replace"

    def test_identical_snapshots_have_no_ops(self) -> None:
        """Diffing a manifest against itself gives no ops and a "no changes" summary."""
        schema = _minimal_schema("mydomain")
        manifest = self._manifest({"a.txt": _cid("a"), "b.txt": _cid("b")})
        delta = snapshot_diff(schema, manifest, manifest)
        assert delta["ops"] == []
        assert delta["summary"] == "no changes"

    def test_domain_tag_taken_from_schema(self) -> None:
        """The delta's domain matches the schema's domain."""
        schema = _minimal_schema("myplugin")
        before = self._manifest({}, domain="myplugin")
        after = self._manifest({"f.txt": _cid("x")}, domain="myplugin")
        delta = snapshot_diff(schema, before, after)
        assert delta["domain"] == "myplugin"

    def test_multiple_changes_produce_correct_op_mix(self) -> None:
        """One add, one delete and one modify give exactly three ops, one of each kind."""
        schema = _minimal_schema("mydomain")
        before = self._manifest(
            {
                "keep.txt": _cid("same"),
                "modify.txt": _cid("old"),
                "delete.txt": _cid("gone"),
            }
        )
        after = self._manifest(
            {
                "keep.txt": _cid("same"),
                "modify.txt": _cid("new"),
                "add.txt": _cid("fresh"),
            }
        )
        delta = snapshot_diff(schema, before, after)
        kinds_seen = {op["op"] for op in delta["ops"]}
        assert "insert" in kinds_seen  # add.txt
        assert "delete" in kinds_seen  # delete.txt
        assert "replace" in kinds_seen  # modify.txt
        assert len(delta["ops"]) == 3

    def test_scaffold_plugin_uses_snapshot_diff(self) -> None:
        """``ScaffoldPlugin.diff()`` reports replace and insert ops under the scaffold domain.

        NOTE(review): the plugin is expected to delegate to ``snapshot_diff``;
        this test only observes the resulting ops, not the delegation itself.
        """
        from muse.plugins.scaffold.plugin import ScaffoldPlugin

        plugin = ScaffoldPlugin()
        before = self._manifest({"a.scaffold": _cid("v1")}, domain="scaffold")
        after = self._manifest(
            {"a.scaffold": _cid("v2"), "b.scaffold": _cid("new")},
            domain="scaffold",
        )
        delta = plugin.diff(before, after)
        kinds_seen = {op["op"] for op in delta["ops"]}
        assert "replace" in kinds_seen
        assert "insert" in kinds_seen
        assert delta["domain"] == "scaffold"