cgcardona / muse public
test_stress_diff_algorithms.py python
406 lines 15.2 KB
e6786943 feat: upgrade to Python 3.14, drop from __future__ import annotations Gabriel Cardona <cgcardona@gmail.com> 1d ago
1 """Stress tests for the diff algorithm library.
2
3 Covers:
4 - LCS / Myers: empty sequences, identical, reversed, interleaved, long sequences.
5 Round-trip property: applying the edit script to the base must reproduce target.
6 - Tree edit: single node, add subtree, remove subtree, deep nesting.
7 - Numerical diff: all three modes (sparse / block / full), epsilon tolerance.
8 - Set diff: add / remove / replace elements.
9 - snapshot_diff: file-level diffing via ScaffoldPlugin.
10 - detect_moves: move detection from paired insert+delete.
11 """
12
13 import hashlib
14 import random
15 from typing import Literal
16
17 import pytest
18
19 from muse.core.diff_algorithms.lcs import myers_ses, detect_moves, EditStep
20 from muse.core.diff_algorithms.tree_edit import TreeNode, diff as tree_diff
21 from muse.core.diff_algorithms.numerical import diff as num_diff
22 from muse.core.diff_algorithms.set_ops import diff as set_diff
23 from muse.core.schema import (
24 SequenceSchema,
25 TensorSchema,
26 SetSchema,
27 TreeSchema,
28 )
29 from muse.domain import InsertOp, DeleteOp, DomainOp, SnapshotManifest
30 from muse.plugins.scaffold.plugin import ScaffoldPlugin
31
32
33 # ---------------------------------------------------------------------------
34 # Helpers
35 # ---------------------------------------------------------------------------
36
37
38 def _cid(label: str) -> str:
39 return hashlib.sha256(label.encode()).hexdigest()[:16]
40
41
42 def _apply_ses(base: list[str], steps: list[EditStep]) -> list[str]:
43 """Apply an edit-script to base to reconstruct target."""
44 result: list[str] = []
45 for step in steps:
46 if step.kind == "keep":
47 result.append(base[step.base_index])
48 elif step.kind == "insert":
49 result.append(step.item)
50 # "delete" steps are skipped.
51 return result
52
53
def _seq_schema(addr: str = "test") -> SequenceSchema:
    """Build an LCS-diffed sequence schema of content ids.

    *addr* is used for both the field name and the address.
    """
    schema = SequenceSchema(
        field_name=addr,
        address=addr,
        diff_algorithm="lcs",
        element_type="content_id",
    )
    return schema
61
62
def _tensor_schema(
    mode: Literal["sparse", "block", "full"] = "sparse",
    epsilon: float = 1e-9,
) -> TensorSchema:
    """Build a float64 tensor schema named "tensor".

    *mode* becomes the schema's ``diff_mode`` and *epsilon* its ``epsilon``;
    the shape is left as an empty list.
    """
    schema = TensorSchema(
        field_name="tensor",
        address="tensor",
        shape=[],
        dtype="float64",
        diff_mode=mode,
        epsilon=epsilon,
    )
    return schema
74
75
def _set_schema() -> SetSchema:
    """Build the set schema shared by the set-diff tests (string elements)."""
    schema = SetSchema(
        field_name="elems",
        address="elems",
        element_type="str",
    )
    return schema
78
79
def _tree_schema() -> TreeSchema:
    """Build the tree schema shared by the tree-edit tests."""
    schema = TreeSchema(
        kind="tree",
        node_type="generic",
        diff_algorithm="zhang_shasha",
    )
    return schema
82
83
84 # ===========================================================================
85 # LCS / Myers
86 # ===========================================================================
87
88
class TestMyersSES:
    """Stress tests for ``myers_ses``.

    Wherever a target exists, the round-trip property is checked: replaying
    the returned steps over ``base`` with ``_apply_ses`` must rebuild
    ``target``.
    """

    @staticmethod
    def _assert_round_trip(base: list[str], target: list[str]) -> None:
        """Diff *base* against *target* and check the script rebuilds *target*."""
        steps = myers_ses(base, target)
        assert _apply_ses(base, steps) == target

    def test_empty_base_to_nonempty(self) -> None:
        """From an empty base, the script must consist solely of inserts."""
        target = [_cid(f"x{i}") for i in range(5)]
        steps = myers_ses([], target)
        assert all(s.kind == "insert" for s in steps)
        # all() is vacuously true for an empty script, so also require that
        # the inserts actually rebuild the target.
        assert _apply_ses([], steps) == target

    def test_nonempty_to_empty(self) -> None:
        """Towards an empty target, the script must consist solely of deletes."""
        base = [_cid(f"x{i}") for i in range(5)]
        steps = myers_ses(base, [])
        # An empty script would satisfy all() below without deleting anything.
        assert steps
        assert all(s.kind == "delete" for s in steps)

    def test_identical_sequences_all_keep(self) -> None:
        """Diffing a sequence against itself must only keep elements."""
        seq = [_cid(f"x{i}") for i in range(20)]
        steps = myers_ses(seq, seq)
        assert all(s.kind == "keep" for s in steps)
        # Rule out an empty (vacuously all-keep) script.
        assert _apply_ses(seq, steps) == seq

    def test_single_insert_at_beginning(self) -> None:
        """One element prepended to the base."""
        base = [_cid("b"), _cid("c")]
        target = [_cid("a"), _cid("b"), _cid("c")]
        self._assert_round_trip(base, target)

    def test_single_delete_from_middle(self) -> None:
        """One element removed from the middle of the base."""
        base = [_cid("a"), _cid("b"), _cid("c")]
        target = [_cid("a"), _cid("c")]
        self._assert_round_trip(base, target)

    def test_reversed_sequence(self) -> None:
        """Target is the base in reverse order."""
        base = [_cid(f"x{i}") for i in range(10)]
        target = list(reversed(base))
        self._assert_round_trip(base, target)

    def test_interleaved_sequences(self) -> None:
        """New elements are interleaved between the base's elements."""
        base = [_cid("a"), _cid("b"), _cid("c"), _cid("d")]
        target = [
            _cid("a"),
            _cid("X"),
            _cid("b"),
            _cid("Y"),
            _cid("c"),
            _cid("Z"),
            _cid("d"),
        ]
        self._assert_round_trip(base, target)

    def test_round_trip_property_random_sequences(self) -> None:
        """Applying the SES to base must always reproduce target."""
        rng = random.Random(42)  # fixed seed keeps the run reproducible
        alphabet = [_cid(f"elem-{i}") for i in range(20)]
        for _ in range(50):
            base = rng.choices(alphabet, k=rng.randint(0, 15))
            target = rng.choices(alphabet, k=rng.randint(0, 15))
            self._assert_round_trip(base, target)

    def test_long_sequence_1000_items(self) -> None:
        """1000-item base whose second half is kept, followed by 500 new items."""
        base = [_cid(f"item-{i}") for i in range(1000)]
        kept_tail = [_cid(f"item-{i}") for i in range(500, 1000)]
        fresh = [_cid(f"new-{i}") for i in range(500)]
        self._assert_round_trip(base, kept_tail + fresh)

    def test_empty_both_sides(self) -> None:
        """Two empty sequences yield an empty script."""
        assert myers_ses([], []) == []
152
153
154 # ===========================================================================
155 # detect_moves
156 # ===========================================================================
157
158
class TestDetectMoves:
    """Tests for ``detect_moves``: pairing inserts with deletes into moves."""

    def _make_insert(self, cid: str, pos: int | None = None) -> InsertOp:
        """Build an insert op at address "test" whose summary is its content id."""
        op = InsertOp(op="insert", address="test", position=pos, content_id=cid, content_summary=cid)
        return op

    def _make_delete(self, cid: str, pos: int | None = None) -> DeleteOp:
        """Build a delete op at address "test" whose summary is its content id."""
        op = DeleteOp(op="delete", address="test", position=pos, content_id=cid, content_summary=cid)
        return op

    def test_matching_insert_delete_becomes_move(self) -> None:
        """Same content id at different positions yields exactly one move."""
        shared = _cid("moved-item")
        moves, _ins, _dels = detect_moves(
            [self._make_insert(shared, pos=5)],
            [self._make_delete(shared, pos=2)],
        )
        assert len(moves) == 1
        assert moves[0]["op"] == "move"

    def test_no_match_when_different_content_ids(self) -> None:
        """Different content ids are never paired."""
        moves, _ins, _dels = detect_moves(
            [self._make_insert(_cid("a"), pos=5)],
            [self._make_delete(_cid("b"), pos=2)],
        )
        assert len(moves) == 0

    def test_no_moves_on_empty_inputs(self) -> None:
        """Empty inputs give three empty results."""
        moves, ins, dels = detect_moves([], [])
        assert moves == []
        assert ins == []
        assert dels == []

    def test_same_position_not_a_move(self) -> None:
        """Insert and delete at same position: not a move."""
        shared = _cid("same-pos")
        moves, _ins, _dels = detect_moves(
            [self._make_insert(shared, pos=3)],
            [self._make_delete(shared, pos=3)],
        )
        # Same position = not a move by definition.
        assert len(moves) == 0
194
195
196 # ===========================================================================
197 # Tree edit
198 # ===========================================================================
199
200
class TestTreeEditDiff:
    """Tests for the tree-edit diff, checking which op kinds each change yields."""

    def _node(self, label: str, cid: str, children: tuple[TreeNode, ...] = ()) -> TreeNode:
        """Build a node whose id and label are both *label*."""
        node = TreeNode(id=label, label=label, content_id=cid, children=children)
        return node

    def test_identical_trees_no_ops(self) -> None:
        """A tree diffed against itself yields no ops."""
        kids = (
            self._node("child-a", _cid("a")),
            self._node("child-b", _cid("b")),
        )
        tree = self._node("root", _cid("root"), kids)
        delta = tree_diff(_tree_schema(), tree, tree, domain="test")
        assert delta["ops"] == []

    def test_replace_root_content(self) -> None:
        """Changing the root's content id yields a replace op."""
        before = self._node("root", _cid("v1"))
        after = self._node("root", _cid("v2"))
        delta = tree_diff(_tree_schema(), before, after, domain="test")
        kinds = [op["op"] for op in delta["ops"]]
        assert "replace" in kinds

    def test_add_child_node(self) -> None:
        """Appending a second child yields an insert op."""
        before = self._node("root", _cid("root"), (self._node("a", _cid("a")),))
        after_children = (
            self._node("a", _cid("a")),
            self._node("b", _cid("b")),
        )
        after = self._node("root", _cid("root"), after_children)
        delta = tree_diff(_tree_schema(), before, after, domain="test")
        kinds = [op["op"] for op in delta["ops"]]
        assert "insert" in kinds

    def test_remove_child_node(self) -> None:
        """Dropping the second child yields a delete op."""
        before_children = (
            self._node("a", _cid("a")),
            self._node("b", _cid("b")),
        )
        before = self._node("root", _cid("root"), before_children)
        after = self._node("root", _cid("root"), (self._node("a", _cid("a")),))
        delta = tree_diff(_tree_schema(), before, after, domain="test")
        kinds = [op["op"] for op in delta["ops"]]
        assert "delete" in kinds

    def test_empty_tree_to_populated(self) -> None:
        """A childless root gaining five children yields an insert op."""
        before = self._node("root", _cid("root"))
        new_children = tuple(
            [self._node(f"child-{i}", _cid(f"c{i}")) for i in range(5)]
        )
        after = self._node("root", _cid("root"), new_children)
        delta = tree_diff(_tree_schema(), before, after, domain="test")
        kinds = [op["op"] for op in delta["ops"]]
        assert "insert" in kinds
249
250
251 # ===========================================================================
252 # Numerical diff
253 # ===========================================================================
254
255
class TestNumericalDiff:
    """Tests for the numerical diff across its sparse, block and full modes."""

    def test_identical_arrays_no_ops(self) -> None:
        """An array diffed against itself yields no ops."""
        arr = [float(i) for i in range(100)]
        schema = _tensor_schema("sparse")
        delta = num_diff(schema, arr, arr, domain="test")
        assert delta["ops"] == []

    def test_sparse_mode_one_change(self) -> None:
        """A single changed element yields a single replace op in sparse mode."""
        base = [0.0] * 10
        target = [0.0] * 10
        target[5] = 1.0
        schema = _tensor_schema("sparse")
        delta = num_diff(schema, base, target, domain="test")
        assert len(delta["ops"]) == 1
        assert delta["ops"][0]["op"] == "replace"

    def test_block_mode_adjacent_changes_grouped(self) -> None:
        """Four adjacent changes must yield fewer than four ops in block mode."""
        base = [0.0] * 10
        target = [0.0] * 10
        for i in range(3, 7):
            target[i] = float(i)
        schema = _tensor_schema("block")
        delta = num_diff(schema, base, target, domain="test")
        # Block mode should group adjacent changes. One op per changed element
        # (four ops) would mean nothing was grouped, so require strictly fewer;
        # at least one op must still report the change.
        assert 1 <= len(delta["ops"]) < 4

    def test_full_mode_single_op_for_any_change(self) -> None:
        """Two distant changes still yield exactly one op in full mode."""
        base = [0.0] * 100
        target = list(base)
        target[50] = 1.0
        target[99] = 2.0
        schema = _tensor_schema("full")
        delta = num_diff(schema, base, target, domain="test")
        assert len(delta["ops"]) == 1

    def test_epsilon_tolerance_no_spurious_diffs(self) -> None:
        """Deviations of 1e-12 with epsilon=1e-9 must not yield ops."""
        base = [1.0, 2.0, 3.0]
        target = [1.0 + 1e-12, 2.0 - 1e-12, 3.0 + 1e-12]
        schema = _tensor_schema("sparse", epsilon=1e-9)
        delta = num_diff(schema, base, target, domain="test")
        assert delta["ops"] == []

    def test_epsilon_threshold_triggers_diff(self) -> None:
        """A deviation of 1e-5 with epsilon=1e-9 must yield one op."""
        base = [1.0]
        target = [1.0 + 1e-5]
        schema = _tensor_schema("sparse", epsilon=1e-9)
        delta = num_diff(schema, base, target, domain="test")
        assert len(delta["ops"]) == 1

    def test_empty_arrays_no_ops(self) -> None:
        """Two empty arrays yield no ops."""
        schema = _tensor_schema("sparse")
        delta = num_diff(schema, [], [], domain="test")
        assert delta["ops"] == []

    def test_all_values_changed_sparse(self) -> None:
        """Fifty changed elements yield fifty ops in sparse mode."""
        base = [0.0] * 50
        target = [1.0] * 50
        schema = _tensor_schema("sparse")
        delta = num_diff(schema, base, target, domain="test")
        assert len(delta["ops"]) == 50
316
317
318 # ===========================================================================
319 # Set diff
320 # ===========================================================================
321
322
class TestSetDiff:
    """Tests for the set diff: insert and delete ops between frozensets."""

    def test_identical_sets_no_ops(self) -> None:
        """A set diffed against itself yields no ops."""
        elems = frozenset(f"elem-{i}" for i in range(20))
        delta = set_diff(_set_schema(), elems, elems, domain="test")
        assert delta["ops"] == []

    def test_add_elements(self) -> None:
        """Ten elements added to an empty set yield ten insert ops."""
        empty: frozenset[str] = frozenset()
        added = frozenset(f"new-{i}" for i in range(10))
        delta = set_diff(_set_schema(), empty, added, domain="test")
        ops = delta["ops"]
        assert all(op["op"] == "insert" for op in ops)
        assert len(ops) == 10

    def test_remove_elements(self) -> None:
        """Ten elements removed down to an empty set yield ten delete ops."""
        original = frozenset(f"elem-{i}" for i in range(10))
        empty: frozenset[str] = frozenset()
        delta = set_diff(_set_schema(), original, empty, domain="test")
        ops = delta["ops"]
        assert all(op["op"] == "delete" for op in ops)
        assert len(ops) == 10

    def test_mixed_add_remove(self) -> None:
        """Dropping two base elements and adding three new ones."""
        base = frozenset(f"base-{i}" for i in range(5))
        survivors = frozenset(f"base-{i}" for i in range(3))
        newcomers = frozenset(f"new-{i}" for i in range(3))
        delta = set_diff(_set_schema(), base, survivors | newcomers, domain="test")
        # Only the two expected kinds are pre-seeded, so any other op kind
        # raises KeyError instead of being counted silently.
        tally = dict.fromkeys(("insert", "delete"), 0)
        for op in delta["ops"]:
            tally[op["op"]] += 1
        assert tally["insert"] == 3
        assert tally["delete"] == 2

    def test_empty_base_to_empty_target(self) -> None:
        """Two empty sets yield no ops."""
        delta = set_diff(_set_schema(), frozenset(), frozenset(), domain="test")
        assert delta["ops"] == []
361
362
363 # ===========================================================================
364 # snapshot_diff (via ScaffoldPlugin which uses it internally)
365 # ===========================================================================
366
367
class TestSnapshotDiff:
    """Test snapshot_diff indirectly via ScaffoldPlugin.diff() which delegates to it."""

    def _diff(self, base: dict[str, str], target: dict[str, str]) -> list[DomainOp]:
        """Diff two path -> content-id manifests and return the ops as a list.

        Both manifests are wrapped in a ``SnapshotManifest`` for the
        "scaffold" domain and passed to a fresh ``ScaffoldPlugin``.
        """
        plugin = ScaffoldPlugin()
        base_snap = SnapshotManifest(files=base, domain="scaffold")
        target_snap = SnapshotManifest(files=target, domain="scaffold")
        delta = plugin.diff(base_snap, target_snap)
        return list(delta["ops"])

    def test_identical_snapshots_empty_ops(self) -> None:
        """A manifest diffed against itself yields no ops."""
        manifest = {f"f{i}.py": _cid(f"v{i}") for i in range(10)}
        ops = self._diff(manifest, manifest)
        assert ops == []

    def test_added_files_produce_insert_ops(self) -> None:
        """A file present only in the target yields an insert op at its path."""
        ops = self._diff({}, {"new.py": _cid("new")})
        assert any(op["op"] == "insert" and op["address"] == "new.py" for op in ops)

    def test_removed_files_produce_delete_ops(self) -> None:
        """A file present only in the base yields a delete op at its path."""
        ops = self._diff({"old.py": _cid("old")}, {})
        assert any(op["op"] == "delete" and op["address"] == "old.py" for op in ops)

    def test_modified_files_produce_replace_ops(self) -> None:
        """A file whose content id changed yields a replace op at its path."""
        ops = self._diff({"f.py": _cid("v1")}, {"f.py": _cid("v2")})
        assert any(op["op"] == "replace" and op["address"] == "f.py" for op in ops)

    def test_50_file_snapshot_diff_complete(self) -> None:
        """Fifty modified files yield exactly fifty replace ops."""
        base = {f"f{i:03d}.py": _cid(f"v1-{i}") for i in range(50)}
        target = {f"f{i:03d}.py": _cid(f"v2-{i}") for i in range(50)}
        ops = self._diff(base, target)
        assert len(ops) == 50
        assert all(op["op"] == "replace" for op in ops)

    def test_ops_sorted_by_address(self) -> None:
        """Ops come back ordered by address regardless of manifest order."""
        # Insert the files in descending name order: a dict preserves insertion
        # order, so an ascending build would pass even if nothing were sorted.
        target = {f"z{i:02d}.py": _cid(f"v{i}") for i in reversed(range(10))}
        ops = self._diff({}, target)
        addresses = [op["address"] for op in ops]
        # Every added file must be reported; an empty list would otherwise
        # satisfy the ordering check trivially.
        assert set(addresses) == set(target)
        assert addresses == sorted(addresses)