cgcardona / muse public
test_muse_merge.py python
588 lines 22.1 KB
12901c5a Initial extraction from tellurstori/maestro cgcardona <gabriel@tellurstori.com> 4d ago
1 """Tests for Muse Merge Engine (Phase 12).
2
3 Verifies:
4 - Merge base detection.
5 - Auto merge (non-overlapping regions).
6 - Conflict detection (same note modified on both sides).
7 - Merge checkout plan determinism.
8 - Merge commit graph (two parents).
9 - Boundary seal (AST).
10 - Dimension attribute behavior (rhythmic/structural reserved, no effect on merge).
11 """
12 from __future__ import annotations
13
14 import ast
15 import uuid
16 from pathlib import Path
17
18 import pytest
19 from maestro.contracts.json_types import (
20 AftertouchDict,
21 CCEventDict,
22 NoteDict,
23 PitchBendDict,
24 )
25 from collections.abc import AsyncGenerator
26 from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
27
28 from maestro.db.database import Base
29 from maestro.db import muse_models # noqa: F401
30 from maestro.models.variation import (
31 MidiNoteSnapshot,
32 NoteChange,
33 Phrase,
34 Variation,
35 )
36 from maestro.services import muse_repository
37 from maestro.services.muse_attributes import (
38 MergeStrategy,
39 MuseAttribute,
40 parse_museattributes_file,
41 resolve_strategy,
42 )
43 from maestro.services.muse_merge import (
44 MergeConflict,
45 MergeResult,
46 ThreeWaySnapshot,
47 build_merge_result,
48 build_merge_checkout_plan,
49 )
50 from maestro.services.muse_merge_base import find_merge_base
51 from maestro.services.muse_replay import HeadSnapshot
52
53
54 # ── Fixtures ──────────────────────────────────────────────────────────────
55
56
57 @pytest.fixture
58 async def async_session() -> AsyncGenerator[AsyncSession, None]:
59 engine = create_async_engine("sqlite+aiosqlite:///:memory:")
60 async with engine.begin() as conn:
61 await conn.run_sync(Base.metadata.create_all)
62 Session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
63 async with Session() as session:
64 yield session
65 await engine.dispose()
66
67
68 # ── Helpers ───────────────────────────────────────────────────────────────
69
70
71 def _note(pitch: int, start: float, dur: float = 1.0, vel: int = 100) -> NoteDict:
72
73 return {"pitch": pitch, "start_beat": start, "duration_beats": dur, "velocity": vel, "channel": 0}
74
75
76 def _cc(cc_num: int, beat: float, value: int) -> CCEventDict:
77
78 return {"cc": cc_num, "beat": beat, "value": value}
79
80
81 def _snap(
82 vid: str,
83 notes: dict[str, list[NoteDict]] | None = None,
84 cc: dict[str, list[CCEventDict]] | None = None,
85 pb: dict[str, list[PitchBendDict]] | None = None,
86 at: dict[str, list[AftertouchDict]] | None = None,
87 track_regions: dict[str, str] | None = None,
88 ) -> HeadSnapshot:
89 return HeadSnapshot(
90 variation_id=vid,
91 notes=notes or {},
92 cc=cc or {},
93 pitch_bends=pb or {},
94 aftertouch=at or {},
95 track_regions=track_regions or {},
96 region_start_beats={},
97 )
98
99
100 def _make_variation(
101 notes: list[NoteDict],
102 region_id: str = "region-1",
103 track_id: str = "track-1",
104 ) -> Variation:
105 vid = str(uuid.uuid4())
106 pid = str(uuid.uuid4())
107 return Variation(
108 variation_id=vid,
109 intent="test",
110 ai_explanation="test",
111 affected_tracks=[track_id],
112 affected_regions=[region_id],
113 beat_range=(0.0, 8.0),
114 phrases=[
115 Phrase(
116 phrase_id=pid,
117 track_id=track_id,
118 region_id=region_id,
119 start_beat=0.0,
120 end_beat=8.0,
121 label="Test",
122 note_changes=[
123 NoteChange(
124 note_id=str(uuid.uuid4()),
125 change_type="added",
126 after=MidiNoteSnapshot.from_note_dict(n),
127 )
128 for n in notes
129 ],
130 ),
131 ],
132 )
133
134
135 async def _save(
136 session: AsyncSession,
137 var: Variation,
138 project_id: str,
139 parent: str | None = None,
140 parent2: str | None = None,
141 ) -> str:
142 await muse_repository.save_variation(
143 session, var,
144 project_id=project_id,
145 base_state_id="s1",
146 conversation_id="c",
147 region_metadata={},
148 parent_variation_id=parent,
149 parent2_variation_id=parent2,
150 )
151 return var.variation_id
152
153
154 # ---------------------------------------------------------------------------
155 # 9.1 — Merge Base Detection
156 # ---------------------------------------------------------------------------
157
158
159 class TestMergeBase:
160
161 @pytest.mark.anyio
162 async def test_two_branches_find_common_root(self, async_session: AsyncSession) -> None:
163
164 """
165 root ─── left_branch
166 └───── right_branch
167 merge_base(left, right) = root
168 """
169 root = _make_variation([_note(60, 0.0)])
170 root_id = await _save(async_session, root, "proj-1")
171
172 left = _make_variation([_note(60, 0.0), _note(64, 2.0)])
173 left_id = await _save(async_session, left, "proj-1", parent=root_id)
174
175 right = _make_variation([_note(60, 0.0), _note(67, 4.0)])
176 right_id = await _save(async_session, right, "proj-1", parent=root_id)
177
178 await async_session.commit()
179
180 base = await find_merge_base(async_session, left_id, right_id)
181 assert base == root_id
182
183 @pytest.mark.anyio
184 async def test_deeper_branch_finds_correct_ancestor(self, async_session: AsyncSession) -> None:
185
186 """
187 A ── B ── left
188 └────── right
189 merge_base(left, right) = A
190 """
191 a = _make_variation([_note(60, 0.0)])
192 a_id = await _save(async_session, a, "proj-2")
193
194 b = _make_variation([_note(60, 0.0), _note(62, 1.0)])
195 b_id = await _save(async_session, b, "proj-2", parent=a_id)
196
197 left = _make_variation([_note(60, 0.0), _note(62, 1.0), _note(64, 2.0)])
198 left_id = await _save(async_session, left, "proj-2", parent=b_id)
199
200 right = _make_variation([_note(60, 0.0), _note(67, 4.0)])
201 right_id = await _save(async_session, right, "proj-2", parent=a_id)
202
203 await async_session.commit()
204
205 base = await find_merge_base(async_session, left_id, right_id)
206 assert base == a_id
207
208 @pytest.mark.anyio
209 async def test_no_common_ancestor_returns_none(self, async_session: AsyncSession) -> None:
210
211 a = _make_variation([_note(60, 0.0)])
212 a_id = await _save(async_session, a, "proj-3a")
213
214 b = _make_variation([_note(67, 0.0)])
215 b_id = await _save(async_session, b, "proj-3b")
216
217 await async_session.commit()
218
219 base = await find_merge_base(async_session, a_id, b_id)
220 assert base is None
221
222
223 # ---------------------------------------------------------------------------
224 # 9.2 — Auto Merge (Non-overlapping Regions)
225 # ---------------------------------------------------------------------------
226
227
228 class TestAutoMerge:
229
230 def test_disjoint_regions_merge_cleanly(self) -> None:
231
232 base = _snap("base", notes={"r1": [_note(60, 0.0)]})
233 left = _snap("left", notes={"r1": [_note(60, 0.0)], "r2": [_note(72, 0.0)]})
234 right = _snap("right", notes={"r1": [_note(60, 0.0)], "r3": [_note(48, 0.0)]})
235
236 result = build_merge_result(base=base, left=left, right=right)
237 assert not result.has_conflicts
238 assert result.merged_snapshot is not None
239 assert "r1" in result.merged_snapshot.notes
240 assert "r2" in result.merged_snapshot.notes
241 assert "r3" in result.merged_snapshot.notes
242
243 def test_one_side_modifies_other_untouched(self) -> None:
244
245 base = _snap("base", notes={"r1": [_note(60, 0.0)]})
246 left = _snap("left", notes={"r1": [_note(60, 0.0), _note(64, 2.0)]})
247 right = _snap("right", notes={"r1": [_note(60, 0.0)]})
248
249 result = build_merge_result(base=base, left=left, right=right)
250 assert not result.has_conflicts
251 merged = result.merged_snapshot
252 assert merged is not None
253 assert len(merged.notes["r1"]) == 2
254
255 def test_both_add_to_different_beats(self) -> None:
256
257 base = _snap("base", notes={"r1": [_note(60, 0.0)]})
258 left = _snap("left", notes={"r1": [_note(60, 0.0), _note(64, 2.0)]})
259 right = _snap("right", notes={"r1": [_note(60, 0.0), _note(67, 4.0)]})
260
261 result = build_merge_result(base=base, left=left, right=right)
262 assert not result.has_conflicts
263 merged = result.merged_snapshot
264 assert merged is not None
265 assert len(merged.notes["r1"]) == 3
266
267 def test_controller_merge_from_one_side(self) -> None:
268
269 base = _snap("base", notes={"r1": [_note(60, 0.0)]})
270 left = _snap("left",
271 notes={"r1": [_note(60, 0.0)]},
272 cc={"r1": [_cc(64, 0.0, 127)]})
273 right = _snap("right", notes={"r1": [_note(60, 0.0)]})
274
275 result = build_merge_result(base=base, left=left, right=right)
276 assert not result.has_conflicts
277 merged = result.merged_snapshot
278 assert merged is not None
279 assert len(merged.cc.get("r1", [])) == 1
280
281
282 # ---------------------------------------------------------------------------
283 # 9.3 — Conflict Detection
284 # ---------------------------------------------------------------------------
285
286
287 class TestConflictDetection:
288
289 def test_same_note_modified_both_sides(self) -> None:
290
291 base = _snap("base", notes={"r1": [_note(60, 0.0)]})
292 left = _snap("left", notes={"r1": [_note(60, 0.0, vel=50)]})
293 right = _snap("right", notes={"r1": [_note(60, 0.0, vel=80)]})
294
295 result = build_merge_result(base=base, left=left, right=right)
296 assert result.has_conflicts
297 assert len(result.conflicts) >= 1
298 assert result.conflicts[0].type == "note"
299 assert result.merged_snapshot is None
300
301 def test_one_removed_other_modified(self) -> None:
302
303 base = _snap("base", notes={"r1": [_note(60, 0.0), _note(64, 2.0)]})
304 left = _snap("left", notes={"r1": [_note(64, 2.0)]})
305 right = _snap("right", notes={"r1": [_note(60, 0.0, vel=50), _note(64, 2.0)]})
306
307 result = build_merge_result(base=base, left=left, right=right)
308 assert result.has_conflicts
309 assert any("removed" in c.description.lower() or "modified" in c.description.lower()
310 for c in result.conflicts)
311
312 def test_controller_conflict(self) -> None:
313
314 base = _snap("base",
315 notes={"r1": [_note(60, 0.0)]},
316 cc={"r1": [_cc(64, 0.0, 100)]})
317 left = _snap("left",
318 notes={"r1": [_note(60, 0.0)]},
319 cc={"r1": [_cc(64, 0.0, 50)]})
320 right = _snap("right",
321 notes={"r1": [_note(60, 0.0)]},
322 cc={"r1": [_cc(64, 0.0, 80)]})
323
324 result = build_merge_result(base=base, left=left, right=right)
325 assert result.has_conflicts
326 assert any(c.type == "cc" for c in result.conflicts)
327
328 def test_no_conflicts_both_unchanged(self) -> None:
329
330 base = _snap("base", notes={"r1": [_note(60, 0.0)]})
331 left = _snap("left", notes={"r1": [_note(60, 0.0)]})
332 right = _snap("right", notes={"r1": [_note(60, 0.0)]})
333
334 result = build_merge_result(base=base, left=left, right=right)
335 assert not result.has_conflicts
336 assert result.merged_snapshot is not None
337
338
339 # ---------------------------------------------------------------------------
340 # 9.4 — Merge CheckoutPlan Determinism
341 # ---------------------------------------------------------------------------
342
343
344 class TestMergeDeterminism:
345
346 def test_same_merge_produces_same_hash(self) -> None:
347
348 base = _snap("base", notes={"r1": [_note(60, 0.0)]})
349 left = _snap("left", notes={"r1": [_note(60, 0.0), _note(64, 2.0)]})
350 right = _snap("right", notes={"r1": [_note(60, 0.0)]})
351
352 r1 = build_merge_result(base=base, left=left, right=right)
353 r2 = build_merge_result(base=base, left=left, right=right)
354
355 assert r1.merged_snapshot is not None
356 assert r2.merged_snapshot is not None
357 assert r1.merged_snapshot.notes == r2.merged_snapshot.notes
358
359 @pytest.mark.anyio
360 async def test_merge_plan_deterministic(self, async_session: AsyncSession) -> None:
361
362 root = _make_variation([_note(60, 0.0)])
363 root_id = await _save(async_session, root, "proj-det")
364
365 left = _make_variation([_note(60, 0.0), _note(64, 2.0)])
366 left_id = await _save(async_session, left, "proj-det", parent=root_id)
367
368 right = _make_variation([_note(60, 0.0), _note(67, 4.0)])
369 right_id = await _save(async_session, right, "proj-det", parent=root_id)
370
371 await async_session.commit()
372
373 plan1 = await build_merge_checkout_plan(
374 async_session, "proj-det", left_id, right_id,
375 )
376 plan2 = await build_merge_checkout_plan(
377 async_session, "proj-det", left_id, right_id,
378 )
379
380 assert not plan1.is_conflict
381 assert not plan2.is_conflict
382 assert plan1.checkout_plan is not None
383 assert plan2.checkout_plan is not None
384 assert plan1.checkout_plan.plan_hash() == plan2.checkout_plan.plan_hash()
385
386
387 # ---------------------------------------------------------------------------
388 # 9.5 — Merge Commit Graph (Two Parents)
389 # ---------------------------------------------------------------------------
390
391
392 class TestMergeCommitGraph:
393
394 @pytest.mark.anyio
395 async def test_merge_commit_has_two_parents(self, async_session: AsyncSession) -> None:
396
397 root = _make_variation([_note(60, 0.0)])
398 root_id = await _save(async_session, root, "proj-graph")
399
400 left = _make_variation([_note(60, 0.0), _note(64, 2.0)])
401 left_id = await _save(async_session, left, "proj-graph", parent=root_id)
402
403 right = _make_variation([_note(60, 0.0), _note(67, 4.0)])
404 right_id = await _save(async_session, right, "proj-graph", parent=root_id)
405
406 merge = _make_variation([])
407 merge_id = await _save(
408 async_session, merge, "proj-graph",
409 parent=left_id, parent2=right_id,
410 )
411
412 await async_session.commit()
413
414 from sqlalchemy import select
415 from maestro.db import muse_models as db
416 stmt = select(db.Variation).where(db.Variation.variation_id == merge_id)
417 result = await async_session.execute(stmt)
418 row = result.scalar_one()
419
420 assert row.parent_variation_id == left_id
421 assert row.parent2_variation_id == right_id
422
423 @pytest.mark.anyio
424 async def test_parent2_nullable_for_non_merge(self, async_session: AsyncSession) -> None:
425
426 var = _make_variation([_note(60, 0.0)])
427 vid = await _save(async_session, var, "proj-null")
428 await async_session.commit()
429
430 from sqlalchemy import select
431 from maestro.db import muse_models as db
432 stmt = select(db.Variation).where(db.Variation.variation_id == vid)
433 result = await async_session.execute(stmt)
434 row = result.scalar_one()
435
436 assert row.parent2_variation_id is None
437
438
439 # ---------------------------------------------------------------------------
440 # 9.6 — Boundary Seal
441 # ---------------------------------------------------------------------------
442
443
444 class TestMergeBoundary:
445
446 def test_merge_no_state_store_import(self) -> None:
447
448 filepath = Path(__file__).resolve().parent.parent / "maestro" / "services" / "muse_merge.py"
449 tree = ast.parse(filepath.read_text())
450 forbidden = {"state_store", "executor", "maestro_handlers", "maestro_editing", "mcp"}
451 for node in ast.walk(tree):
452 if isinstance(node, ast.ImportFrom) and node.module:
453 for fb in forbidden:
454 assert fb not in node.module, (
455 f"muse_merge imports forbidden module: {node.module}"
456 )
457
458 def test_merge_base_no_state_store_import(self) -> None:
459
460 filepath = Path(__file__).resolve().parent.parent / "maestro" / "services" / "muse_merge_base.py"
461 tree = ast.parse(filepath.read_text())
462 forbidden = {"state_store", "executor", "maestro_handlers", "mcp"}
463 for node in ast.walk(tree):
464 if isinstance(node, ast.ImportFrom) and node.module:
465 for fb in forbidden:
466 assert fb not in node.module, (
467 f"muse_merge_base imports forbidden module: {node.module}"
468 )
469
470 def test_merge_no_forbidden_names(self) -> None:
471
472 filepath = Path(__file__).resolve().parent.parent / "maestro" / "services" / "muse_merge.py"
473 tree = ast.parse(filepath.read_text())
474 forbidden_names = {"StateStore", "get_or_create_store", "VariationService"}
475 for node in ast.walk(tree):
476 if isinstance(node, (ast.Import, ast.ImportFrom)):
477 for alias in node.names:
478 assert alias.name not in forbidden_names, (
479 f"muse_merge imports forbidden name: {alias.name}"
480 )
481
482
483 # ---------------------------------------------------------------------------
484 # 9.7 — Dimension Attribute Behavior (rhythmic / structural reserved)
485 # ---------------------------------------------------------------------------
486
487
488 class TestMuseAttributeDimensions:
489 """Verify that rhythmic and structural dimensions are parsed correctly
490 but have no current effect on merge outcome (reserved for future wiring).
491
492 These tests document the intentional gap described: all five
493 dimension names are valid in .museattributes, but build_merge_result does
494 not yet consult resolve_strategy for any event type. When dimension wiring
495 is eventually implemented, these tests should be updated to reflect the new
496 expected behavior.
497 """
498
499 def test_rhythmic_dimension_parses_correctly(self) -> None:
500 """'rhythmic' is a valid dimension name — parsing must not reject it."""
501 content = "drums/* rhythmic ours\n"
502 attrs = parse_museattributes_file(content)
503 assert len(attrs) == 1
504 assert attrs[0].dimension == "rhythmic"
505 assert attrs[0].strategy == MergeStrategy.OURS
506
507 def test_structural_dimension_parses_correctly(self) -> None:
508 """'structural' is a valid dimension name — parsing must not reject it."""
509 content = "* structural manual\n"
510 attrs = parse_museattributes_file(content)
511 assert len(attrs) == 1
512 assert attrs[0].dimension == "structural"
513 assert attrs[0].strategy == MergeStrategy.MANUAL
514
515 def test_resolve_strategy_returns_ours_for_rhythmic(self) -> None:
516 """resolve_strategy correctly resolves 'rhythmic' when a matching rule exists."""
517 attrs = parse_museattributes_file("drums/* rhythmic ours\n")
518 result = resolve_strategy(attrs, "drums/kick", "rhythmic")
519 assert result == MergeStrategy.OURS
520
521 def test_resolve_strategy_returns_auto_for_unmatched_rhythmic(self) -> None:
522 """resolve_strategy falls back to AUTO when no rhythmic rule matches."""
523 attrs = parse_museattributes_file("keys/* harmonic theirs\n")
524 result = resolve_strategy(attrs, "drums/kick", "rhythmic")
525 assert result == MergeStrategy.AUTO
526
527 def test_rhythmic_ours_has_no_effect_on_note_merge(self) -> None:
528 """Regression: 'drums/* rhythmic ours' must not silently corrupt merge.
529
530 build_merge_result does not yet accept attributes, so dimension-based
531 strategy resolution is not applied. A note conflict on a region that
532 would match 'drums/* rhythmic ours' is still reported as a conflict.
533 This is the expected behaviour until the merge engine is wired up to
534 consult resolve_strategy per event type.
535 """
536 # Two branches both modify the same note — this should be a conflict.
537 base = _snap("base", notes={"drums-region-1": [_note(36, 0.0)]})
538 left = _snap("left", notes={"drums-region-1": [_note(36, 0.0, vel=50)]})
539 right = _snap("right", notes={"drums-region-1": [_note(36, 0.0, vel=80)]})
540
541 # Even if the user has "drums/* rhythmic ours" in .museattributes,
542 # build_merge_result currently ignores attributes — the conflict is reported.
543 result = build_merge_result(base=base, left=left, right=right)
544
545 assert result.has_conflicts, (
546 "Expected a note conflict regardless of .museattributes rhythmic rule "
547 "(dimension wiring not yet implemented — see )"
548 )
549 assert any(c.type == "note" for c in result.conflicts)
550 assert result.merged_snapshot is None
551
552 def test_structural_ours_has_no_effect_on_note_merge(self) -> None:
553 """Regression: 'structural ours' must not silently corrupt merge.
554
555 Same rationale as test_rhythmic_ours_has_no_effect_on_note_merge:
556 structural dimension is reserved and has no current merge-engine effect.
557 """
558 base = _snap("base", notes={"section-region": [_note(60, 0.0)]})
559 left = _snap("left", notes={"section-region": [_note(60, 0.0, vel=40)]})
560 right = _snap("right", notes={"section-region": [_note(60, 0.0, vel=90)]})
561
562 result = build_merge_result(base=base, left=left, right=right)
563
564 assert result.has_conflicts, (
565 "Expected a note conflict regardless of .museattributes structural rule "
566 "(dimension wiring not yet implemented — see )"
567 )
568 assert result.merged_snapshot is None
569
570 def test_all_five_dimensions_parse_without_error(self) -> None:
571 """All five dimension names are accepted by the parser without warnings."""
572 content = (
573 "drums/* rhythmic ours\n"
574 "keys/* harmonic theirs\n"
575 "* melodic auto\n"
576 "* dynamic union\n"
577 "* structural manual\n"
578 )
579 attrs = parse_museattributes_file(content)
580 assert len(attrs) == 5
581 dimensions = {a.dimension for a in attrs}
582 assert dimensions == {"rhythmic", "harmonic", "melodic", "dynamic", "structural"}
583
584 def test_wildcard_dimension_matches_rhythmic_and_structural(self) -> None:
585 """A '*' dimension rule resolves for both 'rhythmic' and 'structural'."""
586 attrs = parse_museattributes_file("* * ours\n")
587 assert resolve_strategy(attrs, "drums/kick", "rhythmic") == MergeStrategy.OURS
588 assert resolve_strategy(attrs, "any_track", "structural") == MergeStrategy.OURS