test_music_midi_merge.py
python
| 1 | """Tests for muse/plugins/midi/midi_merge.py — 21-dimension MIDI merge.""" |
| 2 | |
| 3 | import io |
| 4 | |
| 5 | import mido |
| 6 | import pytest |
| 7 | |
| 8 | from muse.core.attributes import AttributeRule |
| 9 | from muse.plugins.midi.midi_merge import ( |
| 10 | INTERNAL_DIMS, |
| 11 | DIM_ALIAS, |
| 12 | DimensionSlice, |
| 13 | MidiDimensions, |
| 14 | NON_INDEPENDENT_DIMS, |
| 15 | _classify_event, |
| 16 | _hash_events, |
| 17 | dimension_conflict_detail, |
| 18 | extract_dimensions, |
| 19 | merge_midi_dimensions, |
| 20 | ) |
| 21 | |
| 22 | |
| 23 | # --------------------------------------------------------------------------- |
| 24 | # MIDI builder helpers |
| 25 | # --------------------------------------------------------------------------- |
| 26 | |
| 27 | |
| 28 | def _make_midi( |
| 29 | *, |
| 30 | notes: list[tuple[int, int, int]] | None = None, |
| 31 | pitchwheel: list[tuple[int, int]] | None = None, |
| 32 | control_change: list[tuple[int, int, int]] | None = None, |
| 33 | channel_pressure: list[tuple[int, int]] | None = None, |
| 34 | poly_aftertouch: list[tuple[int, int, int]] | None = None, |
| 35 | program_change: list[tuple[int, int]] | None = None, |
| 36 | tempo: int = 500_000, |
| 37 | ticks_per_beat: int = 480, |
| 38 | ) -> bytes: |
| 39 | """Build a minimal type-0 MIDI file in memory. |
| 40 | |
| 41 | Args: |
| 42 | notes: List of (abs_tick, note, velocity) note-on events. |
| 43 | pitchwheel: List of (abs_tick, pitch) pitchwheel events. |
| 44 | control_change: List of (abs_tick, control, value) CC events. |
| 45 | channel_pressure: List of (abs_tick, pressure) channel pressure events. |
| 46 | poly_aftertouch: List of (abs_tick, note, pressure) poly-pressure events. |
| 47 | program_change: List of (abs_tick, program) program-change events. |
| 48 | tempo: Microseconds per beat (default 120 BPM). |
| 49 | ticks_per_beat: MIDI resolution. |
| 50 | """ |
| 51 | mid = mido.MidiFile(type=0, ticks_per_beat=ticks_per_beat) |
| 52 | track = mido.MidiTrack() |
| 53 | |
| 54 | events: list[tuple[int, mido.Message]] = [] |
| 55 | events.append((0, mido.MetaMessage("set_tempo", tempo=tempo, time=0))) |
| 56 | |
| 57 | for abs_tick, note, vel in notes or []: |
| 58 | events.append((abs_tick, mido.Message("note_on", note=note, velocity=vel, time=0))) |
| 59 | events.append((abs_tick + 120, mido.Message("note_off", note=note, velocity=0, time=0))) |
| 60 | |
| 61 | for abs_tick, pitch in pitchwheel or []: |
| 62 | events.append((abs_tick, mido.Message("pitchwheel", pitch=pitch, time=0))) |
| 63 | |
| 64 | for abs_tick, ctrl, val in control_change or []: |
| 65 | events.append((abs_tick, mido.Message("control_change", control=ctrl, value=val, time=0))) |
| 66 | |
| 67 | for abs_tick, pressure in channel_pressure or []: |
| 68 | events.append((abs_tick, mido.Message("aftertouch", value=pressure, time=0))) |
| 69 | |
| 70 | for abs_tick, note, pressure in poly_aftertouch or []: |
| 71 | events.append((abs_tick, mido.Message("polytouch", note=note, value=pressure, time=0))) |
| 72 | |
| 73 | for abs_tick, program in program_change or []: |
| 74 | events.append((abs_tick, mido.Message("program_change", program=program, time=0))) |
| 75 | |
| 76 | events.sort(key=lambda x: (x[0], x[1].type)) |
| 77 | prev = 0 |
| 78 | for abs_tick, msg in events: |
| 79 | delta = abs_tick - prev |
| 80 | track.append(msg.copy(time=delta)) |
| 81 | prev = abs_tick |
| 82 | |
| 83 | track.append(mido.MetaMessage("end_of_track", time=0)) |
| 84 | mid.tracks.append(track) |
| 85 | |
| 86 | buf = io.BytesIO() |
| 87 | mid.save(file=buf) |
| 88 | return buf.getvalue() |
| 89 | |
| 90 | |
| 91 | def _midi_bytes_to_notes(midi_bytes: bytes) -> set[int]: |
| 92 | mid = mido.MidiFile(file=io.BytesIO(midi_bytes)) |
| 93 | notes: set[int] = set() |
| 94 | for track in mid.tracks: |
| 95 | for msg in track: |
| 96 | if msg.type == "note_on" and msg.velocity > 0: |
| 97 | notes.add(msg.note) |
| 98 | return notes |
| 99 | |
| 100 | |
| 101 | def _midi_bytes_to_pitchwheels(midi_bytes: bytes) -> list[int]: |
| 102 | mid = mido.MidiFile(file=io.BytesIO(midi_bytes)) |
| 103 | values: list[int] = [] |
| 104 | for track in mid.tracks: |
| 105 | for msg in track: |
| 106 | if msg.type == "pitchwheel": |
| 107 | values.append(msg.pitch) |
| 108 | return values |
| 109 | |
| 110 | |
| 111 | def _midi_bytes_to_ccs(midi_bytes: bytes) -> list[tuple[int, int]]: |
| 112 | mid = mido.MidiFile(file=io.BytesIO(midi_bytes)) |
| 113 | ccs: list[tuple[int, int]] = [] |
| 114 | for track in mid.tracks: |
| 115 | for msg in track: |
| 116 | if msg.type == "control_change": |
| 117 | ccs.append((msg.control, msg.value)) |
| 118 | return ccs |
| 119 | |
| 120 | |
| 121 | # --------------------------------------------------------------------------- |
| 122 | # INTERNAL_DIMS — verify all 21 dimensions declared |
| 123 | # --------------------------------------------------------------------------- |
| 124 | |
| 125 | |
| 126 | class TestInternalDims: |
| 127 | _EXPECTED_21 = [ |
| 128 | "notes", "pitch_bend", "channel_pressure", "poly_pressure", |
| 129 | "cc_modulation", "cc_volume", "cc_pan", "cc_expression", |
| 130 | "cc_sustain", "cc_portamento", "cc_sostenuto", "cc_soft_pedal", |
| 131 | "cc_reverb", "cc_chorus", "cc_other", |
| 132 | "program_change", "tempo_map", "time_signatures", |
| 133 | "key_signatures", "markers", "track_structure", |
| 134 | ] |
| 135 | |
| 136 | def test_exactly_21_dims(self) -> None: |
| 137 | assert len(INTERNAL_DIMS) == 21 |
| 138 | |
| 139 | def test_all_expected_names_present(self) -> None: |
| 140 | assert set(INTERNAL_DIMS) == set(self._EXPECTED_21) |
| 141 | |
| 142 | def test_non_independent_dims(self) -> None: |
| 143 | assert NON_INDEPENDENT_DIMS == frozenset({"tempo_map", "time_signatures", "track_structure"}) |
| 144 | |
| 145 | def test_no_old_coarse_names_in_dims(self) -> None: |
| 146 | """Old coarse names (melodic, rhythmic, harmonic, dynamic, structural) must be gone.""" |
| 147 | old_names = {"melodic", "rhythmic", "harmonic", "dynamic", "structural"} |
| 148 | assert old_names.isdisjoint(set(INTERNAL_DIMS)) |
| 149 | |
| 150 | def test_no_old_coarse_aliases_in_dim_alias(self) -> None: |
| 151 | """Old aliases removed from DIM_ALIAS — no backward-compat shims.""" |
| 152 | old_aliases = {"melodic", "rhythmic", "harmonic", "dynamic", "structural"} |
| 153 | assert old_aliases.isdisjoint(set(DIM_ALIAS)) |
| 154 | |
| 155 | |
| 156 | # --------------------------------------------------------------------------- |
| 157 | # _classify_event — fine-grained 21-dimension routing |
| 158 | # --------------------------------------------------------------------------- |
| 159 | |
| 160 | |
| 161 | class TestClassifyEvent: |
| 162 | # Note events → notes |
| 163 | def test_note_on(self) -> None: |
| 164 | assert _classify_event(mido.Message("note_on", note=60)) == "notes" |
| 165 | |
| 166 | def test_note_off(self) -> None: |
| 167 | assert _classify_event(mido.Message("note_off", note=60)) == "notes" |
| 168 | |
| 169 | # Pitch bend → pitch_bend |
| 170 | def test_pitchwheel(self) -> None: |
| 171 | assert _classify_event(mido.Message("pitchwheel", pitch=100)) == "pitch_bend" |
| 172 | |
| 173 | # Channel pressure → channel_pressure |
| 174 | def test_channel_aftertouch(self) -> None: |
| 175 | assert _classify_event(mido.Message("aftertouch", value=64)) == "channel_pressure" |
| 176 | |
| 177 | # Polyphonic aftertouch → poly_pressure |
| 178 | def test_poly_aftertouch(self) -> None: |
| 179 | assert _classify_event(mido.Message("polytouch", note=60, value=64)) == "poly_pressure" |
| 180 | |
| 181 | # Named CC controllers |
| 182 | def test_cc_1_modulation(self) -> None: |
| 183 | assert _classify_event(mido.Message("control_change", control=1, value=64)) == "cc_modulation" |
| 184 | |
| 185 | def test_cc_7_volume(self) -> None: |
| 186 | assert _classify_event(mido.Message("control_change", control=7, value=100)) == "cc_volume" |
| 187 | |
| 188 | def test_cc_10_pan(self) -> None: |
| 189 | assert _classify_event(mido.Message("control_change", control=10, value=64)) == "cc_pan" |
| 190 | |
| 191 | def test_cc_11_expression(self) -> None: |
| 192 | assert _classify_event(mido.Message("control_change", control=11, value=100)) == "cc_expression" |
| 193 | |
| 194 | def test_cc_64_sustain(self) -> None: |
| 195 | assert _classify_event(mido.Message("control_change", control=64, value=127)) == "cc_sustain" |
| 196 | |
| 197 | def test_cc_65_portamento(self) -> None: |
| 198 | assert _classify_event(mido.Message("control_change", control=65, value=0)) == "cc_portamento" |
| 199 | |
| 200 | def test_cc_66_sostenuto(self) -> None: |
| 201 | assert _classify_event(mido.Message("control_change", control=66, value=127)) == "cc_sostenuto" |
| 202 | |
| 203 | def test_cc_67_soft_pedal(self) -> None: |
| 204 | assert _classify_event(mido.Message("control_change", control=67, value=64)) == "cc_soft_pedal" |
| 205 | |
| 206 | def test_cc_91_reverb(self) -> None: |
| 207 | assert _classify_event(mido.Message("control_change", control=91, value=40)) == "cc_reverb" |
| 208 | |
| 209 | def test_cc_93_chorus(self) -> None: |
| 210 | assert _classify_event(mido.Message("control_change", control=93, value=20)) == "cc_chorus" |
| 211 | |
| 212 | def test_cc_other_unlisted(self) -> None: |
| 213 | # CC 2 is not individually named → cc_other |
| 214 | assert _classify_event(mido.Message("control_change", control=2, value=50)) == "cc_other" |
| 215 | |
| 216 | def test_cc_3_other(self) -> None: |
| 217 | assert _classify_event(mido.Message("control_change", control=3, value=50)) == "cc_other" |
| 218 | |
| 219 | # Program change |
| 220 | def test_program_change(self) -> None: |
| 221 | assert _classify_event(mido.Message("program_change", program=40)) == "program_change" |
| 222 | |
| 223 | # Tempo / time-sig → non-independent |
| 224 | def test_set_tempo(self) -> None: |
| 225 | assert _classify_event(mido.MetaMessage("set_tempo", tempo=500_000)) == "tempo_map" |
| 226 | |
| 227 | def test_time_signature(self) -> None: |
| 228 | msg = mido.MetaMessage( |
| 229 | "time_signature", numerator=4, denominator=4, |
| 230 | clocks_per_click=24, notated_32nd_notes_per_beat=8, |
| 231 | ) |
| 232 | assert _classify_event(msg) == "time_signatures" |
| 233 | |
| 234 | # Key signature |
| 235 | def test_key_signature(self) -> None: |
| 236 | assert _classify_event(mido.MetaMessage("key_signature", key="C")) == "key_signatures" |
| 237 | |
| 238 | # Markers |
| 239 | def test_marker(self) -> None: |
| 240 | assert _classify_event(mido.MetaMessage("marker", text="verse")) == "markers" |
| 241 | |
| 242 | def test_text(self) -> None: |
| 243 | assert _classify_event(mido.MetaMessage("text", text="hello")) == "markers" |
| 244 | |
| 245 | # Track structure |
| 246 | def test_track_name(self) -> None: |
| 247 | assert _classify_event(mido.MetaMessage("track_name", name="Piano")) == "track_structure" |
| 248 | |
| 249 | def test_end_of_track_returns_none(self) -> None: |
| 250 | # end_of_track is reconstructed during MIDI assembly, not stored in any dim |
| 251 | assert _classify_event(mido.MetaMessage("end_of_track")) is None |
| 252 | |
| 253 | |
| 254 | # --------------------------------------------------------------------------- |
| 255 | # extract_dimensions |
| 256 | # --------------------------------------------------------------------------- |
| 257 | |
| 258 | |
| 259 | class TestExtractDimensions: |
| 260 | def test_empty_midi_has_all_21_dims(self) -> None: |
| 261 | midi = _make_midi() |
| 262 | dims = extract_dimensions(midi) |
| 263 | assert set(dims.slices.keys()) == set(INTERNAL_DIMS) |
| 264 | |
| 265 | def test_notes_in_notes_bucket(self) -> None: |
| 266 | midi = _make_midi(notes=[(0, 60, 80), (480, 64, 80)]) |
| 267 | dims = extract_dimensions(midi) |
| 268 | note_on = [msg for _, msg in dims.slices["notes"].events if msg.type == "note_on"] |
| 269 | assert len(note_on) == 2 |
| 270 | |
| 271 | def test_pitchwheel_in_pitch_bend(self) -> None: |
| 272 | midi = _make_midi(pitchwheel=[(100, 500), (200, -500)]) |
| 273 | dims = extract_dimensions(midi) |
| 274 | assert len(dims.slices["pitch_bend"].events) == 2 |
| 275 | |
| 276 | def test_cc_volume_bucket(self) -> None: |
| 277 | midi = _make_midi(control_change=[(0, 7, 100)]) |
| 278 | dims = extract_dimensions(midi) |
| 279 | assert len(dims.slices["cc_volume"].events) == 1 |
| 280 | |
| 281 | def test_cc_sustain_bucket(self) -> None: |
| 282 | midi = _make_midi(control_change=[(0, 64, 127)]) |
| 283 | dims = extract_dimensions(midi) |
| 284 | assert len(dims.slices["cc_sustain"].events) == 1 |
| 285 | |
| 286 | def test_cc_modulation_bucket(self) -> None: |
| 287 | midi = _make_midi(control_change=[(0, 1, 90)]) |
| 288 | dims = extract_dimensions(midi) |
| 289 | assert len(dims.slices["cc_modulation"].events) == 1 |
| 290 | |
| 291 | def test_cc_other_bucket(self) -> None: |
| 292 | midi = _make_midi(control_change=[(0, 2, 50)]) |
| 293 | dims = extract_dimensions(midi) |
| 294 | assert len(dims.slices["cc_other"].events) == 1 |
| 295 | |
| 296 | def test_tempo_in_tempo_map(self) -> None: |
| 297 | midi = _make_midi(tempo=600_000) |
| 298 | dims = extract_dimensions(midi) |
| 299 | types = {msg.type for _, msg in dims.slices["tempo_map"].events} |
| 300 | assert "set_tempo" in types |
| 301 | |
| 302 | def test_content_hash_is_deterministic(self) -> None: |
| 303 | midi = _make_midi(notes=[(0, 60, 80)]) |
| 304 | d1 = extract_dimensions(midi) |
| 305 | d2 = extract_dimensions(midi) |
| 306 | assert d1.slices["notes"].content_hash == d2.slices["notes"].content_hash |
| 307 | |
| 308 | def test_different_notes_give_different_hash(self) -> None: |
| 309 | da = extract_dimensions(_make_midi(notes=[(0, 60, 80)])) |
| 310 | db = extract_dimensions(_make_midi(notes=[(0, 62, 80)])) |
| 311 | assert da.slices["notes"].content_hash != db.slices["notes"].content_hash |
| 312 | |
| 313 | def test_different_dimensions_independent_hashes(self) -> None: |
| 314 | """Changing notes must not affect pitch_bend hash.""" |
| 315 | base = _make_midi(pitchwheel=[(0, 200)]) |
| 316 | with_notes = _make_midi(notes=[(0, 60, 80)], pitchwheel=[(0, 200)]) |
| 317 | da = extract_dimensions(base) |
| 318 | db = extract_dimensions(with_notes) |
| 319 | assert da.slices["pitch_bend"].content_hash == db.slices["pitch_bend"].content_hash |
| 320 | assert da.slices["notes"].content_hash != db.slices["notes"].content_hash |
| 321 | |
| 322 | def test_ticks_per_beat_preserved(self) -> None: |
| 323 | midi = _make_midi(ticks_per_beat=960) |
| 324 | assert extract_dimensions(midi).ticks_per_beat == 960 |
| 325 | |
| 326 | def test_invalid_bytes_raises(self) -> None: |
| 327 | with pytest.raises(ValueError, match="Failed to parse"): |
| 328 | extract_dimensions(b"not a midi file") |
| 329 | |
| 330 | def test_get_by_fine_alias(self) -> None: |
| 331 | midi = _make_midi(pitchwheel=[(0, 100)]) |
| 332 | dims = extract_dimensions(midi) |
| 333 | assert dims.get("pitch_bend").name == "pitch_bend" |
| 334 | assert dims.get("sustain").name == "cc_sustain" |
| 335 | assert dims.get("volume").name == "cc_volume" |
| 336 | |
| 337 | def test_get_unknown_alias_raises(self) -> None: |
| 338 | midi = _make_midi() |
| 339 | dims = extract_dimensions(midi) |
| 340 | with pytest.raises(KeyError): |
| 341 | dims.get("melodic") # old alias — removed |
| 342 | |
| 343 | |
| 344 | # --------------------------------------------------------------------------- |
| 345 | # dimension_conflict_detail |
| 346 | # --------------------------------------------------------------------------- |
| 347 | |
| 348 | |
| 349 | class TestDimensionConflictDetail: |
| 350 | def _dims_from( |
| 351 | self, |
| 352 | notes: list[tuple[int, int, int]] | None = None, |
| 353 | pitchwheel: list[tuple[int, int]] | None = None, |
| 354 | control_change: list[tuple[int, int, int]] | None = None, |
| 355 | tempo: int = 500_000, |
| 356 | ) -> MidiDimensions: |
| 357 | return extract_dimensions(_make_midi( |
| 358 | notes=notes, pitchwheel=pitchwheel, |
| 359 | control_change=control_change, tempo=tempo, |
| 360 | )) |
| 361 | |
| 362 | def test_unchanged_when_all_same(self) -> None: |
| 363 | base = self._dims_from(notes=[(0, 60, 80)]) |
| 364 | detail = dimension_conflict_detail(base, base, base) |
| 365 | assert all(v == "unchanged" for v in detail.values()) |
| 366 | |
| 367 | def test_notes_left_only(self) -> None: |
| 368 | base = self._dims_from() |
| 369 | left = self._dims_from(notes=[(0, 60, 80)]) |
| 370 | detail = dimension_conflict_detail(base, left, base) |
| 371 | assert detail["notes"] == "left_only" |
| 372 | assert detail["pitch_bend"] == "unchanged" |
| 373 | |
| 374 | def test_pitch_bend_right_only(self) -> None: |
| 375 | base = self._dims_from() |
| 376 | right = self._dims_from(pitchwheel=[(0, 100)]) |
| 377 | detail = dimension_conflict_detail(base, base, right) |
| 378 | assert detail["pitch_bend"] == "right_only" |
| 379 | |
| 380 | def test_both_sides_change_notes(self) -> None: |
| 381 | base = self._dims_from() |
| 382 | left = self._dims_from(notes=[(0, 60, 80)]) |
| 383 | right = self._dims_from(notes=[(0, 64, 80)]) |
| 384 | detail = dimension_conflict_detail(base, left, right) |
| 385 | assert detail["notes"] == "both" |
| 386 | |
| 387 | def test_independent_changes_in_separate_dims(self) -> None: |
| 388 | base = self._dims_from() |
| 389 | left = self._dims_from(notes=[(0, 60, 80)]) |
| 390 | right = self._dims_from(pitchwheel=[(0, 200)]) |
| 391 | detail = dimension_conflict_detail(base, left, right) |
| 392 | assert detail["notes"] == "left_only" |
| 393 | assert detail["pitch_bend"] == "right_only" |
| 394 | assert detail["cc_volume"] == "unchanged" |
| 395 | |
| 396 | def test_cc_volume_vs_cc_sustain_independent(self) -> None: |
| 397 | """Two different CC dims changed independently.""" |
| 398 | base = self._dims_from() |
| 399 | left = self._dims_from(control_change=[(0, 7, 100)]) # cc_volume |
| 400 | right = self._dims_from(control_change=[(0, 64, 127)]) # cc_sustain |
| 401 | detail = dimension_conflict_detail(base, left, right) |
| 402 | assert detail["cc_volume"] == "left_only" |
| 403 | assert detail["cc_sustain"] == "right_only" |
| 404 | |
| 405 | |
| 406 | # --------------------------------------------------------------------------- |
| 407 | # merge_midi_dimensions |
| 408 | # --------------------------------------------------------------------------- |
| 409 | |
| 410 | |
| 411 | class TestMergeMidiDimensions: |
| 412 | def _midi( |
| 413 | self, |
| 414 | notes: list[tuple[int, int, int]] | None = None, |
| 415 | pitchwheel: list[tuple[int, int]] | None = None, |
| 416 | control_change: list[tuple[int, int, int]] | None = None, |
| 417 | tempo: int = 500_000, |
| 418 | ticks_per_beat: int = 480, |
| 419 | ) -> bytes: |
| 420 | return _make_midi( |
| 421 | notes=notes, pitchwheel=pitchwheel, control_change=control_change, |
| 422 | tempo=tempo, ticks_per_beat=ticks_per_beat, |
| 423 | ) |
| 424 | |
| 425 | def _rules(self, *rules: tuple[str, str, str]) -> list[AttributeRule]: |
| 426 | return [AttributeRule(p, d, s, i + 1) for i, (p, d, s) in enumerate(rules)] |
| 427 | |
| 428 | # --- Clean auto-merge: independent dimensions --------------------------- |
| 429 | |
| 430 | def test_independent_notes_and_pitch_bend(self) -> None: |
| 431 | """Left changed notes, right changed pitch_bend → clean auto-merge.""" |
| 432 | base = self._midi() |
| 433 | left = self._midi(notes=[(0, 60, 80)]) |
| 434 | right = self._midi(pitchwheel=[(0, 500)]) |
| 435 | result = merge_midi_dimensions(base, left, right, [], "song.mid") |
| 436 | assert result is not None |
| 437 | merged, _ = result |
| 438 | assert _midi_bytes_to_notes(merged) == {60} |
| 439 | assert _midi_bytes_to_pitchwheels(merged) == [500] |
| 440 | |
| 441 | def test_independent_two_cc_dims(self) -> None: |
| 442 | """Left changed cc_volume, right changed cc_sustain → clean auto-merge.""" |
| 443 | base = self._midi() |
| 444 | left = self._midi(control_change=[(0, 7, 100)]) # cc_volume |
| 445 | right = self._midi(control_change=[(0, 64, 127)]) # cc_sustain |
| 446 | result = merge_midi_dimensions(base, left, right, [], "song.mid") |
| 447 | assert result is not None |
| 448 | merged, _ = result |
| 449 | ccs = dict(_midi_bytes_to_ccs(merged)) |
| 450 | assert ccs.get(7) == 100 |
| 451 | assert ccs.get(64) == 127 |
| 452 | |
| 453 | def test_one_side_only_changed_notes(self) -> None: |
| 454 | base = self._midi() |
| 455 | left = self._midi(notes=[(0, 64, 80)]) |
| 456 | result = merge_midi_dimensions(base, left, self._midi(), [], "song.mid") |
| 457 | assert result is not None |
| 458 | merged, _ = result |
| 459 | assert _midi_bytes_to_notes(merged) == {64} |
| 460 | |
| 461 | def test_unchanged_both_sides_preserved(self) -> None: |
| 462 | base = self._midi(notes=[(0, 60, 80)]) |
| 463 | result = merge_midi_dimensions(base, base, base, [], "song.mid") |
| 464 | assert result is not None |
| 465 | merged, _ = result |
| 466 | assert _midi_bytes_to_notes(merged) == {60} |
| 467 | |
| 468 | # --- Strategy override via AttributeRule -------------------------------- |
| 469 | |
| 470 | def test_notes_conflict_resolved_by_ours_rule(self) -> None: |
| 471 | base = self._midi() |
| 472 | left = self._midi(notes=[(0, 60, 80)]) |
| 473 | right = self._midi(notes=[(0, 64, 80)]) |
| 474 | rules = self._rules(("*", "notes", "ours")) |
| 475 | result = merge_midi_dimensions(base, left, right, rules, "song.mid") |
| 476 | assert result is not None |
| 477 | merged, report = result |
| 478 | assert _midi_bytes_to_notes(merged) == {60} |
| 479 | assert "ours" in report["notes"] |
| 480 | |
| 481 | def test_notes_conflict_resolved_by_theirs_rule(self) -> None: |
| 482 | base = self._midi() |
| 483 | left = self._midi(notes=[(0, 60, 80)]) |
| 484 | right = self._midi(notes=[(0, 64, 80)]) |
| 485 | rules = self._rules(("*", "notes", "theirs")) |
| 486 | result = merge_midi_dimensions(base, left, right, rules, "song.mid") |
| 487 | assert result is not None |
| 488 | merged, _ = result |
| 489 | assert _midi_bytes_to_notes(merged) == {64} |
| 490 | |
| 491 | def test_pitch_bend_conflict_resolved_by_theirs(self) -> None: |
| 492 | base = self._midi() |
| 493 | left = self._midi(pitchwheel=[(0, 200)]) |
| 494 | right = self._midi(pitchwheel=[(0, -200)]) |
| 495 | rules = self._rules(("*", "pitch_bend", "theirs")) |
| 496 | result = merge_midi_dimensions(base, left, right, rules, "song.mid") |
| 497 | assert result is not None |
| 498 | merged, _ = result |
| 499 | assert _midi_bytes_to_pitchwheels(merged) == [-200] |
| 500 | |
| 501 | def test_wildcard_dim_rule_resolves_all(self) -> None: |
| 502 | base = self._midi() |
| 503 | left = self._midi(notes=[(0, 60, 80)], pitchwheel=[(0, 200)]) |
| 504 | right = self._midi(notes=[(0, 64, 80)], pitchwheel=[(0, -200)]) |
| 505 | rules = self._rules(("*", "*", "ours")) |
| 506 | result = merge_midi_dimensions(base, left, right, rules, "song.mid") |
| 507 | assert result is not None |
| 508 | merged, _ = result |
| 509 | assert _midi_bytes_to_notes(merged) == {60} |
| 510 | assert 200 in _midi_bytes_to_pitchwheels(merged) |
| 511 | |
| 512 | def test_notes_conflict_no_rule_returns_none(self) -> None: |
| 513 | """Both sides changed notes, no matching rule → conflict → None.""" |
| 514 | base = self._midi() |
| 515 | left = self._midi(notes=[(0, 60, 80)]) |
| 516 | right = self._midi(notes=[(0, 64, 80)]) |
| 517 | assert merge_midi_dimensions(base, left, right, [], "song.mid") is None |
| 518 | |
| 519 | def test_manual_strategy_returns_none(self) -> None: |
| 520 | base = self._midi() |
| 521 | left = self._midi(notes=[(0, 60, 80)]) |
| 522 | right = self._midi(notes=[(0, 64, 80)]) |
| 523 | rules = self._rules(("*", "notes", "manual")) |
| 524 | assert merge_midi_dimensions(base, left, right, rules, "song.mid") is None |
| 525 | |
| 526 | # --- Report content ----------------------------------------------------- |
| 527 | |
| 528 | def test_report_shows_left_right_labels(self) -> None: |
| 529 | base = self._midi() |
| 530 | left = self._midi(notes=[(0, 60, 80)]) |
| 531 | right = self._midi(pitchwheel=[(0, 100)]) |
| 532 | result = merge_midi_dimensions(base, left, right, [], "song.mid") |
| 533 | assert result is not None |
| 534 | _, report = result |
| 535 | assert report["notes"] == "left" |
| 536 | assert report["pitch_bend"] == "right" |
| 537 | |
| 538 | # --- Output is valid MIDI ----------------------------------------------- |
| 539 | |
| 540 | def test_merged_bytes_parseable(self) -> None: |
| 541 | base = self._midi() |
| 542 | left = self._midi(notes=[(0, 60, 80)]) |
| 543 | right = self._midi(pitchwheel=[(0, 100)]) |
| 544 | result = merge_midi_dimensions(base, left, right, [], "song.mid") |
| 545 | assert result is not None |
| 546 | merged, _ = result |
| 547 | parsed = mido.MidiFile(file=io.BytesIO(merged)) |
| 548 | assert parsed.ticks_per_beat == 480 |
| 549 | |
| 550 | def test_merged_bytes_preserve_ticks_per_beat(self) -> None: |
| 551 | base = _make_midi(ticks_per_beat=960) |
| 552 | left = _make_midi(notes=[(0, 60, 80)], ticks_per_beat=960) |
| 553 | right = _make_midi(pitchwheel=[(0, 100)], ticks_per_beat=960) |
| 554 | result = merge_midi_dimensions(base, left, right, [], "song.mid") |
| 555 | assert result is not None |
| 556 | merged, _ = result |
| 557 | assert mido.MidiFile(file=io.BytesIO(merged)).ticks_per_beat == 960 |
| 558 | |
| 559 | # --- Path-pattern matching in rules ------------------------------------ |
| 560 | |
| 561 | def test_path_specific_rule_respected(self) -> None: |
| 562 | base = self._midi() |
| 563 | left = self._midi(pitchwheel=[(0, 200)]) |
| 564 | right = self._midi(pitchwheel=[(0, -200)]) |
| 565 | rules = self._rules(("keys/*", "pitch_bend", "theirs")) |
| 566 | |
| 567 | result_keys = merge_midi_dimensions(base, left, right, rules, "keys/piano.mid") |
| 568 | assert result_keys is not None |
| 569 | merged_keys, _ = result_keys |
| 570 | assert _midi_bytes_to_pitchwheels(merged_keys) == [-200] |
| 571 | |
| 572 | result_other = merge_midi_dimensions(base, left, right, rules, "other/bass.mid") |
| 573 | assert result_other is None # rule doesn't match this path |
| 574 | |
| 575 | def test_multi_rule_priority_order(self) -> None: |
| 576 | """Lower-priority rule does not override higher-priority one.""" |
| 577 | base = self._midi() |
| 578 | left = self._midi(control_change=[(0, 7, 100)]) # cc_volume |
| 579 | right = self._midi(control_change=[(0, 7, 50)]) |
| 580 | rules = self._rules( |
| 581 | ("*", "cc_volume", "ours"), # priority 1 |
| 582 | ("*", "cc_volume", "theirs"), # priority 2 — should be ignored |
| 583 | ) |
| 584 | result = merge_midi_dimensions(base, left, right, rules, "song.mid") |
| 585 | assert result is not None |
| 586 | merged, _ = result |
| 587 | ccs = dict(_midi_bytes_to_ccs(merged)) |
| 588 | assert ccs.get(7) == 100 # ours = left = 100 |