cgcardona / muse public
test_crdts.py python
832 lines 28.3 KB
d855b718 refactor: strip phase/v2 workflow labels from all source, tests, and docs Gabriel Cardona <cgcardona@gmail.com> 1d ago
1 """Comprehensive test suite for the CRDT primitive library.
2
3 Tests cover all six CRDT types:
4 - :class:`~muse.core.crdts.vclock.VectorClock`
5 - :class:`~muse.core.crdts.lww_register.LWWRegister`
6 - :class:`~muse.core.crdts.or_set.ORSet`
7 - :class:`~muse.core.crdts.rga.RGA`
8 - :class:`~muse.core.crdts.aw_map.AWMap`
9 - :class:`~muse.core.crdts.g_counter.GCounter`
10
11 Each type is tested for:
12 1. Basic operational correctness.
13 2. All three CRDT lattice laws: commutativity, associativity, idempotency.
14 3. Serialisation round-trip (to_dict / from_dict).
15 4. Edge cases (empty structures, concurrent writes, tombstone correctness).
16
17 Additionally, :func:`~muse.core.merge_engine.crdt_join_snapshots` is tested
18 for the integration path through the merge engine.
19 """
20
21 from __future__ import annotations
22
23 import pathlib
24
25 import pytest
26
27 from muse.domain import CRDTPlugin
28 from muse.core.crdts import (
29 AWMap,
30 GCounter,
31 LWWRegister,
32 ORSet,
33 RGA,
34 VectorClock,
35 )
36 from muse.core.crdts.lww_register import LWWValue
37 from muse.core.crdts.or_set import ORSetDict
38 from muse.core.crdts.rga import RGAElement
39
40
41 # ===========================================================================
42 # VectorClock
43 # ===========================================================================
44
45
class TestVectorClock:
    """Checks for :class:`VectorClock`: ticking, merging, causal ordering, round-trip."""

    def test_increment_own_agent(self) -> None:
        """A single increment on a fresh clock records a count of one."""
        ticked = VectorClock().increment("agent-A")
        assert ticked.to_dict() == {"agent-A": 1}

    def test_increment_twice(self) -> None:
        """Two increments for the same agent accumulate to two."""
        once = VectorClock().increment("agent-A")
        twice = once.increment("agent-A")
        assert twice.to_dict()["agent-A"] == 2

    def test_merge_takes_max_per_agent(self) -> None:
        """Merge keeps the larger counter for every agent seen on either side."""
        left = VectorClock({"agent-A": 3, "agent-B": 1})
        right = VectorClock({"agent-A": 1, "agent-B": 5, "agent-C": 2})
        expected = {"agent-A": 3, "agent-B": 5, "agent-C": 2}
        assert left.merge(right).to_dict() == expected

    def test_happens_before_simple(self) -> None:
        """A strictly smaller single-agent clock precedes the larger one, not vice versa."""
        earlier = VectorClock({"agent-A": 1})
        later = VectorClock({"agent-A": 2})
        assert earlier.happens_before(later)
        assert not later.happens_before(earlier)

    def test_happens_before_multi_agent(self) -> None:
        """A clock smaller on every agent precedes the larger clock."""
        earlier = VectorClock({"agent-A": 1, "agent-B": 2})
        later = VectorClock({"agent-A": 2, "agent-B": 3})
        assert earlier.happens_before(later)

    def test_not_happens_before_concurrent(self) -> None:
        """Clocks that each lead on a different agent are unordered."""
        first = VectorClock({"agent-A": 2, "agent-B": 1})
        second = VectorClock({"agent-A": 1, "agent-B": 2})
        for lhs, rhs in ((first, second), (second, first)):
            assert not lhs.happens_before(rhs)

    def test_concurrent_with_neither_dominates(self) -> None:
        """``concurrent_with`` holds in both directions when neither clock dominates."""
        first = VectorClock({"agent-A": 2, "agent-B": 1})
        second = VectorClock({"agent-A": 1, "agent-B": 2})
        for lhs, rhs in ((first, second), (second, first)):
            assert lhs.concurrent_with(rhs)

    def test_not_concurrent_with_itself(self) -> None:
        """A clock is never concurrent with itself."""
        clock = VectorClock({"agent-A": 1})
        assert not clock.concurrent_with(clock)

    def test_idempotent_merge(self) -> None:
        """Merging a clock with itself is equivalent to the clock."""
        clock = VectorClock({"agent-A": 3, "agent-B": 1})
        self_merged = clock.merge(clock)
        assert self_merged.equivalent(clock)

    def test_merge_commutativity(self) -> None:
        """Merge order does not affect the result."""
        left = VectorClock({"agent-A": 3, "agent-B": 1})
        right = VectorClock({"agent-A": 1, "agent-B": 5})
        left_first = left.merge(right)
        right_first = right.merge(left)
        assert left_first.equivalent(right_first)

    def test_merge_associativity(self) -> None:
        """Grouping of successive merges does not affect the result."""
        x = VectorClock({"agent-A": 1})
        y = VectorClock({"agent-B": 2})
        z = VectorClock({"agent-C": 3})
        grouped_left = x.merge(y).merge(z)
        grouped_right = x.merge(y.merge(z))
        assert grouped_left.equivalent(grouped_right)

    def test_round_trip_to_from_dict(self) -> None:
        """``from_dict(to_dict())`` reproduces an equivalent clock."""
        original = VectorClock({"agent-A": 5, "agent-B": 3})
        restored = VectorClock.from_dict(original.to_dict())
        assert restored.equivalent(original)

    def test_empty_clock_happens_before_non_empty(self) -> None:
        """The empty clock precedes any clock with at least one tick."""
        blank = VectorClock()
        ticked = VectorClock({"agent-A": 1})
        assert blank.happens_before(ticked)

    def test_equal_clocks_not_happens_before(self) -> None:
        """Two equal clocks are not ordered in either direction."""
        first = VectorClock({"agent-A": 1})
        second = VectorClock({"agent-A": 1})
        for lhs, rhs in ((first, second), (second, first)):
            assert not lhs.happens_before(rhs)
118
119
120 # ===========================================================================
121 # LWWRegister
122 # ===========================================================================
123
124
class TestLWWRegister:
    """Checks for :class:`LWWRegister`: reads, last-writer-wins joins, round-trip."""

    def _make(self, value: str, ts: float, author: str) -> LWWRegister:
        """Build a register from its serialised ``LWWValue`` form."""
        payload: LWWValue = {"value": value, "timestamp": ts, "author": author}
        return LWWRegister.from_dict(payload)

    def test_read_returns_value(self) -> None:
        """``read`` returns the stored value."""
        register = self._make("C major", 1.0, "agent-1")
        assert register.read() == "C major"

    def test_lww_later_timestamp_wins(self) -> None:
        """The write with the later timestamp wins regardless of join direction."""
        older = self._make("C major", 1.0, "agent-1")
        newer = self._make("G major", 2.0, "agent-2")
        assert older.join(newer).read() == "G major"
        assert newer.join(older).read() == "G major"

    def test_lww_same_timestamp_author_tiebreak(self) -> None:
        """On equal timestamps the lexicographically larger author wins."""
        from_a = self._make("C major", 1.0, "agent-A")
        from_b = self._make("G major", 1.0, "agent-B")
        # "agent-B" sorts after "agent-A", so its value is expected both ways.
        forward = from_a.join(from_b)
        assert forward.read() == "G major"
        backward = from_b.join(from_a)
        assert backward.read() == "G major"

    def test_join_is_commutative(self) -> None:
        """Join order does not affect the result."""
        first = self._make("C major", 1.0, "agent-1")
        second = self._make("G major", 2.0, "agent-2")
        first_then_second = first.join(second)
        second_then_first = second.join(first)
        assert first_then_second.equivalent(second_then_first)

    def test_join_is_associative(self) -> None:
        """Grouping of successive joins does not affect the result."""
        x = self._make("C major", 1.0, "agent-1")
        y = self._make("G major", 2.0, "agent-2")
        z = self._make("D minor", 3.0, "agent-3")
        grouped_left = x.join(y).join(z)
        grouped_right = x.join(y.join(z))
        assert grouped_left.equivalent(grouped_right)

    def test_join_is_idempotent(self) -> None:
        """Joining a register with itself is equivalent to the register."""
        register = self._make("C major", 1.0, "agent-1")
        assert register.join(register).equivalent(register)

    def test_write_returns_winner(self) -> None:
        """A write carrying an older timestamp does not displace the current value."""
        current = self._make("C major", 5.0, "agent-1")
        after_stale_write = current.write("G major", 3.0, "agent-2")
        # The incoming write is older (3.0 < 5.0), so it loses.
        assert after_stale_write.read() == "C major"

    def test_round_trip_to_from_dict(self) -> None:
        """``from_dict(to_dict())`` reproduces an equivalent register."""
        original = self._make("A minor", 42.0, "agent-x")
        restored = LWWRegister.from_dict(original.to_dict())
        assert restored.equivalent(original)
174
175
176 # ===========================================================================
177 # ORSet
178 # ===========================================================================
179
180
class TestORSet:
    """Checks for :class:`ORSet`: add/remove, add-wins merge, lattice laws, round-trip."""

    def test_add_element(self) -> None:
        """An added element is reported as a member."""
        s, _ = ORSet().add("note-A")
        assert "note-A" in s

    def test_remove_element(self) -> None:
        """Removing an element by its add-token drops it from the set."""
        s, tok = ORSet().add("note-A")
        s = s.remove("note-A", {tok})
        assert "note-A" not in s

    def test_add_survives_concurrent_remove(self) -> None:
        """A fresh add survives a concurrent remove of an older token.

        Replica 2 starts from replica 1's state and tombstones the only token
        it has observed, so the element is absent on replica 2.  Only the
        token minted by replica 1's concurrent re-add can keep the element
        alive after the merge, which makes the assertion non-vacuous.
        """
        # Agent 1 adds note-A with token_1.
        s1, tok1 = ORSet().add("note-A")

        # Agent 2 has observed that add and removes note-A by tombstoning token_1.
        s2 = s1.remove("note-A", {tok1})
        assert "note-A" not in s2

        # Agent 1 concurrently adds note-A again (new token, never seen by agent 2).
        s1_v2, _ = s1.add("note-A")

        # Merge in both directions: the new add wins over the remove of the old token.
        assert "note-A" in s1_v2.join(s2)
        assert "note-A" in s2.join(s1_v2)

    def test_remove_observed_element_works(self) -> None:
        """Removing with every token from ``tokens_for`` deletes the element."""
        s, _ = ORSet().add("note-B")
        tokens = s.tokens_for("note-B")
        s = s.remove("note-B", tokens)
        assert "note-B" not in s

    def test_join_is_commutative(self) -> None:
        """Join order does not affect the visible elements."""
        s1, _ = ORSet().add("X")
        s2, _ = ORSet().add("Y")

        assert s1.join(s2).elements() == s2.join(s1).elements()

    def test_join_is_associative(self) -> None:
        """Grouping of successive joins does not affect the visible elements."""
        s1, _ = ORSet().add("X")
        s2, _ = ORSet().add("Y")
        s3, _ = ORSet().add("Z")

        left = s1.join(s2).join(s3)
        right = s1.join(s2.join(s3))
        assert left.elements() == right.elements()

    def test_join_is_idempotent(self) -> None:
        """Joining a set with itself leaves the visible elements unchanged."""
        s, _ = ORSet().add("X")
        assert s.join(s).elements() == s.elements()

    def test_tokens_for_returns_live_tokens(self) -> None:
        """The token returned by ``add`` is listed by ``tokens_for``."""
        s, tok = ORSet().add("X")
        assert tok in s.tokens_for("X")

    def test_contains_dunder(self) -> None:
        """``in`` reports added elements and rejects absent ones."""
        s, _ = ORSet().add("Z")
        assert "Z" in s
        assert "W" not in s

    def test_round_trip_to_from_dict(self) -> None:
        """``from_dict(to_dict())`` preserves the visible elements."""
        s, _ = ORSet().add("A")
        s, _ = s.add("B")
        data: ORSetDict = s.to_dict()
        restored = ORSet.from_dict(data)
        assert restored.elements() == s.elements()

    def test_add_multiple_same_element(self) -> None:
        """Adding the same element twice keeps both add-tokens live."""
        s, tok1 = ORSet().add("X")
        s, tok2 = s.add("X")
        live = s.tokens_for("X")
        assert tok1 in live
        assert tok2 in live
269
270
271 # ===========================================================================
272 # RGA
273 # ===========================================================================
274
275
class TestRGA:
    """Checks for :class:`RGA`: insertion order, tombstones, joins, round-trip."""

    def test_insert_after_none_is_prepend(self) -> None:
        """Inserting with a ``None`` anchor into an empty RGA yields that element."""
        seq = RGA().insert(None, "a", element_id="1@agent")
        assert seq.to_sequence() == ["a"]

    def test_insert_at_end(self) -> None:
        """Inserting after the only element appends to the sequence."""
        head_id = "1@agent"
        seq = RGA().insert(None, "a", element_id=head_id)
        seq = seq.insert(head_id, "b", element_id="2@agent")
        assert seq.to_sequence() == ["a", "b"]

    def test_insert_in_middle(self) -> None:
        """Of two inserts at one anchor, the larger ID lands closer to the anchor."""
        # RGA places the more recently inserted element (larger ID) to the LEFT
        # of an earlier sibling sharing the same anchor.  Inserting "c" first
        # (smaller ID) and "b" second (larger ID) after "a" therefore reads a, b, c.
        head_id = "1@agent"
        seq = RGA().insert(None, "a", element_id=head_id)
        seq = seq.insert(head_id, "c", element_id="2@agent")  # first -> smaller ID
        seq = seq.insert(head_id, "b", element_id="3@agent")  # second -> larger ID -> left
        assert seq.to_sequence() == ["a", "b", "c"]

    def test_delete_marks_tombstone(self) -> None:
        """Deleting the only element leaves an empty visible sequence."""
        head_id = "1@agent"
        seq = RGA().insert(None, "a", element_id=head_id)
        seq = seq.delete(head_id)
        assert seq.to_sequence() == []

    def test_delete_unknown_id_is_noop(self) -> None:
        """Deleting an ID that was never inserted leaves the sequence unchanged."""
        seq = RGA().insert(None, "a", element_id="1@agent")
        after_delete = seq.delete("nonexistent-id")
        assert after_delete.to_sequence() == ["a"]

    def test_concurrent_insert_same_position_deterministic(self) -> None:
        """Concurrent inserts at one anchor converge whichever side joins first."""
        # Two agents both insert after the same anchor element.
        head_id = "1@agent"
        base = RGA().insert(None, "a", element_id=head_id)

        with_big_id = base.insert(head_id, "B", element_id="2@agent-z")  # larger ID
        with_small_id = base.insert(head_id, "C", element_id="2@agent-a")  # smaller ID

        big_joined_with_small = with_big_id.join(with_small_id)
        small_joined_with_big = with_small_id.join(with_big_id)

        # Either join order must yield the same visible sequence (commutativity).
        assert big_joined_with_small.to_sequence() == small_joined_with_big.to_sequence()

    def test_join_is_commutative(self) -> None:
        """Join order does not affect the visible sequence."""
        first = RGA().insert(None, "X", element_id="1@a")
        second = RGA().insert(None, "Y", element_id="1@b")

        assert first.join(second).to_sequence() == second.join(first).to_sequence()

    def test_join_is_associative(self) -> None:
        """Grouping of successive joins does not affect the visible sequence."""
        first = RGA().insert(None, "X", element_id="1@a")
        second = RGA().insert(None, "Y", element_id="1@b")
        third = RGA().insert(None, "Z", element_id="1@c")

        grouped_left = first.join(second).join(third)
        grouped_right = first.join(second.join(third))
        assert grouped_left.to_sequence() == grouped_right.to_sequence()

    def test_join_is_idempotent(self) -> None:
        """Joining an RGA with itself leaves the visible sequence unchanged."""
        seq = RGA().insert(None, "X", element_id="1@agent")
        assert seq.join(seq).to_sequence() == seq.to_sequence()

    def test_to_sequence_excludes_tombstones(self) -> None:
        """A deleted element is omitted while its successor stays visible."""
        first_id = "1@agent"
        seq = RGA().insert(None, "A", element_id=first_id)
        seq = seq.insert(first_id, "B", element_id="2@agent")
        seq = seq.delete(first_id)
        assert seq.to_sequence() == ["B"]

    def test_rga_round_trip_to_from_dict(self) -> None:
        """``from_dict(to_dict())`` preserves the visible sequence."""
        first_id = "1@agent"
        seq = RGA().insert(None, "A", element_id=first_id)
        seq = seq.insert(first_id, "B", element_id="2@agent")
        serialised: list[RGAElement] = seq.to_dict()
        restored = RGA.from_dict(serialised)
        assert restored.to_sequence() == seq.to_sequence()

    def test_len_excludes_tombstones(self) -> None:
        """``len`` counts only elements that have not been deleted."""
        first_id = "1@agent"
        seq = RGA().insert(None, "A", element_id=first_id)
        seq = seq.insert(first_id, "B", element_id="2@agent")
        seq = seq.delete(first_id)
        assert len(seq) == 1

    def test_tombstone_survives_join(self) -> None:
        """An element deleted on one replica stays deleted after joining."""
        first_id = "1@agent"
        live_replica = RGA().insert(None, "A", element_id=first_id)

        deleted_replica = live_replica.delete(first_id)

        joined = live_replica.join(deleted_replica)
        # A delete observed on either side must win over the live copy.
        assert "A" not in joined.to_sequence()
392
393
394 # ===========================================================================
395 # AWMap
396 # ===========================================================================
397
398
class TestAWMap:
    """Checks for :class:`AWMap`: set/get/remove, add-wins merge, lattice laws, round-trip."""

    def test_set_and_get(self) -> None:
        """A value stored with ``set`` is returned by ``get``."""
        mapping = AWMap().set("tempo", "120bpm")
        assert mapping.get("tempo") == "120bpm"

    def test_get_absent_returns_none(self) -> None:
        """``get`` on a key that was never set returns ``None``."""
        assert AWMap().get("nonexistent") is None

    def test_overwrite_key(self) -> None:
        """A second ``set`` on the same key replaces the first value."""
        mapping = AWMap().set("key", "C major")
        mapping = mapping.set("key", "G major")
        assert mapping.get("key") == "G major"

    def test_remove_key(self) -> None:
        """A removed key reads back as ``None``."""
        mapping = AWMap().set("tempo", "120bpm")
        mapping = mapping.remove("tempo")
        assert mapping.get("tempo") is None

    def test_add_wins_concurrent_remove(self) -> None:
        """A concurrent ``set`` on one replica survives a ``remove`` on another."""
        # Agent A sets "tempo".
        replica_a = AWMap().set("tempo", "120bpm")

        # Agent B sets and then removes "tempo", tombstoning its own tokens.
        replica_b = AWMap().set("tempo", "120bpm")
        replica_b = replica_b.remove("tempo")

        # Agent A concurrently writes a new value (new token).
        replica_a_updated = replica_a.set("tempo", "140bpm")

        # After the merge the fresh write must still be readable.
        joined = replica_a_updated.join(replica_b)
        assert joined.get("tempo") == "140bpm"

    def test_join_is_commutative(self) -> None:
        """Join order does not affect the plain-dict view."""
        first = AWMap().set("A", "1")
        second = AWMap().set("B", "2")
        assert first.join(second).to_plain_dict() == second.join(first).to_plain_dict()

    def test_join_is_associative(self) -> None:
        """Grouping of successive joins does not affect the plain-dict view."""
        first = AWMap().set("A", "1")
        second = AWMap().set("B", "2")
        third = AWMap().set("C", "3")
        grouped_left = first.join(second).join(third)
        grouped_right = first.join(second.join(third))
        assert grouped_left.to_plain_dict() == grouped_right.to_plain_dict()

    def test_join_is_idempotent(self) -> None:
        """Joining a map with itself leaves the plain-dict view unchanged."""
        mapping = AWMap().set("A", "1")
        assert mapping.join(mapping).to_plain_dict() == mapping.to_plain_dict()

    def test_keys_returns_live_keys(self) -> None:
        """``keys`` returns a frozenset of every key that has been set."""
        mapping = AWMap().set("X", "1").set("Y", "2")
        assert mapping.keys() == frozenset({"X", "Y"})

    def test_contains(self) -> None:
        """``in`` reports set keys and rejects absent ones."""
        mapping = AWMap().set("K", "V")
        assert "K" in mapping
        assert "Z" not in mapping

    def test_round_trip_to_from_dict(self) -> None:
        """``from_dict(to_dict())`` preserves the plain-dict view."""
        original = AWMap().set("A", "1").set("B", "2")
        restored = AWMap.from_dict(original.to_dict())
        assert restored.to_plain_dict() == original.to_plain_dict()
472
473
474 # ===========================================================================
475 # GCounter
476 # ===========================================================================
477
478
class TestGCounter:
    """Checks for :class:`GCounter`: increments, validation, joins, round-trip."""

    def test_initial_value_is_zero(self) -> None:
        """A fresh counter totals zero."""
        assert GCounter().value() == 0

    def test_increment(self) -> None:
        """A default increment adds one to both the total and the agent's slot."""
        counter = GCounter().increment("agent-1")
        assert counter.value() == 1
        assert counter.value_for("agent-1") == 1

    def test_increment_by_n(self) -> None:
        """``by=`` controls the increment size."""
        counter = GCounter().increment("agent-1", by=5)
        assert counter.value() == 5

    def test_increment_rejects_zero(self) -> None:
        """An increment of zero raises ``ValueError``."""
        with pytest.raises(ValueError):
            GCounter().increment("agent-1", by=0)

    def test_increment_rejects_negative(self) -> None:
        """A negative increment raises ``ValueError``."""
        with pytest.raises(ValueError):
            GCounter().increment("agent-1", by=-1)

    def test_join_takes_max_per_agent(self) -> None:
        """Join keeps the larger per-agent count; the total is their sum."""
        left = GCounter({"agent-A": 3, "agent-B": 1})
        right = GCounter({"agent-A": 1, "agent-B": 5})
        joined = left.join(right)
        assert joined.value_for("agent-A") == 3
        assert joined.value_for("agent-B") == 5
        assert joined.value() == 8

    def test_join_is_commutative(self) -> None:
        """Join order does not affect the result."""
        left = GCounter({"agent-A": 3})
        right = GCounter({"agent-B": 7})
        left_first = left.join(right)
        right_first = right.join(left)
        assert left_first.equivalent(right_first)

    def test_join_is_associative(self) -> None:
        """Grouping of successive joins does not affect the result."""
        x = GCounter({"agent-A": 1})
        y = GCounter({"agent-B": 2})
        z = GCounter({"agent-C": 3})
        grouped_left = x.join(y).join(z)
        grouped_right = x.join(y.join(z))
        assert grouped_left.equivalent(grouped_right)

    def test_join_is_idempotent(self) -> None:
        """Joining a counter with itself is equivalent to the counter."""
        counter = GCounter({"agent-A": 5})
        assert counter.join(counter).equivalent(counter)

    def test_value_for_absent_agent_is_zero(self) -> None:
        """An agent that never incremented reads as zero."""
        assert GCounter().value_for("ghost-agent") == 0

    def test_round_trip_to_from_dict(self) -> None:
        """``from_dict(to_dict())`` reproduces an equivalent counter."""
        original = GCounter({"a": 1, "b": 2})
        restored = GCounter.from_dict(original.to_dict())
        assert restored.equivalent(original)
532
533
534 # ===========================================================================
535 # CRDTPlugin integration — merge engine CRDT path
536 # ===========================================================================
537
538
class TestCRDTMergeEngineIntegration:
    """Tests for :func:`~muse.core.merge_engine.crdt_join_snapshots`.

    Since there is no production CRDTPlugin implementation yet (the music plugin
    is still three-way mode), we create a minimal stub that satisfies the
    CRDTPlugin protocol.
    """

    def _make_stub_plugin(self) -> CRDTPlugin:
        """Return a minimal CRDTPlugin stub.

        The stub's ``join`` unions files and CRDT state and merges the vector
        clocks.  Overlapping file paths are resolved with ``max`` so that the
        stub itself is commutative on ``files``.
        """
        # ``CRDTPlugin`` itself comes from the module-level import.
        from muse.domain import (
            CRDTSnapshotManifest,
            DriftReport,
            LiveState,
            MergeResult,
            StateSnapshot,
            StateDelta,
            StructuredDelta,
        )
        from muse.core.schema import CRDTDimensionSpec, DomainSchema, SetSchema

        class StubCRDTPlugin(CRDTPlugin):
            def snapshot(self, live_state: LiveState) -> StateSnapshot:
                return {"files": {}, "domain": "stub"}

            def diff(
                self,
                base: StateSnapshot,
                target: StateSnapshot,
                *,
                repo_root: pathlib.Path | None = None,
            ) -> StateDelta:
                empty_delta: StructuredDelta = {
                    "domain": "stub",
                    "ops": [],
                    "summary": "no changes",
                }
                return empty_delta

            def drift(self, committed: StateSnapshot, live_state: LiveState) -> DriftReport:
                return DriftReport(has_drift=False)

            def apply(self, delta: StateDelta, live_state: LiveState) -> LiveState:
                return live_state

            def merge(
                self,
                base: StateSnapshot,
                left: StateSnapshot,
                right: StateSnapshot,
                *,
                repo_root: pathlib.Path | None = None,
            ) -> MergeResult:
                return MergeResult(merged=base)

            def schema(self) -> DomainSchema:
                # Named ``top_level`` so the local does not shadow this method's name.
                top_level: SetSchema = {
                    "kind": "set",
                    "element_type": "str",
                    "identity": "by_content",
                }
                return {
                    "domain": "stub",
                    "description": "Stub CRDT domain",
                    "dimensions": [],
                    "top_level": top_level,
                    "merge_mode": "crdt",
                    "schema_version": 1,
                }

            def crdt_schema(self) -> list[CRDTDimensionSpec]:
                return []

            def join(
                self,
                a: CRDTSnapshotManifest,
                b: CRDTSnapshotManifest,
            ) -> CRDTSnapshotManifest:
                # Simple merge: union of files, max vclock, union crdt_state
                from muse.core.crdts.vclock import VectorClock

                vc_a = VectorClock.from_dict(a["vclock"])
                vc_b = VectorClock.from_dict(b["vclock"])
                merged_vc = vc_a.merge(vc_b)

                # A plain ``{**a, **b}`` lets the right operand win on shared
                # paths, which makes join(a, b) != join(b, a).  Picking the
                # larger value is order-independent.
                # NOTE(review): assumes file values are mutually comparable
                # (hash strings in every test here) - confirm against the
                # CRDTSnapshotManifest definition.
                merged_files = dict(a["files"])
                for path, digest in b["files"].items():
                    merged_files[path] = max(merged_files.get(path, digest), digest)

                # crdt_state is empty in every test below; overlapping keys
                # would still resolve in favour of ``b`` here.
                merged_crdt_state = {**a["crdt_state"], **b["crdt_state"]}
                result: CRDTSnapshotManifest = {
                    "files": merged_files,
                    "domain": a["domain"],
                    "vclock": merged_vc.to_dict(),
                    "crdt_state": merged_crdt_state,
                    "schema_version": 1,
                }
                return result

            def to_crdt_state(self, snapshot: StateSnapshot) -> CRDTSnapshotManifest:
                result: CRDTSnapshotManifest = {
                    "files": snapshot.get("files", {}),
                    "domain": snapshot.get("domain", "stub"),
                    "vclock": {},
                    "crdt_state": {},
                    "schema_version": 1,
                }
                return result

            def from_crdt_state(self, crdt: CRDTSnapshotManifest) -> StateSnapshot:
                return {"files": crdt["files"], "domain": crdt["domain"]}

        return StubCRDTPlugin()

    def test_crdt_join_produces_merge_result(self) -> None:
        """Joining two disjoint snapshots yields a clean result with no conflicts."""
        from muse.core.merge_engine import crdt_join_snapshots

        plugin = self._make_stub_plugin()
        result = crdt_join_snapshots(
            plugin=plugin,
            a_snapshot={"track.mid": "hash-a"},
            b_snapshot={"beat.mid": "hash-b"},
            a_vclock={"agent-1": 1},
            b_vclock={"agent-2": 1},
            a_crdt_state={},
            b_crdt_state={},
            domain="stub",
        )
        assert result.is_clean
        assert result.conflicts == []

    def test_crdt_join_merges_files(self) -> None:
        """Files from both snapshots appear in the merged result."""
        from muse.core.merge_engine import crdt_join_snapshots

        plugin = self._make_stub_plugin()
        result = crdt_join_snapshots(
            plugin=plugin,
            a_snapshot={"track.mid": "hash-a"},
            b_snapshot={"beat.mid": "hash-b"},
            a_vclock={},
            b_vclock={},
            a_crdt_state={},
            b_crdt_state={},
            domain="stub",
        )
        assert "track.mid" in result.merged["files"]
        assert "beat.mid" in result.merged["files"]

    def test_crdt_join_is_commutative(self) -> None:
        """Swapping the two sides yields the same set of merged file paths."""
        from muse.core.merge_engine import crdt_join_snapshots

        plugin = self._make_stub_plugin()
        result_ab = crdt_join_snapshots(
            plugin=plugin,
            a_snapshot={"track.mid": "hash-a"},
            b_snapshot={"beat.mid": "hash-b"},
            a_vclock={"agent-1": 1},
            b_vclock={"agent-2": 2},
            a_crdt_state={},
            b_crdt_state={},
            domain="stub",
        )
        result_ba = crdt_join_snapshots(
            plugin=plugin,
            a_snapshot={"beat.mid": "hash-b"},
            b_snapshot={"track.mid": "hash-a"},
            a_vclock={"agent-2": 2},
            b_vclock={"agent-1": 1},
            a_crdt_state={},
            b_crdt_state={},
            domain="stub",
        )
        assert set(result_ab.merged["files"].keys()) == set(result_ba.merged["files"].keys())

    def test_crdt_merge_never_produces_conflicts(self) -> None:
        """Both sides touching the same path still yields a clean, conflict-free result."""
        from muse.core.merge_engine import crdt_join_snapshots

        plugin = self._make_stub_plugin()
        # Even when both replicas modify the same file, CRDT join never conflicts
        result = crdt_join_snapshots(
            plugin=plugin,
            a_snapshot={"shared.mid": "hash-a"},
            b_snapshot={"shared.mid": "hash-b"},
            a_vclock={"agent-1": 1},
            b_vclock={"agent-2": 1},
            a_crdt_state={},
            b_crdt_state={},
            domain="stub",
        )
        assert result.is_clean
        assert len(result.conflicts) == 0

    def test_crdt_join_requires_crdt_plugin_protocol(self) -> None:
        """Verify the protocol check is documented in the function signature.

        The static type of ``crdt_join_snapshots(plugin=...)`` is
        ``MuseDomainPlugin``.  Callers that don't implement ``CRDTPlugin``
        are rejected at the call site by mypy.  The runtime ``isinstance``
        check exists as a defensive guard for duck-typed callers.
        """
        from muse.core.merge_engine import crdt_join_snapshots

        # The entry point must at least be importable and callable.
        assert callable(crdt_join_snapshots)

        # A plugin that implements MuseDomainPlugin but NOT CRDTPlugin
        # would pass static type-checking but fail at runtime.
        # We verify the docstring is accurate by checking the stub IS a CRDTPlugin.
        plugin = self._make_stub_plugin()
        assert isinstance(plugin, CRDTPlugin)

    def test_crdt_plugin_join_commutes(self) -> None:
        """join(a,b) == join(b,a) at the CRDT primitive level.

        The manifests share one path (``shared.mid``) with different hashes so
        that an order-dependent file union would be caught, and the full
        manifests are compared rather than only the file key sets.
        """
        from muse.domain import CRDTSnapshotManifest

        plugin = self._make_stub_plugin()
        assert isinstance(plugin, CRDTPlugin)

        a: CRDTSnapshotManifest = {
            "files": {"a.mid": "ha", "shared.mid": "h1"},
            "domain": "stub",
            "vclock": {"x": 1},
            "crdt_state": {},
            "schema_version": 1,
        }
        b: CRDTSnapshotManifest = {
            "files": {"b.mid": "hb", "shared.mid": "h2"},
            "domain": "stub",
            "vclock": {"y": 1},
            "crdt_state": {},
            "schema_version": 1,
        }
        ab = plugin.join(a, b)
        ba = plugin.join(b, a)
        # Checked field by field first so a failure points at the culprit.
        assert set(ab["files"].keys()) == {"a.mid", "b.mid", "shared.mid"}
        assert ab["files"] == ba["files"]
        assert ab["vclock"] == ba["vclock"]
        assert ab == ba
770
771
772 # ===========================================================================
773 # Cross-module: CRDT primitives satisfy lattice laws end-to-end
774 # ===========================================================================
775
776
class TestLatticeProperties:
    """Property-style checks that every CRDT satisfies all three lattice laws.

    Each test asserts commutativity, associativity and idempotency of the
    type's merge/join on small fixed fixtures.  All six primitives are
    covered: VectorClock, GCounter, LWWRegister, ORSet, RGA and AWMap.
    """

    def test_vector_clock_lattice_laws(self) -> None:
        """VectorClock.merge is commutative, associative and idempotent."""
        a = VectorClock({"x": 1, "y": 2})
        b = VectorClock({"x": 3, "z": 1})
        c = VectorClock({"y": 5})

        # Commutativity
        assert a.merge(b).equivalent(b.merge(a))
        # Associativity
        assert a.merge(b).merge(c).equivalent(a.merge(b.merge(c)))
        # Idempotency
        assert a.merge(a).equivalent(a)

    def test_g_counter_lattice_laws(self) -> None:
        """GCounter.join is commutative, associative and idempotent."""
        a = GCounter({"x": 1, "y": 2})
        b = GCounter({"x": 3, "z": 1})
        c = GCounter({"y": 5})

        # Commutativity
        assert a.join(b).equivalent(b.join(a))
        # Associativity
        assert a.join(b).join(c).equivalent(a.join(b.join(c)))
        # Idempotency
        assert a.join(a).equivalent(a)

    def test_lww_register_lattice_laws(self) -> None:
        """LWWRegister.join is commutative, associative and idempotent."""

        def make(v: str, ts: float, author: str) -> LWWRegister:
            return LWWRegister(v, ts, author)

        a = make("val-a", 1.0, "agent-a")
        b = make("val-b", 2.0, "agent-b")
        c = make("val-c", 3.0, "agent-c")

        # Commutativity
        assert a.join(b).equivalent(b.join(a))
        # Associativity
        assert a.join(b).join(c).equivalent(a.join(b.join(c)))
        # Idempotency
        assert a.join(a).equivalent(a)

    def test_or_set_lattice_laws(self) -> None:
        """ORSet.join is commutative, associative and idempotent on visible elements."""
        s1, _ = ORSet().add("X")
        s2, _ = ORSet().add("Y")
        s3, _ = ORSet().add("Z")

        # Commutativity
        assert s1.join(s2).elements() == s2.join(s1).elements()
        # Associativity
        assert s1.join(s2).join(s3).elements() == s1.join(s2.join(s3)).elements()
        # Idempotency
        assert s1.join(s1).elements() == s1.elements()

    def test_rga_lattice_laws(self) -> None:
        """RGA.join is commutative, associative and idempotent on the visible sequence."""
        r1 = RGA().insert(None, "X", element_id="1@a")
        r2 = RGA().insert(None, "Y", element_id="1@b")
        r3 = RGA().insert(None, "Z", element_id="1@c")

        # Commutativity
        assert r1.join(r2).to_sequence() == r2.join(r1).to_sequence()
        # Associativity
        assert r1.join(r2).join(r3).to_sequence() == r1.join(r2.join(r3)).to_sequence()
        # Idempotency
        assert r1.join(r1).to_sequence() == r1.to_sequence()

    def test_aw_map_lattice_laws(self) -> None:
        """AWMap.join is commutative, associative and idempotent on the plain-dict view."""
        m1 = AWMap().set("A", "1")
        m2 = AWMap().set("B", "2")
        m3 = AWMap().set("C", "3")

        # Commutativity
        assert m1.join(m2).to_plain_dict() == m2.join(m1).to_plain_dict()
        # Associativity
        assert m1.join(m2).join(m3).to_plain_dict() == m1.join(m2.join(m3)).to_plain_dict()
        # Idempotency
        assert m1.join(m1).to_plain_dict() == m1.to_plain_dict()