cgcardona / muse public
test_property_based.py python
515 lines 20.2 KB
d1dfb412 feat: implement 3 missing plan items — property tests, format_version, … Gabriel Cardona <gabriel@tellurstori.com> 1d ago
1 """Property-based tests using :mod:`hypothesis`.
2
3 Three correctness properties are verified exhaustively over randomly generated
4 inputs — something example-based tests cannot do:
5
6 1. **CRDT lattice laws** — commutativity, associativity, and idempotency hold
7 for all six CRDT types over arbitrary inputs.
8
2. **LCS round-trip** — ``diff(a, a)`` always produces zero ops; for arbitrary
   sequence pairs the number of ops emitted never exceeds the SES edit count
   (inserts + deletes; move detection may collapse insert/delete pairs).
12
13 3. **OT diamond property** — for two concurrently applied ``InsertOp``\\s at
14 distinct positions, ``transform`` produces adjusted ops such that applying
15 them in either order yields the same final sequence.
16
17 Why these three?
18 ----------------
19 - CRDT laws guarantee that any number of agents can merge state in any order
20 and reach the same result. Example-based tests can only verify a handful of
21 cases; hypothesis explores thousands.
22
23 - LCS round-trip safety means the diff engine never produces incorrect deltas
24 for self-diffs (a common pathological case).
25
26 - The OT diamond property is the central correctness criterion for Operational
27 Transformation: two concurrent edits, transformed and applied, must converge.
28 A single counterexample would break real-time collaboration.
29 """
30 from __future__ import annotations
31
32 from typing import TYPE_CHECKING
33
34 import pytest
35 from hypothesis import given, settings
36 from hypothesis import strategies as st
37
38 from muse.core.crdts import (
39 AWMap,
40 GCounter,
41 LWWRegister,
42 ORSet,
43 RGA,
44 VectorClock,
45 )
46 from muse.core.diff_algorithms.lcs import diff as lcs_diff, myers_ses
47 from muse.core.op_transform import ops_commute, transform
48 from muse.core.schema import SequenceSchema
49 from muse.domain import InsertOp
50
51 if TYPE_CHECKING:
52 pass
53
54 # ---------------------------------------------------------------------------
55 # Hypothesis strategies
56 # ---------------------------------------------------------------------------
57
# Alphabet for generated string values — small alphabet makes overlaps more
# likely, exercising merge paths that would be rare with fully random strings.
# NOTE(review): _ALPHA is not referenced by any strategy or test below —
# confirm it is used elsewhere or remove it.
_ALPHA = st.sampled_from(list("abcde"))
# Short strings used for CRDT payloads, ORSet elements, and AWMap keys/values.
_SHORT_TEXT = st.text(alphabet="abcdefghij", min_size=0, max_size=8)
# Non-empty agent identifiers (used as the LWWRegister author field).
_AGENT_ID = st.text(alphabet="abcdefghij", min_size=1, max_size=6)
# Finite, non-NaN floats keep LWW comparison keys totally ordered.
_TIMESTAMP = st.floats(min_value=0.0, max_value=1_000_000.0, allow_nan=False, allow_infinity=False)
64
65
@st.composite
def lww_registers(draw: st.DrawFn) -> LWWRegister:
    """Strategy: LWWRegister with a random value, timestamp, and author."""
    payload = draw(_SHORT_TEXT)
    when = draw(_TIMESTAMP)
    who = draw(_AGENT_ID)
    return LWWRegister(value=payload, timestamp=when, author=who)
74
75
@st.composite
def vector_clocks(draw: st.DrawFn) -> VectorClock:
    """Strategy: VectorClock over 0–4 agents, each with a count in 0–20."""
    agent_total = draw(st.integers(min_value=0, max_value=4))
    counts: dict[str, int] = {}
    for i in range(agent_total):
        counts[f"a{i}"] = draw(st.integers(min_value=0, max_value=20))
    return VectorClock(counts)
82
83
@st.composite
def or_sets(draw: st.DrawFn) -> ORSet:
    """Strategy: ORSet produced by a random run of adds followed by removes."""
    result = ORSet()
    inserted = draw(st.lists(_SHORT_TEXT, min_size=0, max_size=6))
    for value in inserted:
        result, _ = result.add(value)
    if not inserted:
        return result
    # Remove a random subset, honouring observed-remove (token) semantics:
    # only tokens the replica has actually seen are removed.
    for victim in draw(st.lists(st.sampled_from(inserted), max_size=3, unique=True)):
        if victim in result.elements():
            result = result.remove(victim, result.tokens_for(victim))
    return result
100
101
@st.composite
def rgas(draw: st.DrawFn) -> RGA:
    """Strategy: RGA assembled by appending elements, then deleting a few."""
    rga = RGA()
    element_ids: list[str] = []
    for idx, value in enumerate(draw(st.lists(_SHORT_TEXT, min_size=0, max_size=6))):
        new_id = f"{idx}@agent"
        # Append-only: anchor each insert after the previously inserted element.
        anchor = element_ids[-1] if element_ids else None
        rga = rga.insert(anchor, value, element_id=new_id)
        element_ids.append(new_id)
    # Tombstone a random subset of the inserted elements.
    if element_ids:
        for doomed in draw(st.lists(st.sampled_from(element_ids), max_size=2, unique=True)):
            rga = rga.delete(doomed)
    return rga
118
119
@st.composite
def aw_maps(draw: st.DrawFn) -> AWMap:
    """Strategy: AWMap populated with up to four random key/value pairs."""
    result = AWMap()
    for key, value in draw(st.dictionaries(_SHORT_TEXT, _SHORT_TEXT, max_size=4)).items():
        result = result.set(key, value)
    return result
128
129
@st.composite
def g_counters(draw: st.DrawFn) -> GCounter:
    """Strategy: GCounter over 0–4 agents, each with a count in 0–20."""
    agent_total = draw(st.integers(min_value=0, max_value=4))
    counts: dict[str, int] = {}
    for i in range(agent_total):
        counts[f"a{i}"] = draw(st.integers(min_value=0, max_value=20))
    return GCounter(counts)
136
137
@st.composite
def content_id_lists(draw: st.DrawFn) -> list[str]:
    """Strategy: list of content IDs drawn from a small shared pool.

    Drawing from only eight possible hashes means two generated lists are
    likely to share elements, exercising the LCS "keep" path rather than
    always producing pure insert+delete scripts.
    """
    hash_pool = [f"h{i}" for i in range(8)]
    return draw(st.lists(st.sampled_from(hash_pool), min_size=0, max_size=8))
147
148
149 # ---------------------------------------------------------------------------
150 # Shared SequenceSchema for LCS tests
151 # ---------------------------------------------------------------------------
152
# Single schema shared by every LCS test below: an index-ordered sequence of
# content IDs compared by content, diffed with the "lcs" algorithm.
_SEQ_SCHEMA = SequenceSchema(
    kind="sequence",
    element_type="content_id",
    identity="by_content",
    diff_algorithm="lcs",
    order="indexed",
)
160
161
162 # ---------------------------------------------------------------------------
163 # CRDT lattice law tests
164 # ---------------------------------------------------------------------------
165
166
class TestLWWRegisterLatticeHypothesis:
    """Lattice laws (commutativity, associativity, idempotency) for LWWRegister."""

    @given(lww_registers(), lww_registers())
    def test_commutativity(self, x: LWWRegister, y: LWWRegister) -> None:
        left = x.join(y)
        right = y.join(x)
        assert left.equivalent(right)

    @given(lww_registers(), lww_registers(), lww_registers())
    def test_associativity(self, x: LWWRegister, y: LWWRegister, z: LWWRegister) -> None:
        assert x.join(y).join(z).equivalent(x.join(y.join(z)))

    @given(lww_registers())
    def test_idempotency(self, x: LWWRegister) -> None:
        assert x.join(x).equivalent(x)

    @given(lww_registers(), lww_registers())
    def test_join_winner_is_higher_timestamp(self, x: LWWRegister, y: LWWRegister) -> None:
        """The surviving value belongs to the register with the larger key.

        The comparison key is ``(timestamp, author, value)``; folding ``value``
        in as the final tiebreaker is what keeps ``join`` commutative when two
        writes collide on the same ``(timestamp, author)`` pair.
        """
        merged = x.join(y)
        x_wire = x.to_dict()
        y_wire = y.to_dict()
        x_key = (x_wire["timestamp"], x_wire["author"], x_wire["value"])
        y_key = (y_wire["timestamp"], y_wire["author"], y_wire["value"])
        winner = x if x_key >= y_key else y
        assert merged.read() == winner.read()
199
200
class TestVectorClockLatticeHypothesis:
    """Lattice laws (commutativity, associativity, idempotency) for VectorClock."""

    @given(vector_clocks(), vector_clocks())
    def test_commutativity(self, x: VectorClock, y: VectorClock) -> None:
        assert x.merge(y).equivalent(y.merge(x))

    @given(vector_clocks(), vector_clocks(), vector_clocks())
    def test_associativity(self, x: VectorClock, y: VectorClock, z: VectorClock) -> None:
        left_first = x.merge(y).merge(z)
        right_first = x.merge(y.merge(z))
        assert left_first.equivalent(right_first)

    @given(vector_clocks())
    def test_idempotency(self, x: VectorClock) -> None:
        assert x.merge(x).equivalent(x)

    @given(vector_clocks(), vector_clocks())
    def test_merge_is_pointwise_max(self, x: VectorClock, y: VectorClock) -> None:
        """The merged count for every agent equals max(x[agent], y[agent])."""
        merged_counts = x.merge(y).to_dict()
        x_counts = x.to_dict()
        y_counts = y.to_dict()
        for agent in x_counts.keys() | y_counts.keys():
            expected = max(x_counts.get(agent, 0), y_counts.get(agent, 0))
            assert merged_counts.get(agent, 0) == expected
225
226
class TestORSetLatticeHypothesis:
    """Lattice laws (commutativity, associativity, idempotency) for ORSet."""

    @given(or_sets(), or_sets())
    def test_commutativity(self, x: ORSet, y: ORSet) -> None:
        assert x.join(y).elements() == y.join(x).elements()

    @given(or_sets(), or_sets(), or_sets())
    def test_associativity(self, x: ORSet, y: ORSet, z: ORSet) -> None:
        left_first = x.join(y).join(z)
        right_first = x.join(y.join(z))
        assert left_first.elements() == right_first.elements()

    @given(or_sets())
    def test_idempotency(self, x: ORSet) -> None:
        assert x.join(x).elements() == x.elements()

    @given(or_sets(), or_sets())
    def test_join_contains_both_element_sets(self, x: ORSet, y: ORSet) -> None:
        """Every element visible in either replica survives the join."""
        combined = x.join(y).elements()
        assert x.elements() <= combined
        assert y.elements() <= combined
248
249
class TestRGALatticeHypothesis:
    """Lattice laws (commutativity, associativity, idempotency) for RGA."""

    @given(rgas(), rgas())
    def test_commutativity(self, x: RGA, y: RGA) -> None:
        assert x.join(y).equivalent(y.join(x))

    @given(rgas(), rgas(), rgas())
    def test_associativity(self, x: RGA, y: RGA, z: RGA) -> None:
        assert x.join(y).join(z).equivalent(x.join(y.join(z)))

    @given(rgas())
    def test_idempotency(self, x: RGA) -> None:
        assert x.join(x).equivalent(x)

    @given(rgas(), rgas())
    def test_join_contains_all_element_ids(self, a: RGA, b: RGA) -> None:
        """Joins preserve every element ID and keep deletion monotone.

        Because RGA deletions are *monotone* — an element tombstoned in either
        replica stays tombstoned in the join — we cannot demand that elements
        remain visible, only that:

        - every ID (visible or tombstoned) from either replica appears in the
          join, and
        - any element tombstoned in either replica is tombstoned in the join.
        """
        joined_all = {e["id"]: e for e in a.join(b).to_dict()}
        a_all = {e["id"]: e for e in a.to_dict()}
        b_all = {e["id"]: e for e in b.to_dict()}

        # No ID may disappear during a join.
        assert set(a_all) <= set(joined_all), f"IDs from a lost in join: {set(a_all) - set(joined_all)}"
        assert set(b_all) <= set(joined_all), f"IDs from b lost in join: {set(b_all) - set(joined_all)}"

        # A tombstone in either input must survive into the join.
        for eid, elem in a_all.items():
            if elem["deleted"] and eid in joined_all:
                assert joined_all[eid]["deleted"], f"Element {eid} deleted in a but not in join"
        for eid, elem in b_all.items():
            if elem["deleted"] and eid in joined_all:
                assert joined_all[eid]["deleted"], f"Element {eid} deleted in b but not in join"
295
296
class TestAWMapLatticeHypothesis:
    """Lattice laws (commutativity, associativity, idempotency) for AWMap."""

    @given(aw_maps(), aw_maps())
    def test_commutativity(self, x: AWMap, y: AWMap) -> None:
        assert x.join(y).to_plain_dict() == y.join(x).to_plain_dict()

    @given(aw_maps(), aw_maps(), aw_maps())
    def test_associativity(self, x: AWMap, y: AWMap, z: AWMap) -> None:
        left_first = x.join(y).join(z)
        right_first = x.join(y.join(z))
        assert left_first.to_plain_dict() == right_first.to_plain_dict()

    @given(aw_maps())
    def test_idempotency(self, x: AWMap) -> None:
        assert x.join(x).to_plain_dict() == x.to_plain_dict()
311
312
class TestGCounterLatticeHypothesis:
    """Lattice laws (commutativity, associativity, idempotency) for GCounter."""

    @given(g_counters(), g_counters())
    def test_commutativity(self, x: GCounter, y: GCounter) -> None:
        assert x.join(y).equivalent(y.join(x))

    @given(g_counters(), g_counters(), g_counters())
    def test_associativity(self, x: GCounter, y: GCounter, z: GCounter) -> None:
        assert x.join(y).join(z).equivalent(x.join(y.join(z)))

    @given(g_counters())
    def test_idempotency(self, x: GCounter) -> None:
        assert x.join(x).equivalent(x)

    @given(g_counters(), g_counters())
    def test_join_is_monotone(self, x: GCounter, y: GCounter) -> None:
        """A join never lowers any agent's count below its value in x."""
        joined_counts = x.join(y).to_dict()
        for agent, count in x.to_dict().items():
            assert joined_counts.get(agent, 0) >= count
334
335
336 # ---------------------------------------------------------------------------
337 # LCS round-trip properties
338 # ---------------------------------------------------------------------------
339
340
class TestLCSRoundTrip:
    """LCS diff emits correct edit scripts for arbitrary content-ID sequences."""

    @given(content_id_lists())
    def test_self_diff_has_no_ops(self, seq: list[str]) -> None:
        """A sequence diffed against itself yields nothing but keeps."""
        delta = lcs_diff(_SEQ_SCHEMA, seq, seq, domain="test")
        non_keep_ops = [op for op in delta["ops"] if op["op"] != "keep"]
        assert non_keep_ops == [], f"Expected no ops for self-diff, got {non_keep_ops}"

    @given(content_id_lists())
    def test_self_diff_summary_contains_no_changes(self, seq: list[str]) -> None:
        """The human-readable summary of a self-diff reports no changes."""
        summary = lcs_diff(_SEQ_SCHEMA, seq, seq, domain="test")["summary"].lower()
        assert "no" in summary and "change" in summary

    @given(content_id_lists(), content_id_lists())
    def test_delta_op_count_is_at_most_ses_edit_count(
        self, base: list[str], target: list[str]
    ) -> None:
        """The delta never holds more ops than the raw SES edit count.

        ``detect_moves`` can collapse an (insert, delete) pair into a single
        ``MoveOp``, so the delta may contain *fewer* ops than the SES — never
        more. The SES edit count tallies only insert and delete steps (keeps
        are excluded).
        """
        raw_edit_count = sum(1 for s in myers_ses(base, target) if s.kind != "keep")
        actual_op_count = len(lcs_diff(_SEQ_SCHEMA, base, target, domain="test")["ops"])
        assert actual_op_count <= raw_edit_count, (
            f"Op count {actual_op_count} > SES edit count {raw_edit_count}"
        )

    @given(content_id_lists(), content_id_lists())
    def test_diff_ops_content_ids_are_in_base_or_target(
        self, base: list[str], target: list[str]
    ) -> None:
        """Insert/delete ops may only reference content IDs seen in the inputs."""
        universe = set(base) | set(target)
        delta = lcs_diff(_SEQ_SCHEMA, base, target, domain="test")
        for op in delta["ops"]:
            if op["op"] not in ("insert", "delete"):
                continue
            assert op["content_id"] in universe, (
                f"Unknown content_id {op['content_id']!r} in op {op['op']!r}"
            )

    @given(content_id_lists())
    def test_empty_base_produces_only_inserts(self, target: list[str]) -> None:
        """[] → target is exactly len(target) insert ops and nothing else."""
        ops = lcs_diff(_SEQ_SCHEMA, [], target, domain="test")["ops"]
        assert all(op["op"] == "insert" for op in ops)
        assert sum(1 for op in ops if op["op"] == "insert") == len(target)

    @given(content_id_lists())
    def test_empty_target_produces_only_deletes(self, base: list[str]) -> None:
        """base → [] is exactly len(base) delete ops and nothing else."""
        ops = lcs_diff(_SEQ_SCHEMA, base, [], domain="test")["ops"]
        assert all(op["op"] == "delete" for op in ops)
        assert sum(1 for op in ops if op["op"] == "delete") == len(base)

    @given(content_id_lists(), content_id_lists())
    def test_domain_tag_propagated(self, base: list[str], target: list[str]) -> None:
        """The caller-supplied domain tag survives into the StructuredDelta."""
        delta = lcs_diff(_SEQ_SCHEMA, base, target, domain="my-domain")
        assert delta["domain"] == "my-domain"

    @given(st.lists(st.integers(min_value=0, max_value=5), max_size=6))
    def test_deduplicated_sequence_round_trips(self, indices: list[int]) -> None:
        """Mapping ints to content IDs and self-diffing never yields real ops."""
        seq = [f"hash-{i}" for i in indices]
        delta = lcs_diff(_SEQ_SCHEMA, seq, seq, domain="test")
        assert all(op["op"] == "keep" for op in delta["ops"])
418
419
420 # ---------------------------------------------------------------------------
421 # OT diamond property
422 # ---------------------------------------------------------------------------
423
424
425 def _apply_insert_ops(base: list[str], ops: list[InsertOp]) -> list[str]:
426 """Apply a list of InsertOps to *base*, returning the resulting list.
427
428 Ops are applied in order; each op's position is relative to the *current*
429 state of the list at the time of application (i.e. positions shift as
430 earlier ops are applied).
431 """
432 result = list(base)
433 for op in ops:
434 pos = op["position"]
435 if pos is None:
436 result.append(op["content_id"])
437 else:
438 result.insert(pos, op["content_id"])
439 return result
440
441
@st.composite
def _two_commuting_insert_ops(draw: st.DrawFn) -> tuple[InsertOp, InsertOp]:
    """Strategy: a commuting pair of InsertOps sharing one address."""
    shared_address = draw(st.text(alphabet="abcde", min_size=1, max_size=4))
    first_pos = draw(st.integers(min_value=0, max_value=9))
    # A strictly positive offset keeps the two positions distinct, which is
    # the condition under which ops_commute returns True.
    second_pos = first_pos + draw(st.integers(min_value=1, max_value=9))
    first = InsertOp(
        op="insert",
        address=shared_address,
        position=first_pos,
        content_id=draw(_SHORT_TEXT),
        content_summary="op-a",
    )
    second = InsertOp(
        op="insert",
        address=shared_address,
        position=second_pos,
        content_id=draw(_SHORT_TEXT),
        content_summary="op-b",
    )
    return first, second
465
466
class TestOTDiamondProperty:
    """``transform`` satisfies the OT diamond property for InsertOp pairs."""

    @given(_two_commuting_insert_ops())
    def test_insert_insert_diamond(self, ops: tuple[InsertOp, InsertOp]) -> None:
        """Both application orders of two transformed concurrent inserts converge.

        This is the fundamental OT correctness criterion: over a 10-element
        base sequence, (a then b') and (b then a') must agree exactly.
        """
        a, b = ops
        assert ops_commute(a, b), "strategy must only produce commuting pairs"

        a_prime, b_prime = transform(a, b)
        base: list[str] = [f"x{i}" for i in range(10)]

        # Path 1: a first, then b transformed against a.
        path1 = _apply_insert_ops(_apply_insert_ops(base, [a]), [b_prime])
        # Path 2: b first, then a transformed against b.
        path2 = _apply_insert_ops(_apply_insert_ops(base, [b]), [a_prime])

        assert path1 == path2, (
            f"Diamond property violated:\n"
            f" a={dict(a)}\n b={dict(b)}\n"
            f" a'={dict(a_prime)}\n b'={dict(b_prime)}\n"
            f" path1={path1}\n path2={path2}"
        )

    @given(_two_commuting_insert_ops())
    def test_transform_preserves_op_types(self, ops: tuple[InsertOp, InsertOp]) -> None:
        """Transforming two InsertOps yields two InsertOps."""
        left, right = transform(*ops)
        assert left["op"] == "insert"
        assert right["op"] == "insert"

    @given(_two_commuting_insert_ops())
    def test_transform_preserves_content(self, ops: tuple[InsertOp, InsertOp]) -> None:
        """The content being inserted is untouched by transformation."""
        a, b = ops
        a_prime, b_prime = transform(a, b)
        assert a_prime["content_id"] == a["content_id"]
        assert b_prime["content_id"] == b["content_id"]

    @given(_two_commuting_insert_ops())
    def test_ops_commute_symmetry(self, ops: tuple[InsertOp, InsertOp]) -> None:
        """ops_commute is a symmetric relation."""
        first, second = ops
        assert ops_commute(first, second) == ops_commute(second, first)
512 def test_ops_commute_symmetry(self, ops: tuple[InsertOp, InsertOp]) -> None:
513 """ops_commute(a, b) == ops_commute(b, a) — the relation is symmetric."""
514 a, b = ops
515 assert ops_commute(a, b) == ops_commute(b, a)