cgcardona / muse public
test_muse_checkout.py python
418 lines 15.2 KB
12901c5a Initial extraction from tellurstori/maestro cgcardona <gabriel@tellurstori.com> 4d ago
1 """Tests for the Muse Checkout Engine (Phase 9).
2
3 Verifies:
4 - No-op checkout when target == working.
5 - Note add checkout produces stori_add_notes.
6 - Controller restore produces correct tool calls.
7 - Large diff triggers region reset (clear + add).
8 - Determinism (same inputs → same plan hash).
9 - Boundary seal (AST).
10 """
11 from __future__ import annotations
12
13 import ast
14 from pathlib import Path
15 import pytest
16 from typing_extensions import TypedDict
17
18 from maestro.contracts.json_types import (
19 AftertouchDict,
20 CCEventDict,
21 NoteDict,
22 PitchBendDict,
23 RegionAftertouchMap,
24 RegionCCMap,
25 RegionNotesMap,
26 RegionPitchBendMap,
27 )
28 from maestro.services.muse_checkout import (
29 CheckoutPlan,
30 REGION_RESET_THRESHOLD,
31 build_checkout_plan,
32 )
33
34
35 class _PlanArgs(TypedDict, total=False):
36 """Keyword arguments for ``build_checkout_plan`` — mirrors its signature."""
37 project_id: str
38 target_variation_id: str
39 target_notes: RegionNotesMap
40 target_cc: RegionCCMap
41 target_pb: RegionPitchBendMap
42 target_at: RegionAftertouchMap
43 working_notes: RegionNotesMap
44 working_cc: RegionCCMap
45 working_pb: RegionPitchBendMap
46 working_at: RegionAftertouchMap
47 track_regions: dict[str, str]
48
49
50 # ── Helpers ───────────────────────────────────────────────────────────────
51
52
53 def _note(pitch: int, start: float, dur: float = 1.0, vel: int = 100) -> NoteDict:
54 return {"pitch": pitch, "start_beat": start, "duration_beats": dur, "velocity": vel, "channel": 0}
55
56
57 def _cc(cc_num: int, beat: float, value: int) -> CCEventDict:
58 return {"cc": cc_num, "beat": beat, "value": value}
59
60
61 def _pb(beat: float, value: int) -> PitchBendDict:
62 return {"beat": beat, "value": value}
63
64
65 def _at(beat: float, value: int, pitch: int | None = None) -> AftertouchDict:
66 d: AftertouchDict = {"beat": beat, "value": value}
67 if pitch is not None:
68 d["pitch"] = pitch
69 return d
70
71
def _empty_plan_args(
    *,
    target_notes: dict[str, list[NoteDict]] | None = None,
    working_notes: dict[str, list[NoteDict]] | None = None,
    target_cc: dict[str, list[CCEventDict]] | None = None,
    working_cc: dict[str, list[CCEventDict]] | None = None,
    target_pb: dict[str, list[PitchBendDict]] | None = None,
    working_pb: dict[str, list[PitchBendDict]] | None = None,
    target_at: dict[str, list[AftertouchDict]] | None = None,
    working_at: dict[str, list[AftertouchDict]] | None = None,
    track_regions: dict[str, str] | None = None,
) -> _PlanArgs:
    """Return a complete ``build_checkout_plan`` kwargs dict with empty defaults.

    The project and variation ids are fixed to ``"proj-1"`` / ``"var-1"``.
    Every map argument that is omitted, ``None`` or otherwise falsy is
    replaced by a fresh empty dict, so a test only spells out the state it
    cares about.
    """
    args: _PlanArgs = {
        "project_id": "proj-1",
        "target_variation_id": "var-1",
    }
    # Filled one key at a time, in the same order the keys were always emitted.
    args["target_notes"] = target_notes if target_notes else {}
    args["working_notes"] = working_notes if working_notes else {}
    args["target_cc"] = target_cc if target_cc else {}
    args["working_cc"] = working_cc if working_cc else {}
    args["target_pb"] = target_pb if target_pb else {}
    args["working_pb"] = working_pb if working_pb else {}
    args["target_at"] = target_at if target_at else {}
    args["working_at"] = working_at if working_at else {}
    args["track_regions"] = track_regions if track_regions else {}
    return args
97
98
99 # ---------------------------------------------------------------------------
100 # 6.1 — No-Op Checkout
101 # ---------------------------------------------------------------------------
102
103
class TestNoOpCheckout:
    """Plans built when the working state already equals the target."""

    def test_identical_state_produces_no_calls(self) -> None:
        """Same notes and CC on both sides: no-op plan, no calls, no resets."""
        shared_notes = {"r1": [_note(60, 0.0), _note(64, 1.0)]}
        shared_cc = {"r1": [_cc(64, 0.0, 127)]}
        args = _empty_plan_args(
            target_notes=shared_notes,
            working_notes=shared_notes,
            target_cc=shared_cc,
            working_cc=shared_cc,
            track_regions={"r1": "t1"},
        )

        plan = build_checkout_plan(**args)

        assert plan.is_noop
        assert plan.tool_calls == ()
        assert plan.regions_reset == ()

    def test_empty_state_is_noop(self) -> None:
        """All-empty inputs also produce a no-op plan."""
        args = _empty_plan_args()
        plan = build_checkout_plan(**args)
        assert plan.is_noop

    def test_fingerprint_target_still_populated(self) -> None:
        """Even for a no-op, the target fingerprint has a length-16 entry for r1."""
        shared_notes = {"r1": [_note(60, 0.0)]}
        args = _empty_plan_args(
            target_notes=shared_notes,
            working_notes=shared_notes,
            track_regions={"r1": "t1"},
        )

        fingerprints = build_checkout_plan(**args).fingerprint_target

        assert "r1" in fingerprints
        assert len(fingerprints["r1"]) == 16
133
134
135 # ---------------------------------------------------------------------------
136 # 6.2 — Note Add Checkout
137 # ---------------------------------------------------------------------------
138
139
class TestNoteAddCheckout:
    """Note-level differences: pure additions versus changes that force a reset."""

    def test_missing_note_produces_add(self) -> None:
        """A note present only in the target yields one stori_add_notes call for it."""
        args = _empty_plan_args(
            target_notes={"r1": [_note(60, 0.0), _note(72, 2.0)]},
            working_notes={"r1": [_note(60, 0.0)]},
            track_regions={"r1": "t1"},
        )
        plan = build_checkout_plan(**args)

        assert not plan.is_noop
        add_calls = [call for call in plan.tool_calls if call["tool"] == "stori_add_notes"]
        assert len(add_calls) == 1

        added = add_calls[0]["arguments"]["notes"]
        assert isinstance(added, list)
        assert len(added) == 1
        first_added = added[0]
        assert isinstance(first_added, dict)
        assert first_added["pitch"] == 72

    def test_region_with_removals_triggers_reset(self) -> None:
        """Removing a note requires clear+add because no individual remove tool exists."""
        args = _empty_plan_args(
            target_notes={"r1": [_note(60, 0.0)]},
            working_notes={"r1": [_note(60, 0.0), _note(72, 2.0)]},
            track_regions={"r1": "t1"},
        )
        plan = build_checkout_plan(**args)

        assert "r1" in plan.regions_reset
        tool_names = [call["tool"] for call in plan.tool_calls]
        assert tool_names.count("stori_clear_notes") == 1

    def test_modified_note_triggers_reset(self) -> None:
        """A velocity change on an otherwise identical note marks the region as reset."""
        args = _empty_plan_args(
            target_notes={"r1": [_note(60, 0.0, vel=80)]},
            working_notes={"r1": [_note(60, 0.0, vel=120)]},
            track_regions={"r1": "t1"},
        )
        plan = build_checkout_plan(**args)

        assert "r1" in plan.regions_reset

    def test_add_to_empty_region_no_clear(self) -> None:
        """Adding notes to an empty region should not produce a clear call."""
        args = _empty_plan_args(
            target_notes={"r1": [_note(60, 0.0)]},
            working_notes={"r1": []},
            track_regions={"r1": "t1"},
        )
        plan = build_checkout_plan(**args)

        tool_names = [call["tool"] for call in plan.tool_calls]
        assert tool_names.count("stori_clear_notes") == 0
        assert tool_names.count("stori_add_notes") == 1
191
192
193 # ---------------------------------------------------------------------------
194 # 6.3 — Controller Restore
195 # ---------------------------------------------------------------------------
196
197
class TestControllerRestore:
    """Controller data (CC, pitch bend, aftertouch) missing from the working state.

    Every test keeps the notes identical on both sides so that only the
    controller difference can drive the plan.
    """

    def test_missing_pb_produces_add_pitch_bend(self) -> None:
        """A target-only pitch bend yields one stori_add_pitch_bend call carrying it."""
        args = _empty_plan_args(
            target_notes={"r1": [_note(60, 0.0)]},
            working_notes={"r1": [_note(60, 0.0)]},
            target_pb={"r1": [_pb(1.0, 4096)]},
            working_pb={"r1": []},
            track_regions={"r1": "t1"},
        )
        plan = build_checkout_plan(**args)

        pb_calls = [call for call in plan.tool_calls if call["tool"] == "stori_add_pitch_bend"]
        assert len(pb_calls) == 1

        events = pb_calls[0]["arguments"]["events"]
        assert isinstance(events, list)
        first_event = events[0]
        assert isinstance(first_event, dict)
        assert first_event["value"] == 4096

    def test_missing_cc_produces_add_midi_cc(self) -> None:
        """A target-only CC event yields one stori_add_midi_cc call for CC 64."""
        args = _empty_plan_args(
            target_notes={"r1": [_note(60, 0.0)]},
            working_notes={"r1": [_note(60, 0.0)]},
            target_cc={"r1": [_cc(64, 0.0, 127)]},
            working_cc={"r1": []},
            track_regions={"r1": "t1"},
        )
        plan = build_checkout_plan(**args)

        cc_calls = [call for call in plan.tool_calls if call["tool"] == "stori_add_midi_cc"]
        assert len(cc_calls) == 1
        assert cc_calls[0]["arguments"]["cc"] == 64

    def test_missing_at_produces_add_aftertouch(self) -> None:
        """A target-only aftertouch event yields one stori_add_aftertouch call with its pitch."""
        args = _empty_plan_args(
            target_notes={"r1": [_note(60, 0.0)]},
            working_notes={"r1": [_note(60, 0.0)]},
            target_at={"r1": [_at(2.0, 80, pitch=60)]},
            working_at={"r1": []},
            track_regions={"r1": "t1"},
        )
        plan = build_checkout_plan(**args)

        at_calls = [call for call in plan.tool_calls if call["tool"] == "stori_add_aftertouch"]
        assert len(at_calls) == 1

        events = at_calls[0]["arguments"]["events"]
        assert isinstance(events, list)
        first_event = events[0]
        assert isinstance(first_event, dict)
        assert first_event["pitch"] == 60

    def test_modified_cc_value_produces_call(self) -> None:
        """A CC whose value differs (127 working, 0 target) is re-sent with the target value."""
        args = _empty_plan_args(
            target_notes={"r1": [_note(60, 0.0)]},
            working_notes={"r1": [_note(60, 0.0)]},
            target_cc={"r1": [_cc(64, 0.0, 0)]},
            working_cc={"r1": [_cc(64, 0.0, 127)]},
            track_regions={"r1": "t1"},
        )
        plan = build_checkout_plan(**args)

        cc_calls = [call for call in plan.tool_calls if call["tool"] == "stori_add_midi_cc"]
        assert len(cc_calls) == 1

        events = cc_calls[0]["arguments"]["events"]
        assert isinstance(events, list)
        first_event = events[0]
        assert isinstance(first_event, dict)
        assert first_event["value"] == 0

    def test_multiple_cc_numbers_grouped(self) -> None:
        """Two distinct CC numbers in the target produce two calls, one per number."""
        args = _empty_plan_args(
            target_notes={"r1": [_note(60, 0.0)]},
            working_notes={"r1": [_note(60, 0.0)]},
            target_cc={"r1": [_cc(1, 0.0, 64), _cc(64, 2.0, 127)]},
            working_cc={"r1": []},
            track_regions={"r1": "t1"},
        )
        plan = build_checkout_plan(**args)

        cc_calls = [call for call in plan.tool_calls if call["tool"] == "stori_add_midi_cc"]
        assert len(cc_calls) == 2

        # Collect only integer controller numbers; the isinstance check also
        # narrows the argument type for the type checker.
        cc_numbers: list[int] = []
        for call in cc_calls:
            number = call["arguments"]["cc"]
            if isinstance(number, int):
                cc_numbers.append(number)
        cc_numbers.sort()
        assert cc_numbers == [1, 64]
276
277
278 # ---------------------------------------------------------------------------
279 # 6.4 — Large Drift Fallback
280 # ---------------------------------------------------------------------------
281
282
class TestLargeDriftFallback:
    """Behaviour around ``REGION_RESET_THRESHOLD`` for pure note additions."""

    def test_many_additions_trigger_reset(self) -> None:
        """Threshold + 5 added notes switch the region to clear + one full add."""
        note_count = REGION_RESET_THRESHOLD + 5
        # Pitches climb from 40, one note per beat starting at beat 0.0.
        target_notes = [_note(40 + offset, float(offset)) for offset in range(note_count)]
        args = _empty_plan_args(
            target_notes={"r1": target_notes},
            working_notes={"r1": []},
            track_regions={"r1": "t1"},
        )
        plan = build_checkout_plan(**args)

        assert "r1" in plan.regions_reset
        tool_names = [call["tool"] for call in plan.tool_calls]
        add_calls = [call for call in plan.tool_calls if call["tool"] == "stori_add_notes"]
        assert tool_names.count("stori_clear_notes") == 1
        assert len(add_calls) == 1

        # The single add call must carry the whole target note list.
        readded = add_calls[0]["arguments"]["notes"]
        assert isinstance(readded, list)
        assert len(readded) == len(target_notes)

    def test_below_threshold_pure_additions_no_reset(self) -> None:
        """Two added notes leave the region out of ``regions_reset``."""
        args = _empty_plan_args(
            target_notes={"r1": [_note(60, 0.0), _note(62, 1.0)]},
            working_notes={"r1": []},
            track_regions={"r1": "t1"},
        )
        plan = build_checkout_plan(**args)

        assert "r1" not in plan.regions_reset
311
312
313 # ---------------------------------------------------------------------------
314 # 6.5 — Determinism Test
315 # ---------------------------------------------------------------------------
316
317
class TestDeterminism:
    """Plan hashing and tool-call ordering must be reproducible."""

    def test_same_inputs_produce_same_hash(self) -> None:
        """Building twice from the very same arguments gives equal plan hashes."""
        args = _empty_plan_args(
            target_notes={"r1": [_note(60, 0.0), _note(72, 2.0)]},
            working_notes={"r1": [_note(60, 0.0)]},
            target_cc={"r1": [_cc(64, 0.0, 127)]},
            working_cc={"r1": []},
            track_regions={"r1": "t1"},
        )

        first_hash = build_checkout_plan(**args).plan_hash()
        second_hash = build_checkout_plan(**args).plan_hash()

        assert first_hash == second_hash

    def test_different_inputs_produce_different_hash(self) -> None:
        """Targets that differ only in pitch (60 vs 72) hash differently."""
        low_args = _empty_plan_args(
            target_notes={"r1": [_note(60, 0.0)]},
            working_notes={"r1": []},
            track_regions={"r1": "t1"},
        )
        high_args = _empty_plan_args(
            target_notes={"r1": [_note(72, 0.0)]},
            working_notes={"r1": []},
            track_regions={"r1": "t1"},
        )

        low_hash = build_checkout_plan(**low_args).plan_hash()
        high_hash = build_checkout_plan(**high_args).plan_hash()

        assert low_hash != high_hash

    def test_tool_call_ordering_deterministic(self) -> None:
        """Calls are ordered: clear → add_notes → cc → pb → at per region."""
        expected_order = [
            "stori_clear_notes",
            "stori_add_notes",
            "stori_add_midi_cc",
            "stori_add_pitch_bend",
            "stori_add_aftertouch",
        ]
        # A velocity change forces the note reset; each controller kind is
        # present only in the target, so all five tool types are emitted.
        args = _empty_plan_args(
            target_notes={"r1": [_note(60, 0.0, vel=80)]},
            working_notes={"r1": [_note(60, 0.0, vel=120)]},
            target_cc={"r1": [_cc(64, 0.0, 127)]},
            working_cc={"r1": []},
            target_pb={"r1": [_pb(1.0, 4096)]},
            working_pb={"r1": []},
            target_at={"r1": [_at(2.0, 80)]},
            working_at={"r1": []},
            track_regions={"r1": "t1"},
        )
        plan = build_checkout_plan(**args)

        emitted_order = [call["tool"] for call in plan.tool_calls]
        assert emitted_order == expected_order
371
372
373 # ---------------------------------------------------------------------------
374 # 6.6 — Boundary Seal
375 # ---------------------------------------------------------------------------
376
377
class TestCheckoutBoundary:
    """Static (AST) checks that ``muse_checkout.py`` has no forbidden dependencies.

    The module under inspection is parsed, never imported, so these tests look
    only at its source text.
    """

    @staticmethod
    def _parse_checkout_module() -> ast.Module:
        """Parse ``maestro/services/muse_checkout.py`` into an AST.

        The file is located relative to this test file: two directories up,
        then ``maestro/services``.

        Raises:
            OSError: if the source file cannot be read.
            SyntaxError: if the source file does not parse.
        """
        filepath = (
            Path(__file__).resolve().parent.parent
            / "maestro"
            / "services"
            / "muse_checkout.py"
        )
        # Explicit encoding: read_text() otherwise decodes with the
        # locale-dependent default, which can differ between machines.
        source = filepath.read_text(encoding="utf-8")
        return ast.parse(source, filename=str(filepath))

    @staticmethod
    def _imported_module_paths(tree: ast.AST) -> list[str]:
        """Return every dotted path that *tree* imports, in walk order.

        Covers all three spellings that can pull in a module:

        * ``import a.b``          -> ``"a.b"``
        * ``from a.b import c``   -> ``"a.b"`` and ``"a.b.c"``
        * ``from . import c``     -> ``"c"`` (``module`` is ``None`` there)
        """
        paths: list[str] = []
        for node in ast.walk(tree):
            if isinstance(node, ast.Import):
                paths.extend(alias.name for alias in node.names)
            elif isinstance(node, ast.ImportFrom):
                base = node.module or ""
                if base:
                    paths.append(base)
                # The imported name may itself be a submodule, so it is
                # checked as part of the dotted path too.
                paths.extend(
                    f"{base}.{alias.name}" if base else alias.name
                    for alias in node.names
                )
        return paths

    def test_no_state_store_or_executor_import(self) -> None:
        """No imported path may contain a forbidden module name as a substring."""
        forbidden = {"state_store", "executor", "maestro_handlers", "maestro_editing", "maestro_composing"}
        tree = self._parse_checkout_module()
        for module_path in self._imported_module_paths(tree):
            for fb in forbidden:
                assert fb not in module_path, (
                    f"muse_checkout imports forbidden module: {module_path}"
                )

    def test_no_forbidden_names(self) -> None:
        """No import statement may bring in a forbidden name (exact match)."""
        forbidden_names = {"StateStore", "get_or_create_store", "EntityRegistry"}
        tree = self._parse_checkout_module()
        for node in ast.walk(tree):
            if not isinstance(node, (ast.Import, ast.ImportFrom)):
                continue
            for alias in node.names:
                assert alias.name not in forbidden_names, (
                    f"muse_checkout imports forbidden name: {alias.name}"
                )

    def test_no_get_or_create_store_call(self) -> None:
        """No call may target ``get_or_create_store``, bare or as an attribute."""
        tree = self._parse_checkout_module()
        for node in ast.walk(tree):
            if not isinstance(node, ast.Call):
                continue
            func = node.func
            # Other callee shapes (subscripts, nested calls, ...) have no
            # simple name and keep the empty string, which never matches.
            name = ""
            if isinstance(func, ast.Name):
                name = func.id
            elif isinstance(func, ast.Attribute):
                name = func.attr
            assert name != "get_or_create_store", (
                "muse_checkout.py calls get_or_create_store"
            )