cgcardona / muse public
test_property_based.py python
514 lines 20.2 KB
e6786943 feat: upgrade to Python 3.14, drop from __future__ import annotations Gabriel Cardona <cgcardona@gmail.com> 1d ago
1 """Property-based tests using :mod:`hypothesis`.
2
Three correctness properties are verified over thousands of randomly generated
inputs — coverage that example-based tests cannot achieve:
5
6 1. **CRDT lattice laws** — commutativity, associativity, and idempotency hold
7 for all six CRDT types over arbitrary inputs.
8
9 2. **LCS round-trip** — ``diff(a, a)`` always produces zero ops; the number of
10 edits reported equals the true edit distance (inserts + deletes, excluding
11 moves) for arbitrary sequence pairs.
12
13 3. **OT diamond property** — for two concurrently applied ``InsertOp``\\s at
14 distinct positions, ``transform`` produces adjusted ops such that applying
15 them in either order yields the same final sequence.
16
17 Why these three?
18 ----------------
19 - CRDT laws guarantee that any number of agents can merge state in any order
20 and reach the same result. Example-based tests can only verify a handful of
21 cases; hypothesis explores thousands.
22
23 - LCS round-trip safety means the diff engine never produces incorrect deltas
24 for self-diffs (a common pathological case).
25
26 - The OT diamond property is the central correctness criterion for Operational
27 Transformation: two concurrent edits, transformed and applied, must converge.
28 A single counterexample would break real-time collaboration.
29 """
30
31 from typing import TYPE_CHECKING
32
33 import pytest
34 from hypothesis import given, settings
35 from hypothesis import strategies as st
36
37 from muse.core.crdts import (
38 AWMap,
39 GCounter,
40 LWWRegister,
41 ORSet,
42 RGA,
43 VectorClock,
44 )
45 from muse.core.diff_algorithms.lcs import diff as lcs_diff, myers_ses
46 from muse.core.op_transform import ops_commute, transform
47 from muse.core.schema import SequenceSchema
48 from muse.domain import InsertOp
49
50 if TYPE_CHECKING:
51 pass
52
# ---------------------------------------------------------------------------
# Hypothesis strategies
# ---------------------------------------------------------------------------

# Small alphabets make value collisions likely, exercising merge/tiebreak
# paths that would be rare with fully random strings.
_SHORT_TEXT = st.text(alphabet="abcdefghij", min_size=0, max_size=8)
_AGENT_ID = st.text(alphabet="abcdefghij", min_size=1, max_size=6)
# Finite, non-NaN timestamps keep LWW comparison keys totally ordered.
_TIMESTAMP = st.floats(min_value=0.0, max_value=1_000_000.0, allow_nan=False, allow_infinity=False)
63
64
@st.composite
def lww_registers(draw: st.DrawFn) -> LWWRegister:
    """Strategy: an arbitrary LWWRegister (value, timestamp, author)."""
    value = draw(_SHORT_TEXT)
    timestamp = draw(_TIMESTAMP)
    author = draw(_AGENT_ID)
    return LWWRegister(value=value, timestamp=timestamp, author=author)
73
74
@st.composite
def vector_clocks(draw: st.DrawFn) -> VectorClock:
    """Strategy: VectorClock with 0–4 agents, counts 0–20."""
    agent_count = draw(st.integers(min_value=0, max_value=4))
    counts: dict[str, int] = {}
    for i in range(agent_count):
        counts[f"a{i}"] = draw(st.integers(min_value=0, max_value=20))
    return VectorClock(counts)
81
82
@st.composite
def or_sets(draw: st.DrawFn) -> ORSet:
    """Strategy: ORSet built by an arbitrary add sequence plus random removes."""
    values = draw(st.lists(_SHORT_TEXT, min_size=0, max_size=6))
    result = ORSet()
    for value in values:
        result, _ = result.add(value)
    # Remove a random subset of the added elements using observed-token
    # semantics (only tokens this replica has actually seen).
    if values:
        victims = draw(st.lists(st.sampled_from(values), max_size=3, unique=True))
        for victim in victims:
            if victim in result.elements():
                result = result.remove(victim, result.tokens_for(victim))
    return result
99
100
@st.composite
def rgas(draw: st.DrawFn) -> RGA:
    """Strategy: RGA built by sequential inserts (append-only for simplicity)."""
    values = draw(st.lists(_SHORT_TEXT, min_size=0, max_size=6))
    rga = RGA()
    prev_id: str | None = None
    all_ids: list[str] = []
    for index, value in enumerate(values):
        element_id = f"{index}@agent"
        rga = rga.insert(prev_id, value, element_id=element_id)
        all_ids.append(element_id)
        prev_id = element_id
    # Optionally tombstone a few of the inserted elements.
    if all_ids:
        victims = draw(st.lists(st.sampled_from(all_ids), max_size=2, unique=True))
        for victim in victims:
            rga = rga.delete(victim)
    return rga
117
118
@st.composite
def aw_maps(draw: st.DrawFn) -> AWMap:
    """Strategy: AWMap populated by an arbitrary sequence of set operations."""
    pairs = draw(st.dictionaries(_SHORT_TEXT, _SHORT_TEXT, max_size=4))
    result = AWMap()
    for key, value in pairs.items():
        result = result.set(key, value)
    return result
127
128
@st.composite
def g_counters(draw: st.DrawFn) -> GCounter:
    """Strategy: GCounter with 0–4 agents, counts 0–20."""
    agent_count = draw(st.integers(min_value=0, max_value=4))
    per_agent = st.integers(min_value=0, max_value=20)
    return GCounter({f"a{i}": draw(per_agent) for i in range(agent_count)})
135
136
@st.composite
def content_id_lists(draw: st.DrawFn) -> list[str]:
    """Strategy: list of content IDs drawn from a small pool.

    A small pool means pairs of generated lists share elements, exercising
    the LCS "keep" path rather than always producing pure insert+delete scripts.
    """
    hashes = st.sampled_from([f"h{i}" for i in range(8)])  # 8 possible hashes
    return draw(st.lists(hashes, min_size=0, max_size=8))
146
147
# ---------------------------------------------------------------------------
# Shared SequenceSchema for LCS tests
# ---------------------------------------------------------------------------

# One schema shared by every LCS test below: an indexed sequence of content
# IDs, compared by content, diffed with the LCS algorithm.
_SEQ_SCHEMA = SequenceSchema(
    kind="sequence",
    element_type="content_id",
    identity="by_content",
    diff_algorithm="lcs",
    order="indexed",
)
159
160
161 # ---------------------------------------------------------------------------
162 # CRDT lattice law tests
163 # ---------------------------------------------------------------------------
164
165
class TestLWWRegisterLatticeHypothesis:
    """LWWRegister satisfies all three CRDT lattice laws for arbitrary inputs."""

    @given(lww_registers(), lww_registers())
    def test_commutativity(self, a: LWWRegister, b: LWWRegister) -> None:
        ab = a.join(b)
        ba = b.join(a)
        assert ab.equivalent(ba)

    @given(lww_registers(), lww_registers(), lww_registers())
    def test_associativity(self, a: LWWRegister, b: LWWRegister, c: LWWRegister) -> None:
        left_first = a.join(b).join(c)
        right_first = a.join(b.join(c))
        assert left_first.equivalent(right_first)

    @given(lww_registers())
    def test_idempotency(self, a: LWWRegister) -> None:
        assert a.join(a).equivalent(a)

    @given(lww_registers(), lww_registers())
    def test_join_winner_is_higher_timestamp(self, a: LWWRegister, b: LWWRegister) -> None:
        """The winner's value is always the one with the higher comparison key.

        The key is ``(timestamp, author, value)``; including ``value`` as the
        final tiebreaker is what makes ``join`` commutative when two writes share
        the same ``(timestamp, author)`` pair.
        """
        result = a.join(b)

        def key(reg: LWWRegister) -> tuple:
            wire = reg.to_dict()
            return (wire["timestamp"], wire["author"], wire["value"])

        # ties (key(a) == key(b)) imply equal values, so >= is safe here
        winner = a if key(a) >= key(b) else b
        assert result.read() == winner.read()
198
199
class TestVectorClockLatticeHypothesis:
    """VectorClock satisfies all three CRDT lattice laws for arbitrary inputs."""

    @given(vector_clocks(), vector_clocks())
    def test_commutativity(self, a: VectorClock, b: VectorClock) -> None:
        assert a.merge(b).equivalent(b.merge(a))

    @given(vector_clocks(), vector_clocks(), vector_clocks())
    def test_associativity(self, a: VectorClock, b: VectorClock, c: VectorClock) -> None:
        left_first = a.merge(b).merge(c)
        right_first = a.merge(b.merge(c))
        assert left_first.equivalent(right_first)

    @given(vector_clocks())
    def test_idempotency(self, a: VectorClock) -> None:
        assert a.merge(a).equivalent(a)

    @given(vector_clocks(), vector_clocks())
    def test_merge_is_pointwise_max(self, a: VectorClock, b: VectorClock) -> None:
        """Every agent's count in the merged clock equals max(a[agent], b[agent])."""
        merged_counts = a.merge(b).to_dict()
        left, right = a.to_dict(), b.to_dict()
        for agent in left.keys() | right.keys():
            expected = max(left.get(agent, 0), right.get(agent, 0))
            assert merged_counts.get(agent, 0) == expected
224
225
class TestORSetLatticeHypothesis:
    """ORSet satisfies all three CRDT lattice laws for arbitrary inputs."""

    @given(or_sets(), or_sets())
    def test_commutativity(self, a: ORSet, b: ORSet) -> None:
        ab = a.join(b)
        ba = b.join(a)
        assert ab.elements() == ba.elements()

    @given(or_sets(), or_sets(), or_sets())
    def test_associativity(self, a: ORSet, b: ORSet, c: ORSet) -> None:
        left_first = a.join(b).join(c)
        right_first = a.join(b.join(c))
        assert left_first.elements() == right_first.elements()

    @given(or_sets())
    def test_idempotency(self, a: ORSet) -> None:
        assert a.join(a).elements() == a.elements()

    @given(or_sets(), or_sets())
    def test_join_contains_both_element_sets(self, a: ORSet, b: ORSet) -> None:
        """join(a, b) must contain every element visible in either a or b."""
        visible = a.join(b).elements()
        assert a.elements() <= visible
        assert b.elements() <= visible
247
248
class TestRGALatticeHypothesis:
    """RGA satisfies all three CRDT lattice laws for arbitrary inputs."""

    @given(rgas(), rgas())
    def test_commutativity(self, a: RGA, b: RGA) -> None:
        assert a.join(b).equivalent(b.join(a))

    @given(rgas(), rgas(), rgas())
    def test_associativity(self, a: RGA, b: RGA, c: RGA) -> None:
        left_first = a.join(b).join(c)
        right_first = a.join(b.join(c))
        assert left_first.equivalent(right_first)

    @given(rgas())
    def test_idempotency(self, a: RGA) -> None:
        assert a.join(a).equivalent(a)

    @given(rgas(), rgas())
    def test_join_contains_all_element_ids(self, a: RGA, b: RGA) -> None:
        """Every element ID known to either replica must appear in the join.

        RGA deletions are *monotone*: once an element is tombstoned in either
        replica it must remain tombstoned in the join. This means we can only
        assert that element *IDs* are preserved — not that elements remain
        visible — because a concurrent delete in the other replica may correctly
        tombstone them.

        What must hold:
        - Every ID (visible or tombstoned) from either replica appears in the join.
        - If an element is tombstoned in either, it is tombstoned in the join.
        """
        def indexed(replica: RGA) -> dict:
            # Map element ID -> wire entry (includes the "deleted" flag).
            return {entry["id"]: entry for entry in replica.to_dict()}

        joined_by_id = indexed(a.join(b))

        for label, replica in (("a", a), ("b", b)):
            replica_by_id = indexed(replica)

            # All IDs from this replica must be present in the join.
            assert set(replica_by_id) <= set(joined_by_id), (
                f"IDs from {label} lost in join: {set(replica_by_id) - set(joined_by_id)}"
            )

            # Monotone deletion: a tombstone in either replica survives the join.
            for eid, entry in replica_by_id.items():
                if entry["deleted"] and eid in joined_by_id:
                    assert joined_by_id[eid]["deleted"], (
                        f"Element {eid} deleted in {label} but not in join"
                    )
294
295
class TestAWMapLatticeHypothesis:
    """AWMap satisfies all three CRDT lattice laws for arbitrary inputs."""

    @given(aw_maps(), aw_maps())
    def test_commutativity(self, a: AWMap, b: AWMap) -> None:
        ab = a.join(b).to_plain_dict()
        ba = b.join(a).to_plain_dict()
        assert ab == ba

    @given(aw_maps(), aw_maps(), aw_maps())
    def test_associativity(self, a: AWMap, b: AWMap, c: AWMap) -> None:
        left_first = a.join(b).join(c).to_plain_dict()
        right_first = a.join(b.join(c)).to_plain_dict()
        assert left_first == right_first

    @given(aw_maps())
    def test_idempotency(self, a: AWMap) -> None:
        assert a.join(a).to_plain_dict() == a.to_plain_dict()
310
311
class TestGCounterLatticeHypothesis:
    """GCounter satisfies all three CRDT lattice laws for arbitrary inputs."""

    @given(g_counters(), g_counters())
    def test_commutativity(self, a: GCounter, b: GCounter) -> None:
        assert a.join(b).equivalent(b.join(a))

    @given(g_counters(), g_counters(), g_counters())
    def test_associativity(self, a: GCounter, b: GCounter, c: GCounter) -> None:
        left_first = a.join(b).join(c)
        right_first = a.join(b.join(c))
        assert left_first.equivalent(right_first)

    @given(g_counters())
    def test_idempotency(self, a: GCounter) -> None:
        assert a.join(a).equivalent(a)

    @given(g_counters(), g_counters())
    def test_join_is_monotone(self, a: GCounter, b: GCounter) -> None:
        """join(a, b).value(agent) >= a.value(agent) for all agents in a."""
        joined_counts = a.join(b).to_dict()
        for agent, count in a.to_dict().items():
            assert joined_counts.get(agent, 0) >= count
333
334
335 # ---------------------------------------------------------------------------
336 # LCS round-trip properties
337 # ---------------------------------------------------------------------------
338
339
class TestLCSRoundTrip:
    """LCS diff produces correct edit scripts for arbitrary sequence pairs."""

    @given(content_id_lists())
    def test_self_diff_has_no_ops(self, seq: list[str]) -> None:
        """Diffing a sequence against itself never produces edit (non-keep) ops."""
        delta = lcs_diff(_SEQ_SCHEMA, seq, seq, domain="test")
        edits = [op for op in delta["ops"] if op["op"] != "keep"]
        assert edits == [], f"Expected no ops for self-diff, got {edits}"

    @given(content_id_lists())
    def test_self_diff_summary_contains_no_changes(self, seq: list[str]) -> None:
        summary = lcs_diff(_SEQ_SCHEMA, seq, seq, domain="test")["summary"].lower()
        assert "no" in summary
        assert "change" in summary

    @given(content_id_lists(), content_id_lists())
    def test_delta_op_count_is_at_most_ses_edit_count(
        self, base: list[str], target: list[str]
    ) -> None:
        """The total number of ops in the delta never exceeds the SES edit count.

        ``detect_moves`` collapses (insert, delete) pairs into single ``MoveOp``\\s,
        so the op count is always ≤ the raw SES edit count. The SES edit count is
        the number of insert + delete steps (keep steps are not counted).
        """
        raw_edit_count = sum(1 for step in myers_ses(base, target) if step.kind != "keep")

        delta = lcs_diff(_SEQ_SCHEMA, base, target, domain="test")
        # Each MoveOp replaces 1 insert + 1 delete, so total ops ≤ raw edit count.
        actual_op_count = len(delta["ops"])
        assert actual_op_count <= raw_edit_count, (
            f"Op count {actual_op_count} > SES edit count {raw_edit_count}"
        )

    @given(content_id_lists(), content_id_lists())
    def test_diff_ops_content_ids_are_in_base_or_target(
        self, base: list[str], target: list[str]
    ) -> None:
        """Every content_id in the delta ops must come from base or target."""
        universe = set(base) | set(target)

        delta = lcs_diff(_SEQ_SCHEMA, base, target, domain="test")
        for op in delta["ops"]:
            if op["op"] not in ("insert", "delete"):
                continue
            assert op["content_id"] in universe, (
                f"Unknown content_id {op['content_id']!r} in op {op['op']!r}"
            )

    @given(content_id_lists())
    def test_empty_base_produces_only_inserts(self, target: list[str]) -> None:
        """Diffing empty base → target has exactly len(target) insert ops."""
        ops = lcs_diff(_SEQ_SCHEMA, [], target, domain="test")["ops"]
        assert [op["op"] for op in ops] == ["insert"] * len(target)

    @given(content_id_lists())
    def test_empty_target_produces_only_deletes(self, base: list[str]) -> None:
        """Diffing base → empty target has exactly len(base) delete ops."""
        ops = lcs_diff(_SEQ_SCHEMA, base, [], domain="test")["ops"]
        assert [op["op"] for op in ops] == ["delete"] * len(base)

    @given(content_id_lists(), content_id_lists())
    def test_domain_tag_propagated(self, base: list[str], target: list[str]) -> None:
        """The domain tag from the call site is preserved in the StructuredDelta."""
        delta = lcs_diff(_SEQ_SCHEMA, base, target, domain="my-domain")
        assert delta["domain"] == "my-domain"

    @given(st.lists(st.integers(min_value=0, max_value=5), max_size=6))
    def test_deduplicated_sequence_round_trips(self, indices: list[int]) -> None:
        """Converting ints → content IDs and diffing against itself yields only keeps."""
        seq = [f"hash-{i}" for i in indices]
        delta = lcs_diff(_SEQ_SCHEMA, seq, seq, domain="test")
        assert all(op["op"] == "keep" for op in delta["ops"])
417
418
419 # ---------------------------------------------------------------------------
420 # OT diamond property
421 # ---------------------------------------------------------------------------
422
423
def _apply_insert_ops(base: list[str], ops: list[InsertOp]) -> list[str]:
    """Apply *ops* to a copy of *base* and return the resulting list.

    Ops land sequentially; each op's position is interpreted against the list
    as it stands when that op is applied, so positions shift as earlier ops
    take effect. A ``None`` position means "append at the end".
    """
    out = list(base)
    for op in ops:
        position = op["position"]
        if position is not None:
            out.insert(position, op["content_id"])
        else:
            out.append(op["content_id"])
    return out
439
440
@st.composite
def _two_commuting_insert_ops(draw: st.DrawFn) -> tuple[InsertOp, InsertOp]:
    """Strategy: two InsertOps at distinct positions on the same address."""
    addr = draw(st.text(alphabet="abcde", min_size=1, max_size=4))
    first_pos = draw(st.integers(min_value=0, max_value=9))
    # A strictly positive offset guarantees distinct positions, which is what
    # makes ops_commute return True for the pair.
    second_pos = first_pos + draw(st.integers(min_value=1, max_value=9))

    def make(position: int, content: str, label: str) -> InsertOp:
        return InsertOp(
            op="insert",
            address=addr,
            position=position,
            content_id=content,
            content_summary=label,
        )

    # Note: contents are drawn left-to-right, preserving the original draw order.
    return (
        make(first_pos, draw(_SHORT_TEXT), "op-a"),
        make(second_pos, draw(_SHORT_TEXT), "op-b"),
    )
464
465
class TestOTDiamondProperty:
    """``transform`` satisfies the OT diamond property for InsertOp pairs."""

    @given(_two_commuting_insert_ops())
    def test_insert_insert_diamond(self, ops: tuple[InsertOp, InsertOp]) -> None:
        """Applying (a then b') == applying (b then a') for concurrent inserts.

        This is the fundamental OT correctness criterion. With a 10-element
        base, both application orders must produce identical final sequences.
        """
        a, b = ops
        assert ops_commute(a, b), "strategy must only produce commuting pairs"

        a_prime, b_prime = transform(a, b)
        base = [f"x{i}" for i in range(10)]

        # Path 1: apply a first, then the transformed b.
        path1 = _apply_insert_ops(_apply_insert_ops(base, [a]), [b_prime])
        # Path 2: apply b first, then the transformed a.
        path2 = _apply_insert_ops(_apply_insert_ops(base, [b]), [a_prime])

        assert path1 == path2, (
            f"Diamond property violated:\n"
            f" a={dict(a)}\n b={dict(b)}\n"
            f" a'={dict(a_prime)}\n b'={dict(b_prime)}\n"
            f" path1={path1}\n path2={path2}"
        )

    @given(_two_commuting_insert_ops())
    def test_transform_preserves_op_types(self, ops: tuple[InsertOp, InsertOp]) -> None:
        """transform() must always return InsertOps when given InsertOps."""
        for adjusted in transform(*ops):
            assert adjusted["op"] == "insert"

    @given(_two_commuting_insert_ops())
    def test_transform_preserves_content(self, ops: tuple[InsertOp, InsertOp]) -> None:
        """transform() must not alter the content being inserted."""
        for original, adjusted in zip(ops, transform(*ops)):
            assert adjusted["content_id"] == original["content_id"]

    @given(_two_commuting_insert_ops())
    def test_ops_commute_symmetry(self, ops: tuple[InsertOp, InsertOp]) -> None:
        """ops_commute(a, b) == ops_commute(b, a) — the relation is symmetric."""
        a, b = ops
        assert ops_commute(a, b) == ops_commute(b, a)