test_muse_checkout.py
python
| 1 | """Tests for the Muse Checkout Engine (Phase 9). |
| 2 | |
| 3 | Verifies: |
| 4 | - No-op checkout when target == working. |
| 5 | - Note add checkout produces stori_add_notes. |
| 6 | - Controller restore produces correct tool calls. |
| 7 | - Large diff triggers region reset (clear + add). |
| 8 | - Determinism (same inputs → same plan hash). |
| 9 | - Boundary seal (AST). |
| 10 | """ |
| 11 | from __future__ import annotations |
| 12 | |
| 13 | import ast |
| 14 | from pathlib import Path |
| 15 | import pytest |
| 16 | from typing_extensions import TypedDict |
| 17 | |
| 18 | from maestro.contracts.json_types import ( |
| 19 | AftertouchDict, |
| 20 | CCEventDict, |
| 21 | NoteDict, |
| 22 | PitchBendDict, |
| 23 | RegionAftertouchMap, |
| 24 | RegionCCMap, |
| 25 | RegionNotesMap, |
| 26 | RegionPitchBendMap, |
| 27 | ) |
| 28 | from maestro.services.muse_checkout import ( |
| 29 | CheckoutPlan, |
| 30 | REGION_RESET_THRESHOLD, |
| 31 | build_checkout_plan, |
| 32 | ) |
| 33 | |
| 34 | |
class _PlanArgs(TypedDict, total=False):
    """Keyword arguments for ``build_checkout_plan`` — mirrors its signature.

    Declared with ``total=False``, so the type itself requires no key; the
    ``_empty_plan_args`` helper in this file nevertheless fills in all of them
    before the dict is splatted into ``build_checkout_plan(**args)``.
    """
    # Identifiers handed straight through to build_checkout_plan.
    project_id: str
    target_variation_id: str
    # "Target" side: one map per event kind (notes, CC, pitch bend,
    # aftertouch).  The tests in this file key these maps by region id ("r1").
    # Presumably this is the state being checked out — confirm against
    # build_checkout_plan.
    target_notes: RegionNotesMap
    target_cc: RegionCCMap
    target_pb: RegionPitchBendMap
    target_at: RegionAftertouchMap
    # "Working" side: same four-map layout as the target side.
    working_notes: RegionNotesMap
    working_cc: RegionCCMap
    working_pb: RegionPitchBendMap
    working_at: RegionAftertouchMap
    # Tests pass {"r1": "t1"} — presumably region id -> track id; confirm
    # against build_checkout_plan.
    track_regions: dict[str, str]
| 48 | |
| 49 | |
| 50 | # ── Helpers ─────────────────────────────────────────────────────────────── |
| 51 | |
| 52 | |
def _note(pitch: int, start: float, dur: float = 1.0, vel: int = 100) -> NoteDict:
    """Build a note fixture on channel 0.

    ``start`` and ``dur`` are stored under ``start_beat`` and
    ``duration_beats``; ``vel`` is stored under ``velocity``.
    """
    note: NoteDict = {
        "pitch": pitch,
        "start_beat": start,
        "duration_beats": dur,
        "velocity": vel,
        "channel": 0,
    }
    return note
| 55 | |
| 56 | |
def _cc(cc_num: int, beat: float, value: int) -> CCEventDict:
    """Build a controller-change fixture; ``cc_num`` is stored under ``cc``."""
    event: CCEventDict = {
        "cc": cc_num,
        "beat": beat,
        "value": value,
    }
    return event
| 59 | |
| 60 | |
def _pb(beat: float, value: int) -> PitchBendDict:
    """Build a pitch-bend fixture with the given beat position and value."""
    event: PitchBendDict = {
        "beat": beat,
        "value": value,
    }
    return event
| 63 | |
| 64 | |
def _at(beat: float, value: int, pitch: int | None = None) -> AftertouchDict:
    """Build an aftertouch fixture.

    The ``pitch`` key is included only when a pitch is supplied; otherwise the
    event carries just ``beat`` and ``value``.
    """
    if pitch is None:
        return {"beat": beat, "value": value}
    return {"beat": beat, "value": value, "pitch": pitch}
| 70 | |
| 71 | |
def _empty_plan_args(
    *,
    target_notes: dict[str, list[NoteDict]] | None = None,
    working_notes: dict[str, list[NoteDict]] | None = None,
    target_cc: dict[str, list[CCEventDict]] | None = None,
    working_cc: dict[str, list[CCEventDict]] | None = None,
    target_pb: dict[str, list[PitchBendDict]] | None = None,
    working_pb: dict[str, list[PitchBendDict]] | None = None,
    target_at: dict[str, list[AftertouchDict]] | None = None,
    working_at: dict[str, list[AftertouchDict]] | None = None,
    track_regions: dict[str, str] | None = None,
) -> _PlanArgs:
    """Assemble a complete keyword set for ``build_checkout_plan``.

    The project and variation ids are fixed to ``"proj-1"`` and ``"var-1"``.
    Every map that is omitted (or passed empty) is replaced by a fresh empty
    dict, so a test only spells out the state relevant to its scenario.
    """
    plan_args: _PlanArgs = {
        "project_id": "proj-1",
        "target_variation_id": "var-1",
        "target_notes": target_notes if target_notes else {},
        "working_notes": working_notes if working_notes else {},
        "target_cc": target_cc if target_cc else {},
        "working_cc": working_cc if working_cc else {},
        "target_pb": target_pb if target_pb else {},
        "working_pb": working_pb if working_pb else {},
        "target_at": target_at if target_at else {},
        "working_at": working_at if working_at else {},
        "track_regions": track_regions if track_regions else {},
    }
    return plan_args
| 97 | |
| 98 | |
| 99 | # --------------------------------------------------------------------------- |
| 100 | # 6.1 — No-Op Checkout |
| 101 | # --------------------------------------------------------------------------- |
| 102 | |
| 103 | |
class TestNoOpCheckout:
    """Plans built when target and working state are the same."""

    def test_identical_state_produces_no_calls(self) -> None:
        """Matching notes and CC yield a no-op plan with no calls and no resets."""
        shared_notes = {"r1": [_note(60, 0.0), _note(64, 1.0)]}
        shared_cc = {"r1": [_cc(64, 0.0, 127)]}
        args = _empty_plan_args(
            target_notes=shared_notes,
            working_notes=shared_notes,
            target_cc=shared_cc,
            working_cc=shared_cc,
            track_regions={"r1": "t1"},
        )
        plan = build_checkout_plan(**args)
        assert plan.is_noop
        assert plan.tool_calls == ()
        assert plan.regions_reset == ()

    def test_empty_state_is_noop(self) -> None:
        """With every map left empty the plan is a no-op."""
        empty_args = _empty_plan_args()
        plan = build_checkout_plan(**empty_args)
        assert plan.is_noop

    def test_fingerprint_target_still_populated(self) -> None:
        """Identical notes still produce a length-16 target fingerprint for ``r1``."""
        shared_notes = {"r1": [_note(60, 0.0)]}
        args = _empty_plan_args(
            target_notes=shared_notes,
            working_notes=shared_notes,
            track_regions={"r1": "t1"},
        )
        plan = build_checkout_plan(**args)
        assert "r1" in plan.fingerprint_target
        assert len(plan.fingerprint_target["r1"]) == 16
| 133 | |
| 134 | |
| 135 | # --------------------------------------------------------------------------- |
| 136 | # 6.2 — Note Add Checkout |
| 137 | # --------------------------------------------------------------------------- |
| 138 | |
| 139 | |
class TestNoteAddCheckout:
    """Note-level differences between target and working state."""

    def test_missing_note_produces_add(self) -> None:
        """A note found only in the target arrives via one ``stori_add_notes`` call."""
        args = _empty_plan_args(
            target_notes={"r1": [_note(60, 0.0), _note(72, 2.0)]},
            working_notes={"r1": [_note(60, 0.0)]},
            track_regions={"r1": "t1"},
        )
        plan = build_checkout_plan(**args)
        assert not plan.is_noop
        additions = [call for call in plan.tool_calls if call["tool"] == "stori_add_notes"]
        assert len(additions) == 1
        added = additions[0]["arguments"]["notes"]
        assert isinstance(added, list)
        assert len(added) == 1
        first_added = added[0]
        assert isinstance(first_added, dict)
        assert first_added["pitch"] == 72

    def test_region_with_removals_triggers_reset(self) -> None:
        """Removing a note requires clear+add because no individual remove tool exists."""
        args = _empty_plan_args(
            target_notes={"r1": [_note(60, 0.0)]},
            working_notes={"r1": [_note(60, 0.0), _note(72, 2.0)]},
            track_regions={"r1": "t1"},
        )
        plan = build_checkout_plan(**args)
        assert "r1" in plan.regions_reset
        clears = [call for call in plan.tool_calls if call["tool"] == "stori_clear_notes"]
        assert len(clears) == 1

    def test_modified_note_triggers_reset(self) -> None:
        """A velocity change on an otherwise identical note resets the region."""
        args = _empty_plan_args(
            target_notes={"r1": [_note(60, 0.0, vel=80)]},
            working_notes={"r1": [_note(60, 0.0, vel=120)]},
            track_regions={"r1": "t1"},
        )
        plan = build_checkout_plan(**args)
        assert "r1" in plan.regions_reset

    def test_add_to_empty_region_no_clear(self) -> None:
        """Adding notes to an empty region should not produce a clear call."""
        args = _empty_plan_args(
            target_notes={"r1": [_note(60, 0.0)]},
            working_notes={"r1": []},
            track_regions={"r1": "t1"},
        )
        plan = build_checkout_plan(**args)
        tool_names = [call["tool"] for call in plan.tool_calls]
        assert "stori_clear_notes" not in tool_names
        assert tool_names.count("stori_add_notes") == 1
| 191 | |
| 192 | |
| 193 | # --------------------------------------------------------------------------- |
| 194 | # 6.3 — Controller Restore |
| 195 | # --------------------------------------------------------------------------- |
| 196 | |
| 197 | |
class TestControllerRestore:
    """Controller events (pitch bend, CC, aftertouch) missing from working state."""

    def test_missing_pb_produces_add_pitch_bend(self) -> None:
        """A target-only pitch bend yields one ``stori_add_pitch_bend`` call."""
        unchanged_notes = {"r1": [_note(60, 0.0)]}
        args = _empty_plan_args(
            target_notes=unchanged_notes,
            working_notes={"r1": [_note(60, 0.0)]},
            target_pb={"r1": [_pb(1.0, 4096)]},
            working_pb={"r1": []},
            track_regions={"r1": "t1"},
        )
        plan = build_checkout_plan(**args)
        bend_calls = [call for call in plan.tool_calls if call["tool"] == "stori_add_pitch_bend"]
        assert len(bend_calls) == 1
        events = bend_calls[0]["arguments"]["events"]
        assert isinstance(events, list)
        first_event = events[0]
        assert isinstance(first_event, dict)
        assert first_event["value"] == 4096

    def test_missing_cc_produces_add_midi_cc(self) -> None:
        """A target-only CC event yields one ``stori_add_midi_cc`` call for CC 64."""
        args = _empty_plan_args(
            target_notes={"r1": [_note(60, 0.0)]},
            working_notes={"r1": [_note(60, 0.0)]},
            target_cc={"r1": [_cc(64, 0.0, 127)]},
            working_cc={"r1": []},
            track_regions={"r1": "t1"},
        )
        plan = build_checkout_plan(**args)
        controller_calls = [call for call in plan.tool_calls if call["tool"] == "stori_add_midi_cc"]
        assert len(controller_calls) == 1
        assert controller_calls[0]["arguments"]["cc"] == 64

    def test_missing_at_produces_add_aftertouch(self) -> None:
        """A target-only aftertouch event yields one ``stori_add_aftertouch`` call."""
        args = _empty_plan_args(
            target_notes={"r1": [_note(60, 0.0)]},
            working_notes={"r1": [_note(60, 0.0)]},
            target_at={"r1": [_at(2.0, 80, pitch=60)]},
            working_at={"r1": []},
            track_regions={"r1": "t1"},
        )
        plan = build_checkout_plan(**args)
        touch_calls = [call for call in plan.tool_calls if call["tool"] == "stori_add_aftertouch"]
        assert len(touch_calls) == 1
        events = touch_calls[0]["arguments"]["events"]
        assert isinstance(events, list)
        first_event = events[0]
        assert isinstance(first_event, dict)
        assert first_event["pitch"] == 60

    def test_modified_cc_value_produces_call(self) -> None:
        """A CC whose value differs is re-sent carrying the target value."""
        args = _empty_plan_args(
            target_notes={"r1": [_note(60, 0.0)]},
            working_notes={"r1": [_note(60, 0.0)]},
            target_cc={"r1": [_cc(64, 0.0, 0)]},
            working_cc={"r1": [_cc(64, 0.0, 127)]},
            track_regions={"r1": "t1"},
        )
        plan = build_checkout_plan(**args)
        controller_calls = [call for call in plan.tool_calls if call["tool"] == "stori_add_midi_cc"]
        assert len(controller_calls) == 1
        events = controller_calls[0]["arguments"]["events"]
        assert isinstance(events, list)
        first_event = events[0]
        assert isinstance(first_event, dict)
        assert first_event["value"] == 0

    def test_multiple_cc_numbers_grouped(self) -> None:
        """Events for two controller numbers produce one call per number."""
        args = _empty_plan_args(
            target_notes={"r1": [_note(60, 0.0)]},
            working_notes={"r1": [_note(60, 0.0)]},
            target_cc={"r1": [_cc(1, 0.0, 64), _cc(64, 2.0, 127)]},
            working_cc={"r1": []},
            track_regions={"r1": "t1"},
        )
        plan = build_checkout_plan(**args)
        controller_calls = [call for call in plan.tool_calls if call["tool"] == "stori_add_midi_cc"]
        assert len(controller_calls) == 2
        # Keep only integer controller numbers, then compare in sorted order.
        seen_numbers: list[int] = []
        for call in controller_calls:
            number = call["arguments"]["cc"]
            if isinstance(number, int):
                seen_numbers.append(number)
        seen_numbers.sort()
        assert seen_numbers == [1, 64]
| 276 | |
| 277 | |
| 278 | # --------------------------------------------------------------------------- |
| 279 | # 6.4 — Large Drift Fallback |
| 280 | # --------------------------------------------------------------------------- |
| 281 | |
| 282 | |
class TestLargeDriftFallback:
    """Region reset behaviour around ``REGION_RESET_THRESHOLD``."""

    def test_many_additions_trigger_reset(self) -> None:
        """Threshold + 5 additions reset the region: one clear, one add with every note."""
        note_count = REGION_RESET_THRESHOLD + 5
        # Pitches start at 40; each note sits one beat after the previous one.
        wanted = [_note(40 + offset, float(offset)) for offset in range(note_count)]
        args = _empty_plan_args(
            target_notes={"r1": wanted},
            working_notes={"r1": []},
            track_regions={"r1": "t1"},
        )
        plan = build_checkout_plan(**args)
        assert "r1" in plan.regions_reset
        tool_names = [call["tool"] for call in plan.tool_calls]
        additions = [call for call in plan.tool_calls if call["tool"] == "stori_add_notes"]
        assert tool_names.count("stori_clear_notes") == 1
        assert len(additions) == 1
        sent_notes = additions[0]["arguments"]["notes"]
        assert isinstance(sent_notes, list)
        assert len(sent_notes) == len(wanted)

    def test_below_threshold_pure_additions_no_reset(self) -> None:
        """Two pure additions (expected to be below the threshold) do not reset ``r1``."""
        wanted = [_note(60, 0.0), _note(62, 1.0)]
        args = _empty_plan_args(
            target_notes={"r1": wanted},
            working_notes={"r1": []},
            track_regions={"r1": "t1"},
        )
        plan = build_checkout_plan(**args)
        assert "r1" not in plan.regions_reset
| 311 | |
| 312 | |
| 313 | # --------------------------------------------------------------------------- |
| 314 | # 6.5 — Determinism Test |
| 315 | # --------------------------------------------------------------------------- |
| 316 | |
| 317 | |
class TestDeterminism:
    """Plan hashes and tool-call ordering are stable for given inputs."""

    def test_same_inputs_produce_same_hash(self) -> None:
        """Building twice from the same arguments gives equal plan hashes."""
        args = _empty_plan_args(
            target_notes={"r1": [_note(60, 0.0), _note(72, 2.0)]},
            working_notes={"r1": [_note(60, 0.0)]},
            target_cc={"r1": [_cc(64, 0.0, 127)]},
            working_cc={"r1": []},
            track_regions={"r1": "t1"},
        )
        first_plan, second_plan = [build_checkout_plan(**args) for _ in range(2)]
        assert first_plan.plan_hash() == second_plan.plan_hash()

    def test_different_inputs_produce_different_hash(self) -> None:
        """Targets differing only in note pitch (60 vs 72) hash differently."""
        arg_sets = [
            _empty_plan_args(
                target_notes={"r1": [_note(pitch, 0.0)]},
                working_notes={"r1": []},
                track_regions={"r1": "t1"},
            )
            for pitch in (60, 72)
        ]
        low_plan, high_plan = [build_checkout_plan(**args) for args in arg_sets]
        assert low_plan.plan_hash() != high_plan.plan_hash()

    def test_tool_call_ordering_deterministic(self) -> None:
        """Calls are ordered: clear → add_notes → cc → pb → at per region."""
        args = _empty_plan_args(
            target_notes={"r1": [_note(60, 0.0, vel=80)]},
            working_notes={"r1": [_note(60, 0.0, vel=120)]},
            target_cc={"r1": [_cc(64, 0.0, 127)]},
            working_cc={"r1": []},
            target_pb={"r1": [_pb(1.0, 4096)]},
            working_pb={"r1": []},
            target_at={"r1": [_at(2.0, 80)]},
            working_at={"r1": []},
            track_regions={"r1": "t1"},
        )
        plan = build_checkout_plan(**args)
        expected_order = [
            "stori_clear_notes",
            "stori_add_notes",
            "stori_add_midi_cc",
            "stori_add_pitch_bend",
            "stori_add_aftertouch",
        ]
        actual_order = [call["tool"] for call in plan.tool_calls]
        assert actual_order == expected_order
| 371 | |
| 372 | |
| 373 | # --------------------------------------------------------------------------- |
| 374 | # 6.6 — Boundary Seal |
| 375 | # --------------------------------------------------------------------------- |
| 376 | |
| 377 | |
class TestCheckoutBoundary:
    """Static (AST) checks that ``muse_checkout.py`` stays inside its boundary.

    The module is parsed from disk rather than imported, so these checks look
    only at the source text of ``maestro/services/muse_checkout.py``.
    """

    @staticmethod
    def _parse_checkout_module() -> ast.Module:
        """Return the parsed AST of ``maestro/services/muse_checkout.py``.

        The path is resolved from the directory above the one holding this
        test file.  The source is decoded explicitly as UTF-8 (Python's default
        source encoding) so the result does not depend on the platform's
        locale encoding.
        """
        filepath = (
            Path(__file__).resolve().parent.parent
            / "maestro" / "services" / "muse_checkout.py"
        )
        return ast.parse(filepath.read_text(encoding="utf-8"))

    def test_no_state_store_or_executor_import(self) -> None:
        """No import statement may reference a forbidden module.

        Both ``import a.b`` and ``from a.b import c`` are inspected.  Module
        paths are matched by substring.  Names pulled in by ``from ... import``
        are matched exactly, which also catches ``from pkg import executor``
        and relative ``from . import state_store`` (where ``module`` is None).
        """
        forbidden = {"state_store", "executor", "maestro_handlers", "maestro_editing", "maestro_composing"}
        for node in ast.walk(self._parse_checkout_module()):
            if isinstance(node, ast.Import):
                module_paths = [alias.name for alias in node.names]
                imported_names: list[str] = []
            elif isinstance(node, ast.ImportFrom):
                module_paths = [node.module] if node.module else []
                imported_names = [alias.name for alias in node.names]
            else:
                continue
            for module_path in module_paths:
                assert not any(fb in module_path for fb in forbidden), (
                    f"muse_checkout imports forbidden module: {module_path}"
                )
            for imported in imported_names:
                assert imported not in forbidden, (
                    f"muse_checkout imports forbidden module: {imported}"
                )

    def test_no_forbidden_names(self) -> None:
        """No import statement may bring in one of the forbidden names."""
        forbidden_names = {"StateStore", "get_or_create_store", "EntityRegistry"}
        for node in ast.walk(self._parse_checkout_module()):
            if not isinstance(node, (ast.Import, ast.ImportFrom)):
                continue
            for alias in node.names:
                assert alias.name not in forbidden_names, (
                    f"muse_checkout imports forbidden name: {alias.name}"
                )

    def test_no_get_or_create_store_call(self) -> None:
        """No call expression may target ``get_or_create_store``.

        Both bare calls (``get_or_create_store(...)``) and attribute calls
        (``obj.get_or_create_store(...)``) are detected; any other callee
        shape is ignored.
        """
        for node in ast.walk(self._parse_checkout_module()):
            if not isinstance(node, ast.Call):
                continue
            func = node.func
            if isinstance(func, ast.Name):
                name = func.id
            elif isinstance(func, ast.Attribute):
                name = func.attr
            else:
                name = ""
            assert name != "get_or_create_store", (
                "muse_checkout.py calls get_or_create_store"
            )
| 417 | "muse_checkout.py calls get_or_create_store" |
| 418 | ) |