cgcardona / muse public
test_commit.py python
908 lines 28.5 KB
12901c5a Initial extraction from tellurstori/maestro cgcardona <gabriel@tellurstori.com> 4d ago
1 """Integration tests for ``muse commit``.
2
3 Tests exercise ``_commit_async`` directly with an in-memory SQLite session
4 so no real Postgres instance is required. The ``muse_cli_db_session``
5 fixture (defined in tests/muse_cli/conftest.py) provides the isolated
6 SQLite session.
7
8 All async tests use ``@pytest.mark.anyio`` (configured for asyncio mode
9 in pyproject.toml).
10 """
11 from __future__ import annotations
12
13 import json
14 import pathlib
15 import uuid
16
17 import pytest
18 import pytest_asyncio
19 from sqlalchemy.ext.asyncio import AsyncSession
20 from sqlalchemy.future import select
21
22 from maestro.muse_cli.commands.commit import (
23 _append_co_author,
24 _apply_commit_music_metadata,
25 _commit_async,
26 build_snapshot_manifest_from_batch,
27 load_muse_batch,
28 )
29 from maestro.muse_cli.errors import ExitCode
30 from maestro.muse_cli.models import MuseCliCommit, MuseCliObject, MuseCliSnapshot
31 from maestro.muse_cli.snapshot import (
32 build_snapshot_manifest,
33 compute_snapshot_id,
34 )
35
36
37 # ---------------------------------------------------------------------------
38 # Helpers
39 # ---------------------------------------------------------------------------
40
41
def _init_muse_repo(root: pathlib.Path, repo_id: str | None = None) -> str:
    """Lay down the smallest ``.muse/`` tree that ``_commit_async`` can read.

    Writes ``repo.json``, a ``HEAD`` file pointing at ``refs/heads/main`` and
    an empty ``main`` ref.  Returns the repo id that was written: *repo_id*
    when it is truthy, otherwise a freshly generated UUID4 string.
    """
    rid = repo_id if repo_id else str(uuid.uuid4())

    muse_dir = root / ".muse"
    heads_dir = muse_dir / "refs" / "heads"
    heads_dir.mkdir(parents=True)

    repo_meta = {"repo_id": rid, "schema_version": "1"}
    (muse_dir / "repo.json").write_text(json.dumps(repo_meta))
    (muse_dir / "HEAD").write_text("refs/heads/main")
    (heads_dir / "main").write_text("")  # empty ref: no commits yet
    return rid
53
54
def _populate_workdir(root: pathlib.Path, files: dict[str, bytes] | None = None) -> None:
    """Ensure ``muse-work/`` exists under *root* and write *files* into it.

    *files* maps a file name to its raw bytes.  When it is ``None`` a default
    pair (``beat.mid`` and ``lead.mp3``) is written instead.
    """
    target = root / "muse-work"
    target.mkdir(exist_ok=True)

    if files is None:
        payloads = {"beat.mid": b"MIDI-DATA", "lead.mp3": b"MP3-DATA"}
    else:
        payloads = files

    for filename in payloads:
        (target / filename).write_bytes(payloads[filename])
63
64
65 # ---------------------------------------------------------------------------
66 # Basic commit creation
67 # ---------------------------------------------------------------------------
68
69
@pytest.mark.anyio
async def test_commit_creates_postgres_row(
    tmp_path: pathlib.Path, muse_cli_db_session: AsyncSession
) -> None:
    """A successful commit persists a row carrying message, repo_id and branch."""
    expected_message = "boom bap demo take 1"
    repo_id = _init_muse_repo(tmp_path)
    _populate_workdir(tmp_path)

    commit_id = await _commit_async(
        message=expected_message,
        root=tmp_path,
        session=muse_cli_db_session,
    )

    lookup = select(MuseCliCommit).where(MuseCliCommit.commit_id == commit_id)
    stored = (await muse_cli_db_session.execute(lookup)).scalar_one_or_none()

    assert stored is not None, "commit row must exist after _commit_async"
    assert stored.message == expected_message
    assert stored.repo_id == repo_id
    assert stored.branch == "main"
91
92
@pytest.mark.anyio
async def test_commit_id_is_deterministic(
    tmp_path: pathlib.Path, muse_cli_db_session: AsyncSession
) -> None:
    """The returned commit_id is a sha256-shaped hex string with exactly one DB row.

    Determinism of ``compute_commit_id`` itself is covered by
    ``test_snapshot.py::test_commit_id_parametrized_deterministic``.  This
    test only pins the integration contract: ``_commit_async`` hands back a
    well-formed ID and the commit row can be looked up by it.
    """
    _init_muse_repo(tmp_path)
    _populate_workdir(tmp_path, {"track.mid": b"CONSISTENT"})

    commit_id = await _commit_async(
        message="determinism check",
        root=tmp_path,
        session=muse_cli_db_session,
    )

    # Shape check: 64 lowercase hex characters.
    assert len(commit_id) == 64
    assert set(commit_id) <= set("0123456789abcdef")

    # Exactly one row is stored under that ID.
    by_id = select(MuseCliCommit).where(MuseCliCommit.commit_id == commit_id)
    matches = (await muse_cli_db_session.execute(by_id)).scalars().all()
    assert len(matches) == 1
    assert matches[0].message == "determinism check"
124
125
@pytest.mark.anyio
async def test_commit_snapshot_content_addressed_same_files_same_id(
    tmp_path: pathlib.Path, muse_cli_db_session: AsyncSession
) -> None:
    """The stored snapshot_id equals the ID recomputed from the same on-disk files."""
    _init_muse_repo(tmp_path)
    _populate_workdir(tmp_path, {"a.mid": b"CONSTANT"})

    commit_id = await _commit_async(
        message="first", root=tmp_path, session=muse_cli_db_session
    )

    by_id = select(MuseCliCommit).where(MuseCliCommit.commit_id == commit_id)
    stored_snapshot_id = (
        await muse_cli_db_session.execute(by_id)
    ).scalar_one().snapshot_id

    # Recompute the ID independently from the working tree and compare.
    recomputed = compute_snapshot_id(build_snapshot_manifest(tmp_path / "muse-work"))
    assert recomputed == stored_snapshot_id
146
147
@pytest.mark.anyio
async def test_commit_snapshot_content_addressed_changed_file_new_id(
    tmp_path: pathlib.Path, muse_cli_db_session: AsyncSession
) -> None:
    """Editing a file's bytes between commits yields a different snapshot_id."""

    async def snapshot_id_of(commit_id: str) -> str:
        # Look up the commit row and return the snapshot it points at.
        stmt = select(MuseCliCommit).where(MuseCliCommit.commit_id == commit_id)
        return (await muse_cli_db_session.execute(stmt)).scalar_one().snapshot_id

    _init_muse_repo(tmp_path)
    _populate_workdir(tmp_path, {"a.mid": b"VERSION1"})

    first_commit = await _commit_async(
        message="v1", root=tmp_path, session=muse_cli_db_session
    )
    first_snapshot = await snapshot_id_of(first_commit)

    # Rewrite the file with new content, then commit a second time.
    (tmp_path / "muse-work" / "a.mid").write_bytes(b"VERSION2")
    second_commit = await _commit_async(
        message="v2", root=tmp_path, session=muse_cli_db_session
    )
    second_snapshot = await snapshot_id_of(second_commit)

    assert first_snapshot != second_snapshot
175
176
@pytest.mark.anyio
async def test_commit_moves_branch_head(
    tmp_path: pathlib.Path, muse_cli_db_session: AsyncSession
) -> None:
    """Committing writes the new commit_id into ``.muse/refs/heads/main``."""
    _init_muse_repo(tmp_path)
    _populate_workdir(tmp_path)

    commit_id = await _commit_async(
        message="update head", root=tmp_path, session=muse_cli_db_session
    )

    main_ref = tmp_path / ".muse" / "refs" / "heads" / "main"
    assert main_ref.read_text().strip() == commit_id
191
192
@pytest.mark.anyio
async def test_commit_sets_parent_pointer(
    tmp_path: pathlib.Path, muse_cli_db_session: AsyncSession
) -> None:
    """The second commit records the first commit's ID as parent_commit_id."""
    _init_muse_repo(tmp_path)
    _populate_workdir(tmp_path, {"beat.mid": b"V1"})

    parent_id = await _commit_async(
        message="first", root=tmp_path, session=muse_cli_db_session
    )

    # The tree must differ, otherwise the second commit is "nothing to commit".
    (tmp_path / "muse-work" / "beat.mid").write_bytes(b"V2")
    child_id = await _commit_async(
        message="second", root=tmp_path, session=muse_cli_db_session
    )

    child_stmt = select(MuseCliCommit).where(MuseCliCommit.commit_id == child_id)
    child_row = (await muse_cli_db_session.execute(child_stmt)).scalar_one()
    assert child_row.parent_commit_id == parent_id
216
217
@pytest.mark.anyio
async def test_commit_objects_are_deduplicated(
    tmp_path: pathlib.Path, muse_cli_db_session: AsyncSession
) -> None:
    """Identical bytes committed under two names produce a single object row."""
    import hashlib

    _init_muse_repo(tmp_path)
    _populate_workdir(tmp_path, {"beat.mid": b"SHARED"})

    await _commit_async(message="c1", root=tmp_path, session=muse_cli_db_session)

    # Add a second file with the same bytes under another name, then commit again.
    (tmp_path / "muse-work" / "copy.mid").write_bytes(b"SHARED")
    await _commit_async(message="c2", root=tmp_path, session=muse_cli_db_session)

    stored_objects = (
        await muse_cli_db_session.execute(select(MuseCliObject))
    ).scalars().all()

    # The object ID this test expects for the shared content.
    expected_oid = hashlib.sha256(b"SHARED").hexdigest()
    assert expected_oid in {obj.object_id for obj in stored_objects}

    # No second row may exist for that same content.
    rows_for_shared = [obj for obj in stored_objects if obj.object_id == expected_oid]
    assert len(rows_for_shared) == 1
242
243
244 # ---------------------------------------------------------------------------
245 # Nothing to commit
246 # ---------------------------------------------------------------------------
247
248
@pytest.mark.anyio
async def test_commit_nothing_to_commit_exits_zero(
    tmp_path: pathlib.Path,
    muse_cli_db_session: AsyncSession,
    capsys: pytest.CaptureFixture[str],
) -> None:
    """Re-committing an unchanged tree exits with SUCCESS and prints the clean-tree notice."""
    import typer

    _init_muse_repo(tmp_path)
    _populate_workdir(tmp_path)

    await _commit_async(
        message="initial", root=tmp_path, session=muse_cli_db_session
    )

    # Nothing changed on disk, so the second attempt must bail out via typer.Exit.
    with pytest.raises(typer.Exit) as raised:
        await _commit_async(
            message="nothing changed", root=tmp_path, session=muse_cli_db_session
        )

    assert raised.value.exit_code == ExitCode.SUCCESS

    stdout = capsys.readouterr().out
    assert "Nothing to commit" in stdout
275
276
277 # ---------------------------------------------------------------------------
278 # Error cases
279 # ---------------------------------------------------------------------------
280
281
@pytest.mark.anyio
async def test_commit_outside_repo_exits_2(
    tmp_path: pathlib.Path,
    muse_cli_db_session: AsyncSession,
) -> None:
    """``muse commit`` run where there is no ``.muse/`` exits with REPO_NOT_FOUND.

    ``_commit_async`` never calls ``require_repo`` — that is the Typer
    callback's job — so this test goes through the Typer CLI runner instead.

    The CLI is invoked with ``tmp_path`` as the working directory.  Without
    that, the outcome would depend on wherever pytest happened to be started
    (for example inside a checkout that itself contains a ``.muse/``
    directory).
    NOTE(review): this assumes repo discovery starts from the current working
    directory — confirm against ``require_repo``.
    """
    import os

    from typer.testing import CliRunner
    from maestro.muse_cli.app import cli

    runner = CliRunner()
    previous_cwd = os.getcwd()
    os.chdir(tmp_path)  # this test never creates a .muse/ under tmp_path
    try:
        result = runner.invoke(cli, ["commit", "-m", "no repo"], catch_exceptions=False)
    finally:
        os.chdir(previous_cwd)  # never leak the cwd change into other tests
    assert result.exit_code == ExitCode.REPO_NOT_FOUND
297
298
@pytest.mark.anyio
async def test_commit_no_workdir_exits_1(
    tmp_path: pathlib.Path, muse_cli_db_session: AsyncSession
) -> None:
    """A repo without a muse-work/ directory makes commit exit with USER_ERROR."""
    import typer

    # Only the .muse/ layout is created; muse-work/ is intentionally left out.
    _init_muse_repo(tmp_path)

    with pytest.raises(typer.Exit) as raised:
        await _commit_async(
            message="no workdir", root=tmp_path, session=muse_cli_db_session
        )

    assert raised.value.exit_code == ExitCode.USER_ERROR
314
315
@pytest.mark.anyio
async def test_commit_empty_workdir_exits_1(
    tmp_path: pathlib.Path, muse_cli_db_session: AsyncSession
) -> None:
    """An existing but empty muse-work/ makes commit exit with USER_ERROR."""
    import typer

    _init_muse_repo(tmp_path)
    empty_workdir = tmp_path / "muse-work"
    empty_workdir.mkdir()  # present on disk, but contains no files

    with pytest.raises(typer.Exit) as raised:
        await _commit_async(
            message="empty", root=tmp_path, session=muse_cli_db_session
        )

    assert raised.value.exit_code == ExitCode.USER_ERROR
331
332
333 # ---------------------------------------------------------------------------
334 # --from-batch fast path
335 # ---------------------------------------------------------------------------
336
337
def _write_muse_batch(
    batch_root: pathlib.Path,
    files: list[dict[str, object]],
    run_id: str = "stress-test",
    suggestion: str = "feat: jazz stress test",
) -> pathlib.Path:
    """Serialise a minimal ``muse-batch.json`` into *batch_root* and return its path.

    *files* is stored verbatim under ``"files"``, *suggestion* under
    ``"commit_message_suggestion"``, and *run_id* is used both as the
    ``"run_id"`` and as the provenance ``"seed"``.
    """
    provenance = {
        "prompt": "test",
        "model": "storpheus",
        "seed": run_id,
        "storpheus_version": "1.0",
    }
    payload = {
        "run_id": run_id,
        "generated_at": "2026-02-27T17:29:19Z",
        "commit_message_suggestion": suggestion,
        "files": files,
        "provenance": provenance,
    }

    destination = batch_root / "muse-batch.json"
    destination.write_text(json.dumps(payload, indent=2))
    return destination
355
356
@pytest.mark.anyio
async def test_commit_from_batch_uses_suggestion(
    tmp_path: pathlib.Path, muse_cli_db_session: AsyncSession
) -> None:
    """With a batch file, the stored message is its commit_message_suggestion."""
    _init_muse_repo(tmp_path)

    drums_dir = tmp_path / "muse-work" / "tracks" / "drums"
    drums_dir.mkdir(parents=True)
    (drums_dir / "jazz_4b_comp-0001.mid").write_bytes(b"MIDI-DATA")

    batch_entry = {
        "path": "muse-work/tracks/drums/jazz_4b_comp-0001.mid",
        "role": "midi",
        "genre": "jazz",
        "bars": 4,
        "cached": False,
    }
    batch_path = _write_muse_batch(
        tmp_path,
        files=[batch_entry],
        suggestion="feat: jazz stress test",
    )

    commit_id = await _commit_async(
        message="",  # the batch suggestion is expected to take its place
        root=tmp_path,
        session=muse_cli_db_session,
        batch_path=batch_path,
    )

    by_id = select(MuseCliCommit).where(MuseCliCommit.commit_id == commit_id)
    stored = (await muse_cli_db_session.execute(by_id)).scalar_one()
    assert stored.message == "feat: jazz stress test"
392
393
@pytest.mark.anyio
async def test_commit_from_batch_snapshots_listed_files_only(
    tmp_path: pathlib.Path, muse_cli_db_session: AsyncSession
) -> None:
    """--from-batch snapshots only files listed in files[], not the entire muse-work/.

    Two files are placed under ``muse-work/`` but the batch references only
    the MIDI one; the stored snapshot manifest must mention the listed file
    and must not mention the unlisted one.
    """
    _init_muse_repo(tmp_path)

    # Create two files in muse-work/: one the batch will list, one it will not.
    listed_dir = tmp_path / "muse-work" / "tracks" / "drums"
    listed_dir.mkdir(parents=True)
    listed_file = listed_dir / "jazz_4b_comp-0001.mid"
    listed_file.write_bytes(b"LISTED-MIDI")

    unlisted_dir = tmp_path / "muse-work" / "renders"
    unlisted_dir.mkdir(parents=True)
    unlisted_file = unlisted_dir / "house_8b_comp-9999.mp3"
    unlisted_file.write_bytes(b"UNLISTED-MP3")

    # Batch only references the MIDI file.
    batch_path = _write_muse_batch(
        tmp_path,
        files=[{
            "path": "muse-work/tracks/drums/jazz_4b_comp-0001.mid",
            "role": "midi",
            "genre": "jazz",
            "bars": 4,
            "cached": False,
        }],
        suggestion="feat: partial batch commit",
    )

    commit_id = await _commit_async(
        message="",
        root=tmp_path,
        session=muse_cli_db_session,
        batch_path=batch_path,
    )

    # Follow commit -> snapshot to retrieve the stored manifest.  Both
    # ``select`` and ``MuseCliSnapshot`` come from the module-level imports.
    commit_result = await muse_cli_db_session.execute(
        select(MuseCliCommit).where(MuseCliCommit.commit_id == commit_id)
    )
    commit_row = commit_result.scalar_one()
    snapshot_result = await muse_cli_db_session.execute(
        select(MuseCliSnapshot).where(
            MuseCliSnapshot.snapshot_id == commit_row.snapshot_id
        )
    )
    snapshot = snapshot_result.scalar_one()
    manifest: dict[str, str] = snapshot.manifest

    # Only the listed MIDI file should be in the snapshot.
    assert any("jazz_4b_comp-0001.mid" in key for key in manifest)
    assert not any("house_8b_comp-9999.mp3" in key for key in manifest), (
        "Unlisted files must NOT appear in the --from-batch snapshot"
    )
454
455
@pytest.mark.anyio
async def test_commit_from_batch_missing_batch_file_exits_1(
    tmp_path: pathlib.Path, muse_cli_db_session: AsyncSession
) -> None:
    """Pointing commit at a batch path that does not exist exits with USER_ERROR."""
    import typer

    _init_muse_repo(tmp_path)
    missing_batch = tmp_path / "muse-batch.json"  # never written to disk

    with pytest.raises(typer.Exit) as raised:
        await _commit_async(
            message="",
            root=tmp_path,
            session=muse_cli_db_session,
            batch_path=missing_batch,
        )

    assert raised.value.exit_code == ExitCode.USER_ERROR
474
475
@pytest.mark.anyio
async def test_commit_from_batch_all_files_missing_exits_1(
    tmp_path: pathlib.Path, muse_cli_db_session: AsyncSession
) -> None:
    """A batch whose only listed file is absent on disk exits with USER_ERROR."""
    import typer

    _init_muse_repo(tmp_path)

    absent_entry = {
        "path": "muse-work/tracks/drums/nonexistent.mid",
        "role": "midi",
        "genre": "jazz",
        "bars": 4,
        "cached": False,
    }
    batch_path = _write_muse_batch(tmp_path, files=[absent_entry])

    with pytest.raises(typer.Exit) as raised:
        await _commit_async(
            message="",
            root=tmp_path,
            session=muse_cli_db_session,
            batch_path=batch_path,
        )

    assert raised.value.exit_code == ExitCode.USER_ERROR
503
504
def test_load_muse_batch_invalid_json_exits_1(tmp_path: pathlib.Path) -> None:
    """Malformed JSON makes ``load_muse_batch`` raise typer.Exit with USER_ERROR."""
    import typer

    malformed = tmp_path / "muse-batch.json"
    malformed.write_text("{ not valid json }")

    with pytest.raises(typer.Exit) as raised:
        load_muse_batch(malformed)

    assert raised.value.exit_code == ExitCode.USER_ERROR
515
516
def test_build_snapshot_manifest_from_batch_skips_missing_files(
    tmp_path: pathlib.Path,
) -> None:
    """Batch entries whose file is absent on disk are left out of the manifest."""
    tracks_dir = tmp_path / "muse-work" / "tracks"
    tracks_dir.mkdir(parents=True)
    (tracks_dir / "jazz_4b.mid").write_bytes(b"MIDI")

    # One entry exists on disk, the other does not.
    entries = [
        {"path": "muse-work/tracks/jazz_4b.mid", "role": "midi"},
        {"path": "muse-work/tracks/missing.mid", "role": "midi"},
    ]
    batch_data: dict[str, object] = {"files": entries}

    manifest = build_snapshot_manifest_from_batch(batch_data, tmp_path)

    assert "tracks/jazz_4b.mid" in manifest
    assert "tracks/missing.mid" not in manifest
536
537
538 # ---------------------------------------------------------------------------
539 # --section / --track / --emotion music-domain metadata
540 # ---------------------------------------------------------------------------
541
542
@pytest.mark.anyio
async def test_commit_section_stored_in_metadata(
    tmp_path: pathlib.Path, muse_cli_db_session: AsyncSession
) -> None:
    """The ``section`` argument ends up under commit_metadata['section']."""
    _init_muse_repo(tmp_path)
    _populate_workdir(tmp_path)

    commit_id = await _commit_async(
        message="chorus bass take",
        root=tmp_path,
        session=muse_cli_db_session,
        section="chorus",
    )

    stored = await muse_cli_db_session.get(MuseCliCommit, commit_id)
    assert stored is not None
    metadata = stored.commit_metadata
    assert metadata is not None
    assert metadata.get("section") == "chorus"
562
563
@pytest.mark.anyio
async def test_commit_track_stored_in_metadata(
    tmp_path: pathlib.Path, muse_cli_db_session: AsyncSession
) -> None:
    """The ``track`` argument ends up under commit_metadata['track']."""
    _init_muse_repo(tmp_path)
    _populate_workdir(tmp_path)

    commit_id = await _commit_async(
        message="bass groove",
        root=tmp_path,
        session=muse_cli_db_session,
        track="bass",
    )

    stored = await muse_cli_db_session.get(MuseCliCommit, commit_id)
    assert stored is not None
    metadata = stored.commit_metadata
    assert metadata is not None
    assert metadata.get("track") == "bass"
583
584
@pytest.mark.anyio
async def test_commit_emotion_stored_in_metadata(
    tmp_path: pathlib.Path, muse_cli_db_session: AsyncSession
) -> None:
    """The ``emotion`` argument ends up under commit_metadata['emotion']."""
    _init_muse_repo(tmp_path)
    _populate_workdir(tmp_path)

    commit_id = await _commit_async(
        message="sad piano take",
        root=tmp_path,
        session=muse_cli_db_session,
        emotion="melancholic",
    )

    stored = await muse_cli_db_session.get(MuseCliCommit, commit_id)
    assert stored is not None
    metadata = stored.commit_metadata
    assert metadata is not None
    assert metadata.get("emotion") == "melancholic"
604
605
@pytest.mark.anyio
async def test_commit_all_music_metadata_flags_together(
    tmp_path: pathlib.Path, muse_cli_db_session: AsyncSession
) -> None:
    """section, track and emotion passed together are all stored in commit_metadata."""
    _init_muse_repo(tmp_path)
    _populate_workdir(tmp_path)

    commit_id = await _commit_async(
        message="full take",
        root=tmp_path,
        session=muse_cli_db_session,
        section="verse",
        track="keys",
        emotion="joyful",
    )

    stored = await muse_cli_db_session.get(MuseCliCommit, commit_id)
    assert stored is not None
    metadata = stored.commit_metadata
    assert metadata is not None

    expected_values = {"section": "verse", "track": "keys", "emotion": "joyful"}
    for key, expected in expected_values.items():
        assert metadata.get(key) == expected
630
631
@pytest.mark.anyio
async def test_commit_no_music_flags_metadata_is_none(
    tmp_path: pathlib.Path, muse_cli_db_session: AsyncSession
) -> None:
    """A commit made without any music flag stores ``None`` as commit_metadata."""
    _init_muse_repo(tmp_path)
    _populate_workdir(tmp_path)

    commit_id = await _commit_async(
        message="plain commit",
        root=tmp_path,
        session=muse_cli_db_session,
    )

    stored = await muse_cli_db_session.get(MuseCliCommit, commit_id)
    assert stored is not None
    # Must be None itself, not merely an empty mapping.
    assert stored.commit_metadata is None
649
650
651 # ---------------------------------------------------------------------------
652 # --co-author trailer
653 # ---------------------------------------------------------------------------
654
655
def test_append_co_author_adds_trailer() -> None:
    """A non-empty message gets a blank line followed by the Co-authored-by trailer."""
    expected = "Initial commit\n\nCo-authored-by: Alice <alice@stori.app>"
    assert _append_co_author("Initial commit", "Alice <alice@stori.app>") == expected
660
661
def test_append_co_author_empty_message() -> None:
    """An empty base message yields the trailer alone, with no leading blank line."""
    expected = "Co-authored-by: Bob <bob@stori.app>"
    assert _append_co_author("", "Bob <bob@stori.app>") == expected
666
667
@pytest.mark.anyio
async def test_commit_co_author_appended_to_message(
    tmp_path: pathlib.Path, muse_cli_db_session: AsyncSession
) -> None:
    """Passing ``co_author`` stores the message followed by a Co-authored-by trailer."""
    _init_muse_repo(tmp_path)
    _populate_workdir(tmp_path)

    commit_id = await _commit_async(
        message="collab jam session",
        root=tmp_path,
        session=muse_cli_db_session,
        co_author="Alice <alice@stori.app>",
    )

    stored = await muse_cli_db_session.get(MuseCliCommit, commit_id)
    assert stored is not None

    stored_message = stored.message
    assert "Co-authored-by: Alice <alice@stori.app>" in stored_message
    assert stored_message.startswith("collab jam session")
687
688
689 # ---------------------------------------------------------------------------
690 # --allow-empty
691 # ---------------------------------------------------------------------------
692
693
@pytest.mark.anyio
async def test_commit_allow_empty_bypasses_clean_tree_guard(
    tmp_path: pathlib.Path, muse_cli_db_session: AsyncSession
) -> None:
    """With ``allow_empty=True`` an unchanged tree can be committed a second time."""
    _init_muse_repo(tmp_path)
    _populate_workdir(tmp_path)

    await _commit_async(
        message="first commit",
        root=tmp_path,
        session=muse_cli_db_session,
    )

    # Same tree again: without allow_empty this would stop at "Nothing to commit".
    marker_commit_id = await _commit_async(
        message="milestone marker",
        root=tmp_path,
        session=muse_cli_db_session,
        allow_empty=True,
    )

    marker_row = await muse_cli_db_session.get(MuseCliCommit, marker_commit_id)
    assert marker_row is not None
    assert marker_row.message == "milestone marker"
719
720
@pytest.mark.anyio
async def test_commit_allow_empty_with_emotion_metadata(
    tmp_path: pathlib.Path, muse_cli_db_session: AsyncSession
) -> None:
    """``allow_empty`` plus ``emotion`` records metadata on an otherwise unchanged tree."""
    _init_muse_repo(tmp_path)
    _populate_workdir(tmp_path)

    await _commit_async(
        message="initial session",
        root=tmp_path,
        session=muse_cli_db_session,
    )

    annotated_id = await _commit_async(
        message="emotional annotation",
        root=tmp_path,
        session=muse_cli_db_session,
        allow_empty=True,
        emotion="tense",
    )

    annotated = await muse_cli_db_session.get(MuseCliCommit, annotated_id)
    assert annotated is not None
    metadata = annotated.commit_metadata
    assert metadata is not None
    assert metadata.get("emotion") == "tense"
747
748
@pytest.mark.anyio
async def test_commit_without_allow_empty_still_exits_on_clean_tree(
    tmp_path: pathlib.Path,
    muse_cli_db_session: AsyncSession,
) -> None:
    """An explicit ``allow_empty=False`` keeps the nothing-to-commit exit in place."""
    import typer

    _init_muse_repo(tmp_path)
    _populate_workdir(tmp_path)

    await _commit_async(
        message="initial", root=tmp_path, session=muse_cli_db_session
    )

    with pytest.raises(typer.Exit) as raised:
        await _commit_async(
            message="duplicate",
            root=tmp_path,
            session=muse_cli_db_session,
            allow_empty=False,
        )

    assert raised.value.exit_code == ExitCode.SUCCESS
772
773
774 # ---------------------------------------------------------------------------
775 # _apply_commit_music_metadata helper
776 # ---------------------------------------------------------------------------
777
778
@pytest.mark.anyio
async def test_apply_commit_music_metadata_updates_existing_commit(
    tmp_path: pathlib.Path, muse_cli_db_session: AsyncSession
) -> None:
    """Supplied keys are merged into commit_metadata; a pre-existing key survives."""
    from sqlalchemy.orm.attributes import flag_modified

    _init_muse_repo(tmp_path)
    _populate_workdir(tmp_path)

    commit_id = await _commit_async(
        message="base commit",
        root=tmp_path,
        session=muse_cli_db_session,
    )

    # Seed an unrelated key first, standing in for a tempo set by `muse tempo --set`.
    seeded = await muse_cli_db_session.get(MuseCliCommit, commit_id)
    assert seeded is not None
    seeded.commit_metadata = {"tempo_bpm": 120.0}
    flag_modified(seeded, "commit_metadata")
    muse_cli_db_session.add(seeded)
    await muse_cli_db_session.flush()

    await _apply_commit_music_metadata(
        session=muse_cli_db_session,
        commit_id=commit_id,
        section="bridge",
        track=None,
        emotion="melancholic",
    )
    await muse_cli_db_session.flush()

    reloaded = await muse_cli_db_session.get(MuseCliCommit, commit_id)
    assert reloaded is not None
    metadata = reloaded.commit_metadata
    assert metadata is not None
    assert metadata.get("tempo_bpm") == 120.0  # seeded key is still there
    assert metadata.get("section") == "bridge"  # newly written
    assert metadata.get("emotion") == "melancholic"  # newly written
    assert "track" not in metadata  # passed as None, so no key at all
820
821
@pytest.mark.anyio
async def test_apply_commit_music_metadata_noop_when_no_keys(
    tmp_path: pathlib.Path, muse_cli_db_session: AsyncSession
) -> None:
    """Calling the helper with every music field ``None`` leaves commit_metadata as None."""
    _init_muse_repo(tmp_path)
    _populate_workdir(tmp_path)

    commit_id = await _commit_async(
        message="plain commit",
        root=tmp_path,
        session=muse_cli_db_session,
    )

    no_music_fields = {"section": None, "track": None, "emotion": None}
    await _apply_commit_music_metadata(
        session=muse_cli_db_session,
        commit_id=commit_id,
        **no_music_fields,
    )

    stored = await muse_cli_db_session.get(MuseCliCommit, commit_id)
    assert stored is not None
    assert stored.commit_metadata is None  # nothing was written
847
848
849 # ---------------------------------------------------------------------------
850 # muse show reflects music metadata
851 # ---------------------------------------------------------------------------
852
853
@pytest.mark.anyio
async def test_show_reflects_music_metadata(
    tmp_path: pathlib.Path, muse_cli_db_session: AsyncSession
) -> None:
    """``_show_async`` on HEAD returns the section/track/emotion given at commit time."""
    from maestro.muse_cli.commands.show import _show_async

    _init_muse_repo(tmp_path)
    _populate_workdir(tmp_path)

    music_fields = {"section": "chorus", "track": "drums", "emotion": "joyful"}
    await _commit_async(
        message="rich take",
        root=tmp_path,
        session=muse_cli_db_session,
        **music_fields,
    )

    shown = await _show_async(
        session=muse_cli_db_session,
        muse_dir=tmp_path / ".muse",
        ref="HEAD",
    )

    for field, expected in music_fields.items():
        assert shown[field] == expected
882
883
@pytest.mark.anyio
async def test_show_music_metadata_absent_when_not_set(
    tmp_path: pathlib.Path, muse_cli_db_session: AsyncSession
) -> None:
    """``_show_async`` reports ``None`` for each music field of a plain commit."""
    from maestro.muse_cli.commands.show import _show_async

    _init_muse_repo(tmp_path)
    _populate_workdir(tmp_path)

    await _commit_async(
        message="plain commit",
        root=tmp_path,
        session=muse_cli_db_session,
    )

    shown = await _show_async(
        session=muse_cli_db_session,
        muse_dir=tmp_path / ".muse",
        ref="HEAD",
    )

    for field in ("section", "track", "emotion"):
        assert shown[field] is None