test_crdt.py
python
| 1 | """Tests for muse.plugins.music._crdt_notes — NotePosition, RGANoteEntry, MusicRGA. |
| 2 | |
| 3 | Verifies all three CRDT laws: |
| 4 | 1. Commutativity: merge(a, b) == merge(b, a) |
| 5 | 2. Associativity: merge(merge(a, b), c) == merge(a, merge(b, c)) |
| 6 | 3. Idempotency: merge(a, a) == a |
| 7 | """ |
| 8 | from __future__ import annotations |
| 9 | |
| 10 | import pytest |
| 11 | |
| 12 | from muse.plugins.music._crdt_notes import ( |
| 13 | MusicRGA, |
| 14 | NotePosition, |
| 15 | RGANoteEntry, |
| 16 | _pitch_to_voice_lane, |
| 17 | ) |
| 18 | from muse.plugins.music.midi_diff import NoteKey |
| 19 | |
| 20 | |
| 21 | def _key(pitch: int = 60, velocity: int = 80, start_tick: int = 0, |
| 22 | duration_ticks: int = 480, channel: int = 0) -> NoteKey: |
| 23 | return NoteKey(pitch=pitch, velocity=velocity, start_tick=start_tick, |
| 24 | duration_ticks=duration_ticks, channel=channel) |
| 25 | |
| 26 | |
| 27 | # --------------------------------------------------------------------------- |
| 28 | # NotePosition ordering |
| 29 | # --------------------------------------------------------------------------- |
| 30 | |
| 31 | |
| 32 | class TestNotePosition: |
| 33 | def test_ordered_by_measure_first(self) -> None: |
| 34 | p1 = NotePosition(measure=1, beat_sub=100, voice_lane=3, op_id="zzz") |
| 35 | p2 = NotePosition(measure=2, beat_sub=0, voice_lane=0, op_id="aaa") |
| 36 | assert p1 < p2 |
| 37 | |
| 38 | def test_ordered_by_beat_sub_within_measure(self) -> None: |
| 39 | p1 = NotePosition(measure=1, beat_sub=100, voice_lane=3, op_id="zzz") |
| 40 | p2 = NotePosition(measure=1, beat_sub=200, voice_lane=0, op_id="aaa") |
| 41 | assert p1 < p2 |
| 42 | |
| 43 | def test_ordered_by_voice_lane_at_same_beat(self) -> None: |
| 44 | # Bass (lane 0) should come before soprano (lane 3). |
| 45 | p_bass = NotePosition(measure=1, beat_sub=0, voice_lane=0, op_id="zzz") |
| 46 | p_soprano = NotePosition(measure=1, beat_sub=0, voice_lane=3, op_id="aaa") |
| 47 | assert p_bass < p_soprano |
| 48 | |
| 49 | def test_tie_broken_by_op_id(self) -> None: |
| 50 | p1 = NotePosition(measure=1, beat_sub=0, voice_lane=0, op_id="aaa") |
| 51 | p2 = NotePosition(measure=1, beat_sub=0, voice_lane=0, op_id="bbb") |
| 52 | assert p1 < p2 |
| 53 | |
| 54 | |
| 55 | # --------------------------------------------------------------------------- |
| 56 | # _pitch_to_voice_lane |
| 57 | # --------------------------------------------------------------------------- |
| 58 | |
| 59 | |
| 60 | class TestPitchToVoiceLane: |
| 61 | def test_bass_range(self) -> None: |
| 62 | assert _pitch_to_voice_lane(24) == 0 |
| 63 | assert _pitch_to_voice_lane(47) == 0 |
| 64 | |
| 65 | def test_tenor_range(self) -> None: |
| 66 | assert _pitch_to_voice_lane(48) == 1 |
| 67 | assert _pitch_to_voice_lane(59) == 1 |
| 68 | |
| 69 | def test_alto_range(self) -> None: |
| 70 | assert _pitch_to_voice_lane(60) == 2 |
| 71 | assert _pitch_to_voice_lane(71) == 2 |
| 72 | |
| 73 | def test_soprano_range(self) -> None: |
| 74 | assert _pitch_to_voice_lane(72) == 3 |
| 75 | assert _pitch_to_voice_lane(108) == 3 |
| 76 | |
| 77 | |
| 78 | # --------------------------------------------------------------------------- |
| 79 | # MusicRGA — basic insert / delete |
| 80 | # --------------------------------------------------------------------------- |
| 81 | |
| 82 | |
| 83 | class TestMusicRGAInsertDelete: |
| 84 | def test_single_insert_visible(self) -> None: |
| 85 | seq = MusicRGA("agent-a") |
| 86 | seq.insert(_key(60)) |
| 87 | notes = seq.to_sequence() |
| 88 | assert len(notes) == 1 |
| 89 | assert notes[0]["pitch"] == 60 |
| 90 | |
| 91 | def test_multiple_inserts_ordered_by_position(self) -> None: |
| 92 | seq = MusicRGA("agent-a") |
| 93 | # Insert soprano (pitch 72 = lane 3) before bass (pitch 36 = lane 0) |
| 94 | # at the same beat — bass should appear first in output. |
| 95 | seq.insert(_key(pitch=72, start_tick=0)) |
| 96 | seq.insert(_key(pitch=36, start_tick=0)) |
| 97 | notes = seq.to_sequence() |
| 98 | assert notes[0]["pitch"] == 36 # bass first |
| 99 | assert notes[1]["pitch"] == 72 # soprano second |
| 100 | |
| 101 | def test_delete_removes_note(self) -> None: |
| 102 | seq = MusicRGA("agent-a") |
| 103 | entry = seq.insert(_key(60)) |
| 104 | seq.delete(entry["op_id"]) |
| 105 | assert seq.to_sequence() == [] |
| 106 | |
| 107 | def test_delete_nonexistent_raises(self) -> None: |
| 108 | seq = MusicRGA("agent-a") |
| 109 | with pytest.raises(KeyError): |
| 110 | seq.delete("nonexistent-op-id") |
| 111 | |
| 112 | def test_tombstoned_entries_counted(self) -> None: |
| 113 | seq = MusicRGA("agent-a") |
| 114 | e = seq.insert(_key(60)) |
| 115 | seq.delete(e["op_id"]) |
| 116 | assert seq.entry_count() == 1 |
| 117 | assert seq.live_count() == 0 |
| 118 | |
| 119 | |
| 120 | # --------------------------------------------------------------------------- |
| 121 | # CRDT merge — commutativity, associativity, idempotency |
| 122 | # --------------------------------------------------------------------------- |
| 123 | |
| 124 | |
| 125 | class TestMusicRGACRDTLaws: |
| 126 | def _make_replicas(self) -> tuple[MusicRGA, MusicRGA, MusicRGA]: |
| 127 | a = MusicRGA("agent-a") |
| 128 | b = MusicRGA("agent-b") |
| 129 | c = MusicRGA("agent-c") |
| 130 | |
| 131 | a.insert(_key(60, start_tick=0)) |
| 132 | a.insert(_key(64, start_tick=480)) |
| 133 | |
| 134 | b.insert(_key(67, start_tick=0)) |
| 135 | b.insert(_key(71, start_tick=480)) |
| 136 | |
| 137 | c.insert(_key(72, start_tick=0)) |
| 138 | |
| 139 | # Propagate a's ops to b and c (simulating gossip). |
| 140 | for entry in list(a._entries.values()): |
| 141 | b._entries.setdefault(entry["op_id"], entry) |
| 142 | c._entries.setdefault(entry["op_id"], entry) |
| 143 | |
| 144 | return a, b, c |
| 145 | |
| 146 | def test_commutativity(self) -> None: |
| 147 | a, b, _ = self._make_replicas() |
| 148 | ab = MusicRGA.merge(a, b) |
| 149 | ba = MusicRGA.merge(b, a) |
| 150 | assert ab.to_sequence() == ba.to_sequence() |
| 151 | |
| 152 | def test_associativity(self) -> None: |
| 153 | a, b, c = self._make_replicas() |
| 154 | ab_c = MusicRGA.merge(MusicRGA.merge(a, b), c) |
| 155 | a_bc = MusicRGA.merge(a, MusicRGA.merge(b, c)) |
| 156 | assert ab_c.to_sequence() == a_bc.to_sequence() |
| 157 | |
| 158 | def test_idempotency(self) -> None: |
| 159 | a, _, _ = self._make_replicas() |
| 160 | aa = MusicRGA.merge(a, a) |
| 161 | assert aa.to_sequence() == a.to_sequence() |
| 162 | |
| 163 | def test_merge_contains_all_inserts(self) -> None: |
| 164 | a = MusicRGA("agent-a") |
| 165 | b = MusicRGA("agent-b") |
| 166 | for i in range(5): |
| 167 | a.insert(_key(60 + i, start_tick=i * 480)) |
| 168 | for i in range(5): |
| 169 | b.insert(_key(72 + i, start_tick=i * 480)) |
| 170 | merged = MusicRGA.merge(a, b) |
| 171 | assert merged.live_count() == 10 |
| 172 | |
| 173 | def test_tombstone_wins_in_merge(self) -> None: |
| 174 | a = MusicRGA("agent-a") |
| 175 | b = MusicRGA("agent-b") |
| 176 | |
| 177 | entry = a.insert(_key(60)) |
| 178 | # Share the insert with b. |
| 179 | b._entries[entry["op_id"]] = entry |
| 180 | # b deletes the shared note; a does not. |
| 181 | b.delete(entry["op_id"]) |
| 182 | |
| 183 | merged = MusicRGA.merge(a, b) |
| 184 | # Tombstone wins — note should be absent in merged result. |
| 185 | assert merged.live_count() == 0 |
| 186 | |
| 187 | |
| 188 | # --------------------------------------------------------------------------- |
| 189 | # to_domain_ops |
| 190 | # --------------------------------------------------------------------------- |
| 191 | |
| 192 | |
| 193 | class TestToDomainOps: |
| 194 | def test_empty_base_and_live_produces_no_ops(self) -> None: |
| 195 | seq = MusicRGA("agent-a") |
| 196 | ops = seq.to_domain_ops([]) |
| 197 | assert ops == [] |
| 198 | |
| 199 | def test_added_notes_produce_insert_ops(self) -> None: |
| 200 | seq = MusicRGA("agent-a") |
| 201 | seq.insert(_key(60)) |
| 202 | seq.insert(_key(64)) |
| 203 | ops = seq.to_domain_ops([]) |
| 204 | assert len(ops) == 2 |
| 205 | assert all(o["op"] == "insert" for o in ops) |
| 206 | |
| 207 | def test_removed_notes_produce_delete_ops(self) -> None: |
| 208 | base_notes = [_key(60), _key(64)] |
| 209 | seq = MusicRGA("agent-a") |
| 210 | # Add only the first note back. |
| 211 | seq.insert(_key(60)) |
| 212 | ops = seq.to_domain_ops(base_notes) |
| 213 | op_types = [o["op"] for o in ops] |
| 214 | assert "delete" in op_types |
| 215 | |
| 216 | def test_unchanged_notes_produce_no_ops(self) -> None: |
| 217 | note = _key(60) |
| 218 | seq = MusicRGA("agent-a") |
| 219 | seq.insert(note) |
| 220 | ops = seq.to_domain_ops([note]) |
| 221 | assert ops == [] |
| 222 | |
| 223 | def test_voice_ordering_preserved_in_sequence(self) -> None: |
| 224 | seq = MusicRGA("agent-a") |
| 225 | # Insert in reverse voice order; output should be ordered bass→soprano. |
| 226 | seq.insert(_key(pitch=84, start_tick=0)) # soprano |
| 227 | seq.insert(_key(pitch=60, start_tick=0)) # alto |
| 228 | seq.insert(_key(pitch=48, start_tick=0)) # tenor |
| 229 | seq.insert(_key(pitch=36, start_tick=0)) # bass |
| 230 | notes = seq.to_sequence() |
| 231 | pitches = [n["pitch"] for n in notes] |
| 232 | assert pitches == sorted(pitches) # bass < tenor < alto < soprano |