cgcardona / muse public
test_muse_timeline.py python
847 lines 26.9 KB
12901c5a Initial extraction from tellurstori/maestro cgcardona <gabriel@tellurstori.com> 4d ago
1 """Tests for ``muse timeline`` — service layer, CLI flags, and output format.
2
3 Test strategy
4 -------------
5 - Service layer (:func:`build_timeline`) is tested with real SQLite commits
6 and tags to verify chronological ordering, tag extraction, and summaries.
7 - CLI command (``_timeline_async``) is tested with an in-memory SQLite session
8 and a minimal ``.muse/`` layout, exercising every flag combination.
9 - CLI integration tests use ``typer.testing.CliRunner`` against the full ``muse``
10 app for argument parsing and exit code verification.
11 - Renderers (``_render_text``, ``_render_json``) are tested directly via capsys.
12
13 Naming: ``test_<behavior>_<scenario>`` throughout.
14 """
15 from __future__ import annotations
16
17 import json
18 import os
19 import pathlib
20 import uuid
21 from collections.abc import AsyncGenerator
22 from datetime import datetime, timezone
23
24 import pytest
25 import pytest_asyncio
26 from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
27 from sqlalchemy.pool import StaticPool
28 from typer.testing import CliRunner
29
30 from maestro.db.database import Base
31 import maestro.muse_cli.models # noqa: F401 — registers models with Base.metadata
32 from maestro.muse_cli.app import cli
33 from maestro.muse_cli.commands.timeline import (
34 _activity_bar,
35 _entry_to_dict,
36 _render_json,
37 _render_text,
38 _timeline_async,
39 )
40 from maestro.muse_cli.errors import ExitCode
41 from maestro.muse_cli.models import MuseCliCommit, MuseCliSnapshot, MuseCliTag
42 from maestro.services.muse_timeline import (
43 MuseTimelineEntry,
44 MuseTimelineResult,
45 _extract_prefix,
46 _group_tags_by_commit,
47 _make_entry,
48 build_timeline,
49 )
50
51 runner = CliRunner()
52
53
54 # ---------------------------------------------------------------------------
55 # Shared helpers
56 # ---------------------------------------------------------------------------
57
58
59 def _utc(year: int, month: int, day: int) -> datetime:
60 """Return a UTC datetime at midnight on the given date."""
61 return datetime(year, month, day, tzinfo=timezone.utc)
62
63
64 def _repo_id() -> str:
65 return str(uuid.uuid4())
66
67
68 def _snapshot_id() -> str:
69 return uuid.uuid4().hex * 2 # 64-char hex
70
71
72 def _commit_id() -> str:
73 return uuid.uuid4().hex * 2 # 64-char hex
74
75
76 # ---------------------------------------------------------------------------
77 # Fixtures
78 # ---------------------------------------------------------------------------
79
80
81 @pytest_asyncio.fixture
82 async def db_session() -> AsyncGenerator[AsyncSession, None]:
83 """In-memory SQLite session with all Muse tables created."""
84 engine = create_async_engine(
85 "sqlite+aiosqlite:///:memory:",
86 connect_args={"check_same_thread": False},
87 poolclass=StaticPool,
88 )
89 async with engine.begin() as conn:
90 await conn.run_sync(Base.metadata.create_all)
91 factory = async_sessionmaker(bind=engine, expire_on_commit=False)
92 async with factory() as session:
93 yield session
94 async with engine.begin() as conn:
95 await conn.run_sync(Base.metadata.drop_all)
96 await engine.dispose()
97
98
99 def _init_muse_repo(
100 root: pathlib.Path,
101 branch: str = "main",
102 repo_id: str | None = None,
103 ) -> str:
104 """Create a minimal ``.muse/`` directory tree with a repo.json."""
105 rid = repo_id or _repo_id()
106 muse = root / ".muse"
107 (muse / "refs" / "heads").mkdir(parents=True)
108 (muse / "repo.json").write_text(
109 json.dumps({"repo_id": rid, "schema_version": "1"})
110 )
111 (muse / "HEAD").write_text(f"refs/heads/{branch}")
112 (muse / "refs" / "heads" / branch).write_text("")
113 return rid
114
115
116 def _set_head(root: pathlib.Path, commit_id: str, branch: str = "main") -> None:
117 """Write *commit_id* into the branch ref file."""
118 muse = root / ".muse"
119 (muse / "refs" / "heads" / branch).write_text(commit_id)
120
121
122 async def _insert_snapshot(session: AsyncSession) -> str:
123 """Insert a blank snapshot and return its ID."""
124 sid = _snapshot_id()
125 snap = MuseCliSnapshot(snapshot_id=sid, manifest={})
126 session.add(snap)
127 await session.flush()
128 return sid
129
130
131 async def _insert_commit(
132 session: AsyncSession,
133 repo_id: str,
134 branch: str,
135 message: str,
136 committed_at: datetime,
137 parent_id: str | None = None,
138 ) -> MuseCliCommit:
139 """Insert a commit row and return the ORM object."""
140 sid = await _insert_snapshot(session)
141 cid = _commit_id()
142 commit = MuseCliCommit(
143 commit_id=cid,
144 repo_id=repo_id,
145 branch=branch,
146 message=message,
147 committed_at=committed_at,
148 parent_commit_id=parent_id,
149 snapshot_id=sid,
150 )
151 session.add(commit)
152 await session.flush()
153 return commit
154
155
156 async def _attach_tag(
157 session: AsyncSession,
158 repo_id: str,
159 commit_id: str,
160 tag: str,
161 ) -> MuseCliTag:
162 """Attach a tag to a commit."""
163 t = MuseCliTag(repo_id=repo_id, commit_id=commit_id, tag=tag)
164 session.add(t)
165 await session.flush()
166 return t
167
168
169 # ---------------------------------------------------------------------------
170 # Unit — service helpers
171 # ---------------------------------------------------------------------------
172
173
174 def test_extract_prefix_matches_known_prefix() -> None:
175 """_extract_prefix returns the value after a matching prefix."""
176 assert _extract_prefix("emotion:melancholic", "emotion:") == "melancholic"
177 assert _extract_prefix("section:chorus", "section:") == "chorus"
178 assert _extract_prefix("track:bass", "track:") == "bass"
179
180
181 def test_extract_prefix_returns_none_for_mismatch() -> None:
182 """_extract_prefix returns None when the prefix doesn't match."""
183 assert _extract_prefix("stage:rough-mix", "emotion:") is None
184 assert _extract_prefix("", "emotion:") is None
185
186
187 def test_group_tags_by_commit_groups_correctly() -> None:
188 """_group_tags_by_commit maps commit IDs to their tag lists."""
189 tags = [
190 MuseCliTag(repo_id="r1", commit_id="aaa", tag="emotion:joyful"),
191 MuseCliTag(repo_id="r1", commit_id="aaa", tag="section:chorus"),
192 MuseCliTag(repo_id="r1", commit_id="bbb", tag="emotion:melancholic"),
193 ]
194 grouped = _group_tags_by_commit(tags)
195 assert set(grouped["aaa"]) == {"emotion:joyful", "section:chorus"}
196 assert grouped["bbb"] == ["emotion:melancholic"]
197
198
199 def test_make_entry_parses_emotion_and_section_and_tracks() -> None:
200 """_make_entry extracts emotion, sections, and tracks from tags."""
201 snap_id = _snapshot_id()
202 commit = MuseCliCommit(
203 commit_id="a" * 64,
204 repo_id="r1",
205 branch="main",
206 message="Add chorus melody",
207 committed_at=_utc(2026, 2, 3),
208 snapshot_id=snap_id,
209 )
210 tags = ["emotion:joyful", "section:chorus", "track:keys", "track:vocals"]
211 entry = _make_entry(commit, tags)
212
213 assert entry.short_id == "aaaaaaa"
214 assert entry.emotion == "joyful"
215 assert entry.sections == ("chorus",)
216 assert set(entry.tracks) == {"keys", "vocals"}
217 assert entry.activity == 2 # two tracks
218
219
220 def test_make_entry_defaults_when_no_tags() -> None:
221 """_make_entry with no tags sets emotion=None, sections/tracks empty, activity=1."""
222 snap_id = _snapshot_id()
223 commit = MuseCliCommit(
224 commit_id="b" * 64,
225 repo_id="r1",
226 branch="main",
227 message="Initial take",
228 committed_at=_utc(2026, 2, 1),
229 snapshot_id=snap_id,
230 )
231 entry = _make_entry(commit, [])
232 assert entry.emotion is None
233 assert entry.sections == ()
234 assert entry.tracks == ()
235 assert entry.activity == 1
236
237
238 # ---------------------------------------------------------------------------
239 # Unit — activity_bar
240 # ---------------------------------------------------------------------------
241
242
243 def test_activity_bar_max_activity_produces_max_blocks() -> None:
244 """The most-active commit gets the maximum block count."""
245 bar = _activity_bar(10, 10)
246 assert len(bar) == 10
247 assert bar == "█" * 10
248
249
250 def test_activity_bar_scales_proportionally() -> None:
251 """Half-max activity should produce ~half the blocks."""
252 bar = _activity_bar(5, 10)
253 assert 4 <= len(bar) <= 6 # allow rounding
254
255
256 def test_activity_bar_zero_max_returns_minimum() -> None:
257 """Zero max_activity is handled gracefully with min blocks."""
258 bar = _activity_bar(0, 0)
259 assert len(bar) >= 1
260
261
262 # ---------------------------------------------------------------------------
263 # Unit — service: build_timeline
264 # ---------------------------------------------------------------------------
265
266
267 @pytest.mark.anyio
268 async def test_timeline_outputs_chronological_history(
269 tmp_path: pathlib.Path,
270 db_session: AsyncSession,
271 ) -> None:
272 """build_timeline returns entries in oldest-first order.
273
274 Regression test: ensures chronological ordering is preserved regardless
275 of DB insertion order.
276 """
277 rid = _repo_id()
278 c1 = await _insert_commit(db_session, rid, "main", "Init", _utc(2026, 2, 1))
279 c2 = await _insert_commit(db_session, rid, "main", "Add bass", _utc(2026, 2, 2), c1.commit_id)
280 c3 = await _insert_commit(db_session, rid, "main", "Chorus", _utc(2026, 2, 3), c2.commit_id)
281 await db_session.commit()
282
283 result = await build_timeline(
284 db_session, repo_id=rid, branch="main", head_commit_id=c3.commit_id
285 )
286
287 assert result.total_commits == 3
288 assert result.entries[0].commit_id == c1.commit_id # oldest first
289 assert result.entries[1].commit_id == c2.commit_id
290 assert result.entries[2].commit_id == c3.commit_id # newest last
291
292
293 @pytest.mark.anyio
294 async def test_timeline_empty_when_no_commits(
295 tmp_path: pathlib.Path,
296 db_session: AsyncSession,
297 ) -> None:
298 """build_timeline with a non-existent head_commit_id returns an empty result."""
299 rid = _repo_id()
300 result = await build_timeline(
301 db_session, repo_id=rid, branch="main", head_commit_id="nonexistent"
302 )
303 assert result.total_commits == 0
304 assert result.entries == ()
305
306
307 @pytest.mark.anyio
308 async def test_timeline_extracts_emotion_arc(
309 tmp_path: pathlib.Path,
310 db_session: AsyncSession,
311 ) -> None:
312 """build_timeline computes emotion_arc as ordered unique emotion labels."""
313 rid = _repo_id()
314 c1 = await _insert_commit(db_session, rid, "main", "Init", _utc(2026, 2, 1))
315 await _attach_tag(db_session, rid, c1.commit_id, "emotion:melancholic")
316 c2 = await _insert_commit(db_session, rid, "main", "Chorus", _utc(2026, 2, 2), c1.commit_id)
317 await _attach_tag(db_session, rid, c2.commit_id, "emotion:joyful")
318 c3 = await _insert_commit(db_session, rid, "main", "Bridge", _utc(2026, 2, 3), c2.commit_id)
319 await _attach_tag(db_session, rid, c3.commit_id, "emotion:tense")
320 await db_session.commit()
321
322 result = await build_timeline(
323 db_session, repo_id=rid, branch="main", head_commit_id=c3.commit_id
324 )
325 assert result.emotion_arc == ("melancholic", "joyful", "tense")
326
327
328 @pytest.mark.anyio
329 async def test_timeline_deduplicated_emotion_arc(
330 db_session: AsyncSession,
331 ) -> None:
332 """build_timeline deduplicates emotion_arc — repeated emotions appear once."""
333 rid = _repo_id()
334 c1 = await _insert_commit(db_session, rid, "main", "Init", _utc(2026, 2, 1))
335 await _attach_tag(db_session, rid, c1.commit_id, "emotion:melancholic")
336 c2 = await _insert_commit(db_session, rid, "main", "Add keys", _utc(2026, 2, 2), c1.commit_id)
337 await _attach_tag(db_session, rid, c2.commit_id, "emotion:melancholic")
338 await db_session.commit()
339
340 result = await build_timeline(
341 db_session, repo_id=rid, branch="main", head_commit_id=c2.commit_id
342 )
343 assert result.emotion_arc == ("melancholic",)
344
345
346 @pytest.mark.anyio
347 async def test_timeline_section_order(
348 db_session: AsyncSession,
349 ) -> None:
350 """build_timeline records section_order in order of first appearance."""
351 rid = _repo_id()
352 c1 = await _insert_commit(db_session, rid, "main", "Verse 1", _utc(2026, 2, 1))
353 await _attach_tag(db_session, rid, c1.commit_id, "section:verse")
354 c2 = await _insert_commit(db_session, rid, "main", "Chorus", _utc(2026, 2, 2), c1.commit_id)
355 await _attach_tag(db_session, rid, c2.commit_id, "section:chorus")
356 await db_session.commit()
357
358 result = await build_timeline(
359 db_session, repo_id=rid, branch="main", head_commit_id=c2.commit_id
360 )
361 assert result.section_order == ("verse", "chorus")
362
363
364 @pytest.mark.anyio
365 async def test_timeline_limit_caps_entries(
366 db_session: AsyncSession,
367 ) -> None:
368 """build_timeline respects the limit parameter."""
369 rid = _repo_id()
370 prev_id: str | None = None
371 last_commit: MuseCliCommit | None = None
372 for i in range(5):
373 c = await _insert_commit(
374 db_session, rid, "main", f"commit {i}", _utc(2026, 2, i + 1), prev_id
375 )
376 prev_id = c.commit_id
377 last_commit = c
378 await db_session.commit()
379
380 assert last_commit is not None
381 result = await build_timeline(
382 db_session, repo_id=rid, branch="main",
383 head_commit_id=last_commit.commit_id, limit=3
384 )
385 assert result.total_commits == 3
386
387
388 # ---------------------------------------------------------------------------
389 # Unit — renderers
390 # ---------------------------------------------------------------------------
391
392
393 def test_render_text_outputs_header(capsys: pytest.CaptureFixture[str]) -> None:
394 """_render_text prints branch name and commit count header."""
395 entry = MuseTimelineEntry(
396 commit_id="a" * 64,
397 short_id="aaaaaaa",
398 committed_at=_utc(2026, 2, 1),
399 message="Initial drums",
400 emotion=None,
401 sections=(),
402 tracks=(),
403 activity=1,
404 )
405 result = MuseTimelineResult(
406 entries=(entry,),
407 branch="main",
408 emotion_arc=(),
409 section_order=(),
410 total_commits=1,
411 )
412 _render_text(result, show_emotion=False, show_sections=False, show_tracks=False)
413 out = capsys.readouterr().out
414 assert "main" in out
415 assert "1 commit" in out
416 assert "aaaaaaa" in out
417 assert "2026-02-01" in out
418
419
420 def test_render_text_no_commits(capsys: pytest.CaptureFixture[str]) -> None:
421 """_render_text with empty entries prints the empty message."""
422 result = MuseTimelineResult(
423 entries=(), branch="main", emotion_arc=(), section_order=(), total_commits=0
424 )
425 _render_text(result, show_emotion=False, show_sections=False, show_tracks=False)
426 out = capsys.readouterr().out
427 assert "No commits" in out
428
429
430 def test_render_text_show_emotion_column(capsys: pytest.CaptureFixture[str]) -> None:
431 """_render_text with show_emotion=True includes emotion values in output."""
432 entry = MuseTimelineEntry(
433 commit_id="b" * 64,
434 short_id="bbbbbbb",
435 committed_at=_utc(2026, 2, 2),
436 message="Add chorus",
437 emotion="joyful",
438 sections=("chorus",),
439 tracks=("keys",),
440 activity=1,
441 )
442 result = MuseTimelineResult(
443 entries=(entry,),
444 branch="main",
445 emotion_arc=("joyful",),
446 section_order=("chorus",),
447 total_commits=1,
448 )
449 _render_text(result, show_emotion=True, show_sections=False, show_tracks=False)
450 out = capsys.readouterr().out
451 assert "joyful" in out
452 assert "Emotion arc" in out
453
454
455 def test_render_text_show_sections_header(capsys: pytest.CaptureFixture[str]) -> None:
456 """_render_text with show_sections=True prints section header lines."""
457 e1 = MuseTimelineEntry(
458 commit_id="c" * 64,
459 short_id="ccccccc",
460 committed_at=_utc(2026, 2, 1),
461 message="Verse start",
462 emotion=None,
463 sections=("verse",),
464 tracks=(),
465 activity=1,
466 )
467 e2 = MuseTimelineEntry(
468 commit_id="d" * 64,
469 short_id="ddddddd",
470 committed_at=_utc(2026, 2, 2),
471 message="Chorus start",
472 emotion=None,
473 sections=("chorus",),
474 tracks=(),
475 activity=1,
476 )
477 result = MuseTimelineResult(
478 entries=(e1, e2),
479 branch="main",
480 emotion_arc=(),
481 section_order=("verse", "chorus"),
482 total_commits=2,
483 )
484 _render_text(result, show_emotion=False, show_sections=True, show_tracks=False)
485 out = capsys.readouterr().out
486 assert "verse" in out
487 assert "chorus" in out
488 assert "──" in out
489
490
491 def test_render_text_show_tracks_column(capsys: pytest.CaptureFixture[str]) -> None:
492 """_render_text with show_tracks=True includes track names in output."""
493 entry = MuseTimelineEntry(
494 commit_id="e" * 64,
495 short_id="eeeeeee",
496 committed_at=_utc(2026, 2, 1),
497 message="Add bass",
498 emotion=None,
499 sections=(),
500 tracks=("bass", "drums"),
501 activity=2,
502 )
503 result = MuseTimelineResult(
504 entries=(entry,),
505 branch="main",
506 emotion_arc=(),
507 section_order=(),
508 total_commits=1,
509 )
510 _render_text(result, show_emotion=False, show_sections=False, show_tracks=True)
511 out = capsys.readouterr().out
512 assert "bass" in out
513 assert "drums" in out
514
515
516 def test_render_json_is_valid_and_complete(capsys: pytest.CaptureFixture[str]) -> None:
517 """_render_json emits valid JSON with expected top-level keys and entry fields."""
518 entry = MuseTimelineEntry(
519 commit_id="f" * 64,
520 short_id="fffffff",
521 committed_at=_utc(2026, 2, 3),
522 message="Chorus melody",
523 emotion="joyful",
524 sections=("chorus",),
525 tracks=("keys", "vocals"),
526 activity=2,
527 )
528 result = MuseTimelineResult(
529 entries=(entry,),
530 branch="main",
531 emotion_arc=("joyful",),
532 section_order=("chorus",),
533 total_commits=1,
534 )
535 _render_json(result)
536 raw = capsys.readouterr().out
537 payload = json.loads(raw)
538
539 assert payload["branch"] == "main"
540 assert payload["total_commits"] == 1
541 assert payload["emotion_arc"] == ["joyful"]
542 assert payload["section_order"] == ["chorus"]
543 assert len(payload["entries"]) == 1
544
545 e = payload["entries"][0]
546 assert e["short_id"] == "fffffff"
547 assert e["emotion"] == "joyful"
548 assert e["sections"] == ["chorus"]
549 assert set(e["tracks"]) == {"keys", "vocals"}
550 assert e["activity"] == 2
551
552
553 def test_entry_to_dict_serializes_all_fields() -> None:
554 """_entry_to_dict includes all required fields with correct types."""
555 entry = MuseTimelineEntry(
556 commit_id="a" * 64,
557 short_id="aaaaaaa",
558 committed_at=_utc(2026, 2, 1),
559 message="Init",
560 emotion=None,
561 sections=(),
562 tracks=(),
563 activity=1,
564 )
565 d = _entry_to_dict(entry)
566 assert "commit_id" in d
567 assert "short_id" in d
568 assert "committed_at" in d
569 assert "message" in d
570 assert "emotion" in d
571 assert "sections" in d
572 assert "tracks" in d
573 assert "activity" in d
574 assert d["emotion"] is None
575 assert isinstance(d["sections"], list)
576 assert isinstance(d["tracks"], list)
577
578
579 # ---------------------------------------------------------------------------
580 # Async core — _timeline_async
581 # ---------------------------------------------------------------------------
582
583
584 @pytest.mark.anyio
585 async def test_timeline_async_default_output(
586 tmp_path: pathlib.Path,
587 db_session: AsyncSession,
588 capsys: pytest.CaptureFixture[str],
589 ) -> None:
590 """_timeline_async default text mode shows all commits chronologically."""
591 rid = _init_muse_repo(tmp_path)
592 c1 = await _insert_commit(db_session, rid, "main", "Init", _utc(2026, 2, 1))
593 c2 = await _insert_commit(db_session, rid, "main", "Bass", _utc(2026, 2, 2), c1.commit_id)
594 await db_session.commit()
595 _set_head(tmp_path, c2.commit_id)
596
597 result = await _timeline_async(
598 root=tmp_path,
599 session=db_session,
600 commit_range=None,
601 show_emotion=False,
602 show_sections=False,
603 show_tracks=False,
604 as_json=False,
605 limit=1000,
606 )
607
608 assert result.total_commits == 2
609 assert result.entries[0].commit_id == c1.commit_id # oldest first
610 out = capsys.readouterr().out
611 assert "main" in out
612 assert "2026-02-01" in out
613
614
615 @pytest.mark.anyio
616 async def test_timeline_async_json_mode(
617 tmp_path: pathlib.Path,
618 db_session: AsyncSession,
619 capsys: pytest.CaptureFixture[str],
620 ) -> None:
621 """_timeline_async --json emits valid JSON with all entries."""
622 rid = _init_muse_repo(tmp_path)
623 c1 = await _insert_commit(db_session, rid, "main", "Init", _utc(2026, 2, 1))
624 c2 = await _insert_commit(db_session, rid, "main", "Chorus", _utc(2026, 2, 2), c1.commit_id)
625 await db_session.commit()
626 _set_head(tmp_path, c2.commit_id)
627
628 await _timeline_async(
629 root=tmp_path,
630 session=db_session,
631 commit_range=None,
632 show_emotion=False,
633 show_sections=False,
634 show_tracks=False,
635 as_json=True,
636 limit=1000,
637 )
638
639 raw = capsys.readouterr().out
640 payload = json.loads(raw)
641 assert payload["total_commits"] == 2
642 assert len(payload["entries"]) == 2
643
644
645 @pytest.mark.anyio
646 async def test_timeline_async_no_commits_exits_success(
647 tmp_path: pathlib.Path,
648 db_session: AsyncSession,
649 capsys: pytest.CaptureFixture[str],
650 ) -> None:
651 """_timeline_async exits 0 with an informative message when branch has no commits."""
652 _init_muse_repo(tmp_path)
653 # No _set_head — branch ref stays empty.
654
655 import typer
656
657 with pytest.raises(typer.Exit) as exc_info:
658 await _timeline_async(
659 root=tmp_path,
660 session=db_session,
661 commit_range=None,
662 show_emotion=False,
663 show_sections=False,
664 show_tracks=False,
665 as_json=False,
666 limit=1000,
667 )
668 assert exc_info.value.exit_code == int(ExitCode.SUCCESS)
669 out = capsys.readouterr().out
670 assert "empty" in out.lower() or "no commits" in out.lower()
671
672
673 @pytest.mark.anyio
674 async def test_timeline_async_commit_range_reserved_warns(
675 tmp_path: pathlib.Path,
676 db_session: AsyncSession,
677 capsys: pytest.CaptureFixture[str],
678 ) -> None:
679 """_timeline_async emits a stub warning when commit_range is supplied."""
680 rid = _init_muse_repo(tmp_path)
681 c1 = await _insert_commit(db_session, rid, "main", "Init", _utc(2026, 2, 1))
682 await db_session.commit()
683 _set_head(tmp_path, c1.commit_id)
684
685 await _timeline_async(
686 root=tmp_path,
687 session=db_session,
688 commit_range="HEAD~5..HEAD",
689 show_emotion=False,
690 show_sections=False,
691 show_tracks=False,
692 as_json=False,
693 limit=1000,
694 )
695
696 out = capsys.readouterr().out
697 assert "reserved" in out.lower() or "HEAD~5..HEAD" in out
698
699
700 @pytest.mark.anyio
701 async def test_timeline_async_emotion_flag_shows_tags(
702 tmp_path: pathlib.Path,
703 db_session: AsyncSession,
704 capsys: pytest.CaptureFixture[str],
705 ) -> None:
706 """_timeline_async --emotion includes emotion values in text output."""
707 rid = _init_muse_repo(tmp_path)
708 c1 = await _insert_commit(db_session, rid, "main", "Intro", _utc(2026, 2, 1))
709 await _attach_tag(db_session, rid, c1.commit_id, "emotion:melancholic")
710 await db_session.commit()
711 _set_head(tmp_path, c1.commit_id)
712
713 await _timeline_async(
714 root=tmp_path,
715 session=db_session,
716 commit_range=None,
717 show_emotion=True,
718 show_sections=False,
719 show_tracks=False,
720 as_json=False,
721 limit=1000,
722 )
723
724 out = capsys.readouterr().out
725 assert "melancholic" in out
726
727
728 @pytest.mark.anyio
729 async def test_timeline_async_sections_flag_groups_commits(
730 tmp_path: pathlib.Path,
731 db_session: AsyncSession,
732 capsys: pytest.CaptureFixture[str],
733 ) -> None:
734 """_timeline_async --sections prints section header lines."""
735 rid = _init_muse_repo(tmp_path)
736 c1 = await _insert_commit(db_session, rid, "main", "Verse", _utc(2026, 2, 1))
737 await _attach_tag(db_session, rid, c1.commit_id, "section:verse")
738 c2 = await _insert_commit(db_session, rid, "main", "Chorus", _utc(2026, 2, 2), c1.commit_id)
739 await _attach_tag(db_session, rid, c2.commit_id, "section:chorus")
740 await db_session.commit()
741 _set_head(tmp_path, c2.commit_id)
742
743 await _timeline_async(
744 root=tmp_path,
745 session=db_session,
746 commit_range=None,
747 show_emotion=False,
748 show_sections=True,
749 show_tracks=False,
750 as_json=False,
751 limit=1000,
752 )
753
754 out = capsys.readouterr().out
755 assert "verse" in out
756 assert "chorus" in out
757 assert "──" in out
758
759
760 @pytest.mark.anyio
761 async def test_timeline_async_tracks_flag_shows_track_column(
762 tmp_path: pathlib.Path,
763 db_session: AsyncSession,
764 capsys: pytest.CaptureFixture[str],
765 ) -> None:
766 """_timeline_async --tracks includes track names in text output."""
767 rid = _init_muse_repo(tmp_path)
768 c1 = await _insert_commit(db_session, rid, "main", "Add bass", _utc(2026, 2, 1))
769 await _attach_tag(db_session, rid, c1.commit_id, "track:bass")
770 await _attach_tag(db_session, rid, c1.commit_id, "track:drums")
771 await db_session.commit()
772 _set_head(tmp_path, c1.commit_id)
773
774 await _timeline_async(
775 root=tmp_path,
776 session=db_session,
777 commit_range=None,
778 show_emotion=False,
779 show_sections=False,
780 show_tracks=True,
781 as_json=False,
782 limit=1000,
783 )
784
785 out = capsys.readouterr().out
786 assert "bass" in out
787 assert "drums" in out
788
789
790 @pytest.mark.anyio
791 async def test_timeline_async_graceful_with_no_metadata_tags(
792 tmp_path: pathlib.Path,
793 db_session: AsyncSession,
794 capsys: pytest.CaptureFixture[str],
795 ) -> None:
796 """_timeline_async renders commits with no tags without crashing (shows '')."""
797 rid = _init_muse_repo(tmp_path)
798 c1 = await _insert_commit(db_session, rid, "main", "Raw commit", _utc(2026, 2, 1))
799 await db_session.commit()
800 _set_head(tmp_path, c1.commit_id)
801
802 result = await _timeline_async(
803 root=tmp_path,
804 session=db_session,
805 commit_range=None,
806 show_emotion=True,
807 show_sections=True,
808 show_tracks=True,
809 as_json=False,
810 limit=1000,
811 )
812 # Should not crash and should show the commit.
813 assert result.total_commits == 1
814 out = capsys.readouterr().out
815 assert "Raw commit" in out
816
817
818 # ---------------------------------------------------------------------------
819 # CLI integration — CliRunner
820 # ---------------------------------------------------------------------------
821
822
823 def test_cli_timeline_outside_repo_exits_2(tmp_path: pathlib.Path) -> None:
824 """``muse timeline`` exits 2 when invoked outside a Muse repository."""
825 prev = os.getcwd()
826 try:
827 os.chdir(tmp_path)
828 result = runner.invoke(cli, ["timeline"], catch_exceptions=False)
829 finally:
830 os.chdir(prev)
831 assert result.exit_code == int(ExitCode.REPO_NOT_FOUND)
832 assert "not a muse repository" in result.output.lower()
833
834
835 def test_cli_timeline_help_lists_all_flags() -> None:
836 """``muse timeline --help`` shows all documented flags."""
837 result = runner.invoke(cli, ["timeline", "--help"])
838 assert result.exit_code == 0
839 for flag in ("--emotion", "--sections", "--tracks", "--json", "--limit"):
840 assert flag in result.output, f"Flag '{flag}' not in help"
841
842
843 def test_cli_timeline_appears_in_muse_help() -> None:
844 """``muse --help`` lists the timeline subcommand."""
845 result = runner.invoke(cli, ["--help"])
846 assert result.exit_code == 0
847 assert "timeline" in result.output