cgcardona / muse public
test_muse_checkout_execution.py python
411 lines 14.4 KB
12901c5a Initial extraction from tellurstori/maestro cgcardona <gabriel@tellurstori.com> 4d ago
1 """Tests for Muse Checkout Execution (Phase 10).
2
3 Verifies:
4 - No-op execution.
5 - Undo execution (checkout to parent).
6 - Redo execution (forward checkout).
7 - Drift safety block.
8 - Force override.
9 - Boundary seal (AST).
10 """
11 from __future__ import annotations
12
13 import ast
14 import uuid
15 from collections.abc import AsyncGenerator
16 from pathlib import Path
17
18 import pytest
19 from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
20
21 from maestro.contracts.json_types import CCEventDict, NoteDict
22 from maestro.core.state_store import StateStore
23 from maestro.core.tracing import TraceContext
24 from maestro.core.tools import ToolName
25 from maestro.db.database import Base
26 from maestro.db import muse_models # noqa: F401 — register tables
27 from maestro.models.variation import (
28 MidiNoteSnapshot,
29 NoteChange,
30 Phrase,
31 Variation,
32 )
33 from maestro.services import muse_repository
34 from maestro.services.muse_checkout import build_checkout_plan
35 from maestro.services.muse_checkout_executor import (
36 CheckoutExecutionResult,
37 execute_checkout_plan,
38 )
39 from maestro.services.muse_history_controller import (
40 CheckoutBlockedError,
41 CheckoutSummary,
42 checkout_to_variation,
43 )
44
45
46 # ── Fixtures ──────────────────────────────────────────────────────────────
47
48
49 @pytest.fixture
50 def store() -> StateStore:
51 return StateStore(conversation_id="test-conv", project_id="test-proj")
52
53
54 @pytest.fixture
55 def trace() -> TraceContext:
56 return TraceContext(trace_id="test-trace-001")
57
58
59 @pytest.fixture
60 async def async_session() -> AsyncGenerator[AsyncSession, None]:
61 engine = create_async_engine("sqlite+aiosqlite:///:memory:")
62 async with engine.begin() as conn:
63 await conn.run_sync(Base.metadata.create_all)
64 Session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
65 async with Session() as session:
66 yield session
67 await engine.dispose()
68
69
70 # ── Helpers ───────────────────────────────────────────────────────────────
71
72
73 def _note(pitch: int, start: float, dur: float = 1.0, vel: int = 100) -> NoteDict:
74
75 return {"pitch": pitch, "start_beat": start, "duration_beats": dur, "velocity": vel, "channel": 0}
76
77
78 def _cc(cc_num: int, beat: float, value: int) -> CCEventDict:
79
80 return {"cc": cc_num, "beat": beat, "value": value}
81
82
83 def _make_variation(
84 notes: list[NoteDict],
85 region_id: str = "region-1",
86 track_id: str = "track-1",
87 ) -> Variation:
88 vid = str(uuid.uuid4())
89 pid = str(uuid.uuid4())
90 return Variation(
91 variation_id=vid,
92 intent="test",
93 ai_explanation="test",
94 affected_tracks=[track_id],
95 affected_regions=[region_id],
96 beat_range=(0.0, 8.0),
97 phrases=[
98 Phrase(
99 phrase_id=pid,
100 track_id=track_id,
101 region_id=region_id,
102 start_beat=0.0,
103 end_beat=8.0,
104 label="Test",
105 note_changes=[
106 NoteChange(
107 note_id=str(uuid.uuid4()),
108 change_type="added",
109 after=MidiNoteSnapshot.from_note_dict(n),
110 )
111 for n in notes
112 ],
113 ),
114 ],
115 )
116
117
118 def _setup_region(store: StateStore, region_id: str = "region-1", track_id: str = "track-1") -> None:
119
120 """Create track + region in StateStore so add_notes works."""
121 txn = store.begin_transaction("setup")
122 store.create_track("Track", track_id=track_id, transaction=txn)
123 store.create_region("Region", parent_track_id=track_id, region_id=region_id, transaction=txn)
124 store.commit(txn)
125
126
127 # ---------------------------------------------------------------------------
128 # 6.1 — No-op Checkout Execution
129 # ---------------------------------------------------------------------------
130
131
132 class TestNoOpExecution:
133
134 def test_noop_plan_executes_zero_calls(self, store: StateStore, trace: TraceContext) -> None:
135
136 plan = build_checkout_plan(
137 project_id="p1", target_variation_id="v1",
138 target_notes={"r1": [_note(60, 0.0)]},
139 working_notes={"r1": [_note(60, 0.0)]},
140 target_cc={}, working_cc={},
141 target_pb={}, working_pb={},
142 target_at={}, working_at={},
143 track_regions={"r1": "t1"},
144 )
145 assert plan.is_noop
146
147 result = execute_checkout_plan(
148 checkout_plan=plan, store=store, trace=trace,
149 )
150 assert result.is_noop
151 assert result.executed == 0
152 assert result.failed == 0
153
154
155 # ---------------------------------------------------------------------------
156 # 6.2 — Undo Execution
157 # ---------------------------------------------------------------------------
158
159
160 class TestUndoExecution:
161
162 def test_clear_and_add_notes_applied(self, store: StateStore, trace: TraceContext) -> None:
163
164 _setup_region(store, "r1", "t1")
165 txn = store.begin_transaction("add-working")
166 store.add_notes("r1", [_note(60, 0.0), _note(72, 2.0)], transaction=txn)
167 store.commit(txn)
168 assert len(store.get_region_notes("r1")) == 2
169
170 plan = build_checkout_plan(
171 project_id="p1", target_variation_id="parent-var",
172 target_notes={"r1": [_note(60, 0.0)]},
173 working_notes={"r1": [_note(60, 0.0), _note(72, 2.0)]},
174 target_cc={}, working_cc={},
175 target_pb={}, working_pb={},
176 target_at={}, working_at={},
177 track_regions={"r1": "t1"},
178 )
179 assert not plan.is_noop
180 assert "r1" in plan.regions_reset
181
182 result = execute_checkout_plan(
183 checkout_plan=plan, store=store, trace=trace,
184 )
185 assert result.success
186 assert result.executed > 0
187
188 final_notes = store.get_region_notes("r1")
189 assert len(final_notes) == 1
190 assert final_notes[0]["pitch"] == 60
191
192 def test_sse_events_emitted(self, store: StateStore, trace: TraceContext) -> None:
193
194 _setup_region(store, "r1", "t1")
195 txn = store.begin_transaction("add")
196 store.add_notes("r1", [_note(60, 0.0), _note(72, 2.0)], transaction=txn)
197 store.commit(txn)
198
199 plan = build_checkout_plan(
200 project_id="p1", target_variation_id="v1",
201 target_notes={"r1": [_note(60, 0.0)]},
202 working_notes={"r1": [_note(60, 0.0), _note(72, 2.0)]},
203 target_cc={}, working_cc={},
204 target_pb={}, working_pb={},
205 target_at={}, working_at={},
206 track_regions={"r1": "t1"},
207 )
208
209 result = execute_checkout_plan(
210 checkout_plan=plan, store=store, trace=trace, emit_sse=True,
211 )
212 assert len(result.events) > 0
213 tool_types = [e["tool"] for e in result.events]
214 assert ToolName.CLEAR_NOTES.value in tool_types
215 assert ToolName.ADD_NOTES.value in tool_types
216
217
218 # ---------------------------------------------------------------------------
219 # 6.3 — Redo Execution
220 # ---------------------------------------------------------------------------
221
222
223 class TestRedoExecution:
224
225 def test_redo_produces_same_plan_hash(self, store: StateStore, trace: TraceContext) -> None:
226
227 plan1 = build_checkout_plan(
228 project_id="p1", target_variation_id="v2",
229 target_notes={"r1": [_note(60, 0.0), _note(72, 2.0)]},
230 working_notes={"r1": [_note(60, 0.0)]},
231 target_cc={}, working_cc={},
232 target_pb={}, working_pb={},
233 target_at={}, working_at={},
234 track_regions={"r1": "t1"},
235 )
236 plan2 = build_checkout_plan(
237 project_id="p1", target_variation_id="v2",
238 target_notes={"r1": [_note(60, 0.0), _note(72, 2.0)]},
239 working_notes={"r1": [_note(60, 0.0)]},
240 target_cc={}, working_cc={},
241 target_pb={}, working_pb={},
242 target_at={}, working_at={},
243 track_regions={"r1": "t1"},
244 )
245 assert plan1.plan_hash() == plan2.plan_hash()
246
247 def test_redo_adds_missing_notes(self, store: StateStore, trace: TraceContext) -> None:
248
249 _setup_region(store, "r1", "t1")
250 txn = store.begin_transaction("initial")
251 store.add_notes("r1", [_note(60, 0.0)], transaction=txn)
252 store.commit(txn)
253
254 plan = build_checkout_plan(
255 project_id="p1", target_variation_id="v2",
256 target_notes={"r1": [_note(60, 0.0), _note(72, 2.0)]},
257 working_notes={"r1": [_note(60, 0.0)]},
258 target_cc={}, working_cc={},
259 target_pb={}, working_pb={},
260 target_at={}, working_at={},
261 track_regions={"r1": "t1"},
262 )
263
264 result = execute_checkout_plan(
265 checkout_plan=plan, store=store, trace=trace,
266 )
267 assert result.success
268 final = store.get_region_notes("r1")
269 assert len(final) == 2
270
271 def test_controller_restore(self, store: StateStore, trace: TraceContext) -> None:
272
273 _setup_region(store, "r1", "t1")
274 txn = store.begin_transaction("initial")
275 store.add_notes("r1", [_note(60, 0.0)], transaction=txn)
276 store.commit(txn)
277
278 plan = build_checkout_plan(
279 project_id="p1", target_variation_id="v2",
280 target_notes={"r1": [_note(60, 0.0)]},
281 working_notes={"r1": [_note(60, 0.0)]},
282 target_cc={"r1": [_cc(64, 0.0, 127)]},
283 working_cc={"r1": []},
284 target_pb={}, working_pb={},
285 target_at={}, working_at={},
286 track_regions={"r1": "t1"},
287 )
288 result = execute_checkout_plan(
289 checkout_plan=plan, store=store, trace=trace,
290 )
291 assert result.success
292 assert result.executed == 1
293
294
295 # ---------------------------------------------------------------------------
296 # 6.4 — Drift Block
297 # ---------------------------------------------------------------------------
298
299
300 class TestDriftBlock:
301
302 @pytest.mark.anyio
303 async def test_dirty_working_tree_blocks_checkout(self, async_session: AsyncSession) -> None:
304
305 store = StateStore(conversation_id="cb-test", project_id="proj-cb")
306 _setup_region(store, "region-1", "track-1")
307 trace = TraceContext(trace_id="test-drift-block")
308
309 var1 = _make_variation([_note(60, 0.0)])
310 await muse_repository.save_variation(
311 async_session, var1,
312 project_id="proj-cb", base_state_id="s1", conversation_id="c",
313 region_metadata={},
314 )
315 await muse_repository.set_head(async_session, var1.variation_id, commit_state_id="s1")
316 await async_session.commit()
317
318 txn = store.begin_transaction("user-edit")
319 store.add_notes("region-1", [_note(60, 0.0), _note(72, 2.0)], transaction=txn)
320 store.commit(txn)
321
322 with pytest.raises(CheckoutBlockedError) as exc_info:
323 await checkout_to_variation(
324 session=async_session,
325 project_id="proj-cb",
326 target_variation_id=var1.variation_id,
327 store=store,
328 trace=trace,
329 force=False,
330 )
331 assert "dirty" in str(exc_info.value).lower() or exc_info.value.total_changes > 0
332
333
334 # ---------------------------------------------------------------------------
335 # 6.5 — Force Override
336 # ---------------------------------------------------------------------------
337
338
339 class TestForceOverride:
340
341 @pytest.mark.anyio
342 async def test_force_bypasses_drift_check(self, async_session: AsyncSession) -> None:
343
344 store = StateStore(conversation_id="force-test", project_id="proj-force")
345 _setup_region(store, "region-1", "track-1")
346 trace = TraceContext(trace_id="test-force")
347
348 var1 = _make_variation([_note(60, 0.0)])
349 await muse_repository.save_variation(
350 async_session, var1,
351 project_id="proj-force", base_state_id="s1", conversation_id="c",
352 region_metadata={},
353 )
354 await muse_repository.set_head(async_session, var1.variation_id, commit_state_id="s1")
355 await async_session.commit()
356
357 txn = store.begin_transaction("user-edit")
358 store.add_notes("region-1", [_note(60, 0.0), _note(72, 2.0)], transaction=txn)
359 store.commit(txn)
360
361 summary = await checkout_to_variation(
362 session=async_session,
363 project_id="proj-force",
364 target_variation_id=var1.variation_id,
365 store=store,
366 trace=trace,
367 force=True,
368 )
369 assert summary.head_moved
370 assert summary.execution.failed == 0
371
372
373 # ---------------------------------------------------------------------------
374 # 6.6 — Boundary Seal
375 # ---------------------------------------------------------------------------
376
377
378 class TestCheckoutExecutorBoundary:
379
380 def test_no_handler_imports(self) -> None:
381
382 filepath = Path(__file__).resolve().parent.parent / "maestro" / "services" / "muse_checkout_executor.py"
383 tree = ast.parse(filepath.read_text())
384 forbidden = {"maestro_handlers", "maestro_editing", "maestro_composing"}
385 for node in ast.walk(tree):
386 if isinstance(node, ast.ImportFrom) and node.module:
387 for fb in forbidden:
388 assert fb not in node.module, (
389 f"muse_checkout_executor imports forbidden: {node.module}"
390 )
391
392 def test_no_variation_service_import(self) -> None:
393
394 filepath = Path(__file__).resolve().parent.parent / "maestro" / "services" / "muse_checkout_executor.py"
395 tree = ast.parse(filepath.read_text())
396 for node in ast.walk(tree):
397 if isinstance(node, (ast.Import, ast.ImportFrom)):
398 for alias in node.names:
399 assert alias.name != "VariationService", (
400 "muse_checkout_executor imports VariationService"
401 )
402
403 def test_no_replay_internals_import(self) -> None:
404
405 filepath = Path(__file__).resolve().parent.parent / "maestro" / "services" / "muse_checkout_executor.py"
406 tree = ast.parse(filepath.read_text())
407 for node in ast.walk(tree):
408 if isinstance(node, ast.ImportFrom) and node.module:
409 assert "muse_replay" not in node.module, (
410 "muse_checkout_executor imports replay internals"
411 )