# test_property_based.py
"""Property-based tests using :mod:`hypothesis`.

Three correctness properties are verified exhaustively over randomly generated
inputs — something example-based tests cannot do:

1. **CRDT lattice laws** — commutativity, associativity, and idempotency hold
   for all six CRDT types over arbitrary inputs.

2. **LCS round-trip** — ``diff(a, a)`` always produces zero ops; the number of
   edits reported equals the true edit distance (inserts + deletes, excluding
   moves) for arbitrary sequence pairs.

3. **OT diamond property** — for two concurrently applied ``InsertOp``\\s at
   distinct positions, ``transform`` produces adjusted ops such that applying
   them in either order yields the same final sequence.

Why these three?
----------------
- CRDT laws guarantee that any number of agents can merge state in any order
  and reach the same result. Example-based tests can only verify a handful of
  cases; hypothesis explores thousands.

- LCS round-trip safety means the diff engine never produces incorrect deltas
  for self-diffs (a common pathological case).

- The OT diamond property is the central correctness criterion for Operational
  Transformation: two concurrent edits, transformed and applied, must converge.
  A single counterexample would break real-time collaboration.
"""
| 30 | |
| 31 | from typing import TYPE_CHECKING |
| 32 | |
| 33 | import pytest |
| 34 | from hypothesis import given, settings |
| 35 | from hypothesis import strategies as st |
| 36 | |
| 37 | from muse.core.crdts import ( |
| 38 | AWMap, |
| 39 | GCounter, |
| 40 | LWWRegister, |
| 41 | ORSet, |
| 42 | RGA, |
| 43 | VectorClock, |
| 44 | ) |
| 45 | from muse.core.diff_algorithms.lcs import diff as lcs_diff, myers_ses |
| 46 | from muse.core.op_transform import ops_commute, transform |
| 47 | from muse.core.schema import SequenceSchema |
| 48 | from muse.domain import InsertOp |
| 49 | |
| 50 | if TYPE_CHECKING: |
| 51 | pass |
| 52 | |
| 53 | # --------------------------------------------------------------------------- |
| 54 | # Hypothesis strategies |
| 55 | # --------------------------------------------------------------------------- |
| 56 | |
# Alphabet for generated string values — small alphabet makes overlaps more
# likely, exercising merge paths that would be rare with fully random strings.
# NOTE(review): _ALPHA is not referenced anywhere in this module — presumably
# kept for future strategies; confirm before removing.
_ALPHA = st.sampled_from(list("abcde"))
# Short strings over a ten-letter alphabet: used for element values, authors'
# payloads, and map keys/values throughout the strategies below.
_SHORT_TEXT = st.text(alphabet="abcdefghij", min_size=0, max_size=8)
# Non-empty agent identifiers (LWWRegister authors).
_AGENT_ID = st.text(alphabet="abcdefghij", min_size=1, max_size=6)
# Finite, non-NaN timestamps so LWW comparison keys are totally ordered.
_TIMESTAMP = st.floats(min_value=0.0, max_value=1_000_000.0, allow_nan=False, allow_infinity=False)
| 63 | |
| 64 | |
@st.composite
def lww_registers(draw: st.DrawFn) -> LWWRegister:
    """Strategy: generate an arbitrary ``LWWRegister``."""
    # Draw order matters for shrinking reproducibility: value, timestamp, author.
    val = draw(_SHORT_TEXT)
    ts = draw(_TIMESTAMP)
    who = draw(_AGENT_ID)
    return LWWRegister(value=val, timestamp=ts, author=who)
| 73 | |
| 74 | |
@st.composite
def vector_clocks(draw: st.DrawFn) -> VectorClock:
    """Strategy: VectorClock with 0–4 agents, counts 0–20."""
    agent_total = draw(st.integers(min_value=0, max_value=4))
    counts: dict[str, int] = {}
    for idx in range(agent_total):
        counts[f"a{idx}"] = draw(st.integers(min_value=0, max_value=20))
    return VectorClock(counts)
| 81 | |
| 82 | |
@st.composite
def or_sets(draw: st.DrawFn) -> ORSet:
    """Strategy: ORSet built from an arbitrary add/remove history."""
    values = draw(st.lists(_SHORT_TEXT, min_size=0, max_size=6))
    result = ORSet()
    history: list[str] = []
    for value in values:
        result, _ = result.add(value)
        history.append(value)
    if not history:
        return result
    # Tombstone a random subset of previously added elements, using the
    # observed-token semantics (remove only the tokens we have seen).
    removals = draw(st.lists(st.sampled_from(history), max_size=3, unique=True))
    for value in removals:
        if value in result.elements():
            result = result.remove(value, result.tokens_for(value))
    return result
| 99 | |
| 100 | |
@st.composite
def rgas(draw: st.DrawFn) -> RGA:
    """Strategy: RGA built by sequential inserts (append-only), then deletes."""
    contents = draw(st.lists(_SHORT_TEXT, min_size=0, max_size=6))
    seq = RGA()
    element_ids: list[str] = []
    prev_id: str | None = None
    for idx, content in enumerate(contents):
        new_id = f"{idx}@agent"
        # Always insert after the most recent element → append-only ordering.
        seq = seq.insert(prev_id, content, element_id=new_id)
        element_ids.append(new_id)
        prev_id = new_id
    # Optionally tombstone a couple of elements.
    if element_ids:
        victims = draw(st.lists(st.sampled_from(element_ids), max_size=2, unique=True))
        for victim in victims:
            seq = seq.delete(victim)
    return seq
| 117 | |
| 118 | |
@st.composite
def aw_maps(draw: st.DrawFn) -> AWMap:
    """Strategy: AWMap populated from an arbitrary small dictionary."""
    source = draw(st.dictionaries(_SHORT_TEXT, _SHORT_TEXT, max_size=4))
    result = AWMap()
    for key, value in source.items():
        result = result.set(key, value)
    return result
| 127 | |
| 128 | |
@st.composite
def g_counters(draw: st.DrawFn) -> GCounter:
    """Strategy: GCounter with 0–4 agents, counts 0–20."""
    agent_total = draw(st.integers(min_value=0, max_value=4))
    per_agent: dict[str, int] = {}
    for idx in range(agent_total):
        per_agent[f"a{idx}"] = draw(st.integers(min_value=0, max_value=20))
    return GCounter(per_agent)
| 135 | |
| 136 | |
@st.composite
def content_id_lists(draw: st.DrawFn) -> list[str]:
    """Strategy: list of content IDs drawn from a small pool.

    Drawing from only eight possible hashes means two generated lists will
    usually share elements, exercising the LCS "keep" path rather than always
    producing pure insert+delete scripts.
    """
    hashes = [f"h{i}" for i in range(8)]  # 8 possible hashes
    return draw(st.lists(st.sampled_from(hashes), min_size=0, max_size=8))
| 146 | |
| 147 | |
| 148 | # --------------------------------------------------------------------------- |
| 149 | # Shared SequenceSchema for LCS tests |
| 150 | # --------------------------------------------------------------------------- |
| 151 | |
# Shared schema handed to every lcs_diff call below: an indexed sequence of
# content IDs, matched by content, diffed with the LCS algorithm.
_SEQ_SCHEMA = SequenceSchema(
    kind="sequence",
    element_type="content_id",
    identity="by_content",
    diff_algorithm="lcs",
    order="indexed",
)
| 159 | |
| 160 | |
| 161 | # --------------------------------------------------------------------------- |
| 162 | # CRDT lattice law tests |
| 163 | # --------------------------------------------------------------------------- |
| 164 | |
| 165 | |
class TestLWWRegisterLatticeHypothesis:
    """LWWRegister satisfies all three CRDT lattice laws for arbitrary inputs."""

    @given(lww_registers(), lww_registers())
    def test_commutativity(self, a: LWWRegister, b: LWWRegister) -> None:
        # join must not care which operand comes first.
        ab = a.join(b)
        ba = b.join(a)
        assert ab.equivalent(ba)

    @given(lww_registers(), lww_registers(), lww_registers())
    def test_associativity(self, a: LWWRegister, b: LWWRegister, c: LWWRegister) -> None:
        left_grouped = a.join(b).join(c)
        right_grouped = a.join(b.join(c))
        assert left_grouped.equivalent(right_grouped)

    @given(lww_registers())
    def test_idempotency(self, a: LWWRegister) -> None:
        # Merging a register with itself must be a no-op.
        assert a.join(a).equivalent(a)

    @given(lww_registers(), lww_registers())
    def test_join_winner_is_higher_timestamp(self, a: LWWRegister, b: LWWRegister) -> None:
        """The winner's value is always the one with the higher comparison key.

        The key is ``(timestamp, author, value)``; including ``value`` as the
        final tiebreaker is what makes ``join`` commutative when two writes share
        the same ``(timestamp, author)`` pair.
        """
        merged = a.join(b)

        def key_of(reg: LWWRegister) -> tuple:
            wire = reg.to_dict()
            return (wire["timestamp"], wire["author"], wire["value"])

        winner = a if key_of(a) >= key_of(b) else b
        assert merged.read() == winner.read()
| 198 | |
| 199 | |
class TestVectorClockLatticeHypothesis:
    """VectorClock satisfies all three CRDT lattice laws for arbitrary inputs."""

    @given(vector_clocks(), vector_clocks())
    def test_commutativity(self, a: VectorClock, b: VectorClock) -> None:
        assert a.merge(b).equivalent(b.merge(a))

    @given(vector_clocks(), vector_clocks(), vector_clocks())
    def test_associativity(self, a: VectorClock, b: VectorClock, c: VectorClock) -> None:
        assert a.merge(b).merge(c).equivalent(a.merge(b.merge(c)))

    @given(vector_clocks())
    def test_idempotency(self, a: VectorClock) -> None:
        assert a.merge(a).equivalent(a)

    @given(vector_clocks(), vector_clocks())
    def test_merge_is_pointwise_max(self, a: VectorClock, b: VectorClock) -> None:
        """Every agent's count in the merged clock equals max(a[agent], b[agent])."""
        # Hoist the loop-invariant serialization: the original called
        # merged.to_dict() once per agent inside the loop.
        merged_dict = a.merge(b).to_dict()
        a_dict = a.to_dict()
        b_dict = b.to_dict()
        for agent in set(a_dict) | set(b_dict):
            expected = max(a_dict.get(agent, 0), b_dict.get(agent, 0))
            assert merged_dict.get(agent, 0) == expected
| 224 | |
| 225 | |
class TestORSetLatticeHypothesis:
    """ORSet satisfies all three CRDT lattice laws for arbitrary inputs."""

    @given(or_sets(), or_sets())
    def test_commutativity(self, a: ORSet, b: ORSet) -> None:
        # Visible membership must be the same whichever way we join.
        assert a.join(b).elements() == b.join(a).elements()

    @given(or_sets(), or_sets(), or_sets())
    def test_associativity(self, a: ORSet, b: ORSet, c: ORSet) -> None:
        grouped_left = a.join(b).join(c)
        grouped_right = a.join(b.join(c))
        assert grouped_left.elements() == grouped_right.elements()

    @given(or_sets())
    def test_idempotency(self, a: ORSet) -> None:
        assert a.join(a).elements() == a.elements()

    @given(or_sets(), or_sets())
    def test_join_contains_both_element_sets(self, a: ORSet, b: ORSet) -> None:
        """join(a, b) must contain every element visible in either a or b."""
        combined = a.join(b).elements()
        for side in (a, b):
            assert side.elements() <= combined
| 247 | |
| 248 | |
class TestRGALatticeHypothesis:
    """RGA satisfies all three CRDT lattice laws for arbitrary inputs."""

    @given(rgas(), rgas())
    def test_commutativity(self, a: RGA, b: RGA) -> None:
        ab = a.join(b)
        ba = b.join(a)
        assert ab.equivalent(ba)

    @given(rgas(), rgas(), rgas())
    def test_associativity(self, a: RGA, b: RGA, c: RGA) -> None:
        left_grouped = a.join(b).join(c)
        right_grouped = a.join(b.join(c))
        assert left_grouped.equivalent(right_grouped)

    @given(rgas())
    def test_idempotency(self, a: RGA) -> None:
        assert a.join(a).equivalent(a)

    @given(rgas(), rgas())
    def test_join_contains_all_element_ids(self, a: RGA, b: RGA) -> None:
        """Every element ID known to either replica must appear in the join.

        RGA deletions are *monotone*: once an element is tombstoned in either
        replica it must remain tombstoned in the join. This means we can only
        assert that element *IDs* are preserved — not that elements remain
        visible — because a concurrent delete in the other replica may correctly
        tombstone them.

        What must hold:
        - Every ID (visible or tombstoned) from either replica appears in the join.
        - If an element is tombstoned in either, it is tombstoned in the join.
        """
        joined_all = {e["id"]: e for e in a.join(b).to_dict()}
        a_all = {e["id"]: e for e in a.to_dict()}
        b_all = {e["id"]: e for e in b.to_dict()}

        # All IDs from both replicas must be present in the join.
        assert set(a_all) <= set(joined_all), f"IDs from a lost in join: {set(a_all) - set(joined_all)}"
        assert set(b_all) <= set(joined_all), f"IDs from b lost in join: {set(b_all) - set(joined_all)}"

        # Monotone deletion: a tombstone on either side must survive the join.
        for label, replica in (("a", a_all), ("b", b_all)):
            for eid, elem in replica.items():
                if elem["deleted"] and eid in joined_all:
                    assert joined_all[eid]["deleted"], (
                        f"Element {eid} deleted in {label} but not in join"
                    )
| 294 | |
| 295 | |
class TestAWMapLatticeHypothesis:
    """AWMap satisfies all three CRDT lattice laws for arbitrary inputs."""

    @given(aw_maps(), aw_maps())
    def test_commutativity(self, a: AWMap, b: AWMap) -> None:
        forward = a.join(b).to_plain_dict()
        backward = b.join(a).to_plain_dict()
        assert forward == backward

    @given(aw_maps(), aw_maps(), aw_maps())
    def test_associativity(self, a: AWMap, b: AWMap, c: AWMap) -> None:
        left_grouped = a.join(b).join(c).to_plain_dict()
        right_grouped = a.join(b.join(c)).to_plain_dict()
        assert left_grouped == right_grouped

    @given(aw_maps())
    def test_idempotency(self, a: AWMap) -> None:
        assert a.join(a).to_plain_dict() == a.to_plain_dict()
| 310 | |
| 311 | |
class TestGCounterLatticeHypothesis:
    """GCounter satisfies all three CRDT lattice laws for arbitrary inputs."""

    @given(g_counters(), g_counters())
    def test_commutativity(self, a: GCounter, b: GCounter) -> None:
        assert a.join(b).equivalent(b.join(a))

    @given(g_counters(), g_counters(), g_counters())
    def test_associativity(self, a: GCounter, b: GCounter, c: GCounter) -> None:
        assert a.join(b).join(c).equivalent(a.join(b.join(c)))

    @given(g_counters())
    def test_idempotency(self, a: GCounter) -> None:
        assert a.join(a).equivalent(a)

    @given(g_counters(), g_counters())
    def test_join_is_monotone(self, a: GCounter, b: GCounter) -> None:
        """join(a, b).value(agent) >= a.value(agent) for all agents in a.

        Strengthened: the original only checked a's agents; monotonicity must
        hold from either operand's perspective, so b's agents are checked too.
        """
        # Hoist the loop-invariant serialization: the original called
        # joined.to_dict() once per agent inside the loop.
        joined_counts = a.join(b).to_dict()
        for source in (a, b):
            for agent, count in source.to_dict().items():
                assert joined_counts.get(agent, 0) >= count
| 333 | |
| 334 | |
| 335 | # --------------------------------------------------------------------------- |
| 336 | # LCS round-trip properties |
| 337 | # --------------------------------------------------------------------------- |
| 338 | |
| 339 | |
class TestLCSRoundTrip:
    """LCS diff produces correct edit scripts for arbitrary sequence pairs."""

    @given(content_id_lists())
    def test_self_diff_has_no_ops(self, seq: list[str]) -> None:
        """Diffing a sequence against itself always produces an empty op list."""
        delta = lcs_diff(_SEQ_SCHEMA, seq, seq, domain="test")
        non_keep_ops = [entry for entry in delta["ops"] if entry["op"] != "keep"]
        assert non_keep_ops == [], f"Expected no ops for self-diff, got {non_keep_ops}"

    @given(content_id_lists())
    def test_self_diff_summary_contains_no_changes(self, seq: list[str]) -> None:
        summary = lcs_diff(_SEQ_SCHEMA, seq, seq, domain="test")["summary"].lower()
        assert "no" in summary
        assert "change" in summary

    @given(content_id_lists(), content_id_lists())
    def test_delta_op_count_is_at_most_ses_edit_count(
        self, base: list[str], target: list[str]
    ) -> None:
        """The total number of ops in the delta never exceeds the SES edit count.

        ``detect_moves`` collapses (insert, delete) pairs into single ``MoveOp``\\s,
        so the op count is always ≤ the raw SES edit count. The SES edit count is
        the number of insert + delete steps (keep steps are not counted).
        """
        raw_edit_count = sum(1 for step in myers_ses(base, target) if step.kind != "keep")
        # Each MoveOp replaces 1 insert + 1 delete, so total ops ≤ raw edit count.
        actual_op_count = len(lcs_diff(_SEQ_SCHEMA, base, target, domain="test")["ops"])
        assert actual_op_count <= raw_edit_count, (
            f"Op count {actual_op_count} > SES edit count {raw_edit_count}"
        )

    @given(content_id_lists(), content_id_lists())
    def test_diff_ops_content_ids_are_in_base_or_target(
        self, base: list[str], target: list[str]
    ) -> None:
        """Every content_id in the delta ops must come from base or target."""
        universe = set(base) | set(target)
        delta = lcs_diff(_SEQ_SCHEMA, base, target, domain="test")
        for op in delta["ops"]:
            if op["op"] not in ("insert", "delete"):
                continue
            assert op["content_id"] in universe, (
                f"Unknown content_id {op['content_id']!r} in op {op['op']!r}"
            )

    @given(content_id_lists())
    def test_empty_base_produces_only_inserts(self, target: list[str]) -> None:
        """Diffing empty base → target has exactly len(target) insert ops."""
        op_kinds = [op["op"] for op in lcs_diff(_SEQ_SCHEMA, [], target, domain="test")["ops"]]
        # Equivalent to "count == len(target) and all inserts".
        assert op_kinds == ["insert"] * len(target)

    @given(content_id_lists())
    def test_empty_target_produces_only_deletes(self, base: list[str]) -> None:
        """Diffing base → empty target has exactly len(base) delete ops."""
        op_kinds = [op["op"] for op in lcs_diff(_SEQ_SCHEMA, base, [], domain="test")["ops"]]
        # Equivalent to "count == len(base) and all deletes".
        assert op_kinds == ["delete"] * len(base)

    @given(content_id_lists(), content_id_lists())
    def test_domain_tag_propagated(self, base: list[str], target: list[str]) -> None:
        """The domain tag from the call site is preserved in the StructuredDelta."""
        delta = lcs_diff(_SEQ_SCHEMA, base, target, domain="my-domain")
        assert delta["domain"] == "my-domain"

    @given(st.lists(st.integers(min_value=0, max_value=5), max_size=6))
    def test_deduplicated_sequence_round_trips(self, indices: list[int]) -> None:
        """Converting ints → content IDs and diffing against itself always has no ops."""
        seq = [f"hash-{i}" for i in indices]
        delta = lcs_diff(_SEQ_SCHEMA, seq, seq, domain="test")
        assert not [entry for entry in delta["ops"] if entry["op"] != "keep"]
| 417 | |
| 418 | |
| 419 | # --------------------------------------------------------------------------- |
| 420 | # OT diamond property |
| 421 | # --------------------------------------------------------------------------- |
| 422 | |
| 423 | |
| 424 | def _apply_insert_ops(base: list[str], ops: list[InsertOp]) -> list[str]: |
| 425 | """Apply a list of InsertOps to *base*, returning the resulting list. |
| 426 | |
| 427 | Ops are applied in order; each op's position is relative to the *current* |
| 428 | state of the list at the time of application (i.e. positions shift as |
| 429 | earlier ops are applied). |
| 430 | """ |
| 431 | result = list(base) |
| 432 | for op in ops: |
| 433 | pos = op["position"] |
| 434 | if pos is None: |
| 435 | result.append(op["content_id"]) |
| 436 | else: |
| 437 | result.insert(pos, op["content_id"]) |
| 438 | return result |
| 439 | |
| 440 | |
@st.composite
def _two_commuting_insert_ops(draw: st.DrawFn) -> tuple[InsertOp, InsertOp]:
    """Strategy: two InsertOps at distinct positions on the same address."""
    shared_addr = draw(st.text(alphabet="abcde", min_size=1, max_size=4))
    first_pos = draw(st.integers(min_value=0, max_value=9))
    # A strictly positive offset guarantees the positions differ, so
    # ops_commute returns True for every generated pair.
    second_pos = first_pos + draw(st.integers(min_value=1, max_value=9))
    first = InsertOp(
        op="insert",
        address=shared_addr,
        position=first_pos,
        content_id=draw(_SHORT_TEXT),
        content_summary="op-a",
    )
    second = InsertOp(
        op="insert",
        address=shared_addr,
        position=second_pos,
        content_id=draw(_SHORT_TEXT),
        content_summary="op-b",
    )
    return first, second
| 464 | |
| 465 | |
class TestOTDiamondProperty:
    """``transform`` satisfies the OT diamond property for InsertOp pairs."""

    @given(_two_commuting_insert_ops())
    def test_insert_insert_diamond(self, ops: tuple[InsertOp, InsertOp]) -> None:
        """Applying (a then b') == applying (b then a') for concurrent inserts.

        This is the fundamental OT correctness criterion. With a 10-element
        base, both application orders must produce identical final sequences.
        """
        a, b = ops
        assert ops_commute(a, b), "strategy must only produce commuting pairs"

        a_prime, b_prime = transform(a, b)
        base: list[str] = [f"x{i}" for i in range(10)]

        # One path applies a first then the transformed b; the other is the
        # mirror image. Convergence means both paths agree.
        path1 = _apply_insert_ops(_apply_insert_ops(base, [a]), [b_prime])
        path2 = _apply_insert_ops(_apply_insert_ops(base, [b]), [a_prime])

        assert path1 == path2, (
            f"Diamond property violated:\n"
            f" a={dict(a)}\n b={dict(b)}\n"
            f" a'={dict(a_prime)}\n b'={dict(b_prime)}\n"
            f" path1={path1}\n path2={path2}"
        )

    @given(_two_commuting_insert_ops())
    def test_transform_preserves_op_types(self, ops: tuple[InsertOp, InsertOp]) -> None:
        """transform() must always return InsertOps when given InsertOps."""
        for adjusted in transform(*ops):
            assert adjusted["op"] == "insert"

    @given(_two_commuting_insert_ops())
    def test_transform_preserves_content(self, ops: tuple[InsertOp, InsertOp]) -> None:
        """transform() must not alter the content being inserted."""
        adjusted = transform(*ops)
        for before, after in zip(ops, adjusted):
            assert after["content_id"] == before["content_id"]

    @given(_two_commuting_insert_ops())
    def test_ops_commute_symmetry(self, ops: tuple[InsertOp, InsertOp]) -> None:
        """ops_commute(a, b) == ops_commute(b, a) — the relation is symmetric."""
        a, b = ops
        assert ops_commute(a, b) == ops_commute(b, a)