test_log.py
python
| 1 | """Tests for ``muse log``. |
| 2 | |
| 3 | All async tests call ``_log_async`` directly with an in-memory SQLite |
| 4 | session and a ``tmp_path`` repo root — no real Postgres or running process |
| 5 | required. Commits are seeded via ``_commit_async`` so the two commands |
| 6 | are tested as an integrated pair. |
| 7 | """ |
| 8 | from __future__ import annotations |
| 9 | |
| 10 | import json |
| 11 | import pathlib |
| 12 | import uuid |
| 13 | |
| 14 | import pytest |
| 15 | from sqlalchemy.ext.asyncio import AsyncSession |
| 16 | |
| 17 | from maestro.muse_cli.commands.commit import _commit_async |
| 18 | from maestro.muse_cli.commands.log import _log_async |
| 19 | from maestro.muse_cli.errors import ExitCode |
| 20 | |
| 21 | |
| 22 | # --------------------------------------------------------------------------- |
| 23 | # Helpers (shared with test_commit.py pattern) |
| 24 | # --------------------------------------------------------------------------- |
| 25 | |
| 26 | |
| 27 | def _init_muse_repo(root: pathlib.Path, repo_id: str | None = None) -> str: |
| 28 | rid = repo_id or str(uuid.uuid4()) |
| 29 | muse = root / ".muse" |
| 30 | (muse / "refs" / "heads").mkdir(parents=True) |
| 31 | (muse / "repo.json").write_text( |
| 32 | json.dumps({"repo_id": rid, "schema_version": "1"}) |
| 33 | ) |
| 34 | (muse / "HEAD").write_text("refs/heads/main") |
| 35 | (muse / "refs" / "heads" / "main").write_text("") |
| 36 | return rid |
| 37 | |
| 38 | |
| 39 | def _write_workdir(root: pathlib.Path, files: dict[str, bytes]) -> None: |
| 40 | workdir = root / "muse-work" |
| 41 | workdir.mkdir(exist_ok=True) |
| 42 | for name, content in files.items(): |
| 43 | (workdir / name).write_bytes(content) |
| 44 | |
| 45 | |
| 46 | async def _make_commits( |
| 47 | root: pathlib.Path, |
| 48 | session: AsyncSession, |
| 49 | messages: list[str], |
| 50 | file_seed: int = 0, |
| 51 | ) -> list[str]: |
| 52 | """Create N commits on the repo, each with unique file content.""" |
| 53 | commit_ids: list[str] = [] |
| 54 | for i, msg in enumerate(messages): |
| 55 | _write_workdir(root, {f"track_{file_seed + i}.mid": f"MIDI-{file_seed + i}".encode()}) |
| 56 | cid = await _commit_async(message=msg, root=root, session=session) |
| 57 | commit_ids.append(cid) |
| 58 | return commit_ids |
| 59 | |
| 60 | |
| 61 | # --------------------------------------------------------------------------- |
| 62 | # test_log_shows_commits_newest_first |
| 63 | # --------------------------------------------------------------------------- |
| 64 | |
| 65 | |
| 66 | @pytest.mark.anyio |
| 67 | async def test_log_shows_commits_newest_first( |
| 68 | tmp_path: pathlib.Path, |
| 69 | muse_cli_db_session: AsyncSession, |
| 70 | capsys: pytest.CaptureFixture[str], |
| 71 | ) -> None: |
| 72 | """Three sequential commits appear in the log newest-first.""" |
| 73 | _init_muse_repo(tmp_path) |
| 74 | cids = await _make_commits(tmp_path, muse_cli_db_session, ["take 1", "take 2", "take 3"]) |
| 75 | |
| 76 | capsys.readouterr() # discard ✅ output from _commit_async calls |
| 77 | await _log_async( |
| 78 | root=tmp_path, session=muse_cli_db_session, limit=1000, graph=False |
| 79 | ) |
| 80 | out = capsys.readouterr().out |
| 81 | |
| 82 | # All three commit IDs should appear |
| 83 | for cid in cids: |
| 84 | assert cid in out |
| 85 | |
| 86 | # Newest (take 3) should appear before oldest (take 1) |
| 87 | assert out.index(cids[2]) < out.index(cids[0]) |
| 88 | # take 3 message first, take 1 last |
| 89 | assert out.index("take 3") < out.index("take 1") |
| 90 | |
| 91 | |
| 92 | # --------------------------------------------------------------------------- |
| 93 | # test_log_shows_correct_parent_chain |
| 94 | # --------------------------------------------------------------------------- |
| 95 | |
| 96 | |
| 97 | @pytest.mark.anyio |
| 98 | async def test_log_shows_correct_parent_chain( |
| 99 | tmp_path: pathlib.Path, |
| 100 | muse_cli_db_session: AsyncSession, |
| 101 | capsys: pytest.CaptureFixture[str], |
| 102 | ) -> None: |
| 103 | """Each commit's ``Parent:`` line shows the short ID of its predecessor.""" |
| 104 | _init_muse_repo(tmp_path) |
| 105 | cids = await _make_commits(tmp_path, muse_cli_db_session, ["first", "second", "third"]) |
| 106 | |
| 107 | await _log_async( |
| 108 | root=tmp_path, session=muse_cli_db_session, limit=1000, graph=False |
| 109 | ) |
| 110 | out = capsys.readouterr().out |
| 111 | |
| 112 | # "third" commit (cids[2]) should show cids[1][:8] as its parent |
| 113 | assert f"Parent: {cids[1][:8]}" in out |
| 114 | # "second" commit shows cids[0][:8] as parent |
| 115 | assert f"Parent: {cids[0][:8]}" in out |
| 116 | # "first" commit has no parent — no Parent line for it |
| 117 | lines = out.splitlines() |
| 118 | # Find the block for the first commit (last in output = oldest) |
| 119 | first_commit_idx = out.index(cids[0]) |
| 120 | first_commit_block = out[first_commit_idx:] |
| 121 | # The first block should not have a Parent: line before the next "commit " line |
| 122 | next_commit = first_commit_block.find("commit ", 8) # skip the "commit <id>" itself |
| 123 | if next_commit == -1: |
| 124 | block = first_commit_block |
| 125 | else: |
| 126 | block = first_commit_block[:next_commit] |
| 127 | assert "Parent:" not in block |
| 128 | |
| 129 | |
| 130 | # --------------------------------------------------------------------------- |
| 131 | # test_log_limit_restricts_output |
| 132 | # --------------------------------------------------------------------------- |
| 133 | |
| 134 | |
| 135 | @pytest.mark.anyio |
| 136 | async def test_log_limit_restricts_output( |
| 137 | tmp_path: pathlib.Path, |
| 138 | muse_cli_db_session: AsyncSession, |
| 139 | capsys: pytest.CaptureFixture[str], |
| 140 | ) -> None: |
| 141 | """--limit 2 shows exactly the two most recent commits.""" |
| 142 | _init_muse_repo(tmp_path) |
| 143 | cids = await _make_commits( |
| 144 | tmp_path, muse_cli_db_session, ["take 1", "take 2", "take 3"] |
| 145 | ) |
| 146 | |
| 147 | await _log_async( |
| 148 | root=tmp_path, session=muse_cli_db_session, limit=2, graph=False |
| 149 | ) |
| 150 | out = capsys.readouterr().out |
| 151 | |
| 152 | assert out.count("commit ") == 2 |
| 153 | # Most recent two appear |
| 154 | assert cids[2] in out # take 3 |
| 155 | assert cids[1] in out # take 2 |
| 156 | # Oldest excluded |
| 157 | assert cids[0] not in out |
| 158 | |
| 159 | |
| 160 | # --------------------------------------------------------------------------- |
| 161 | # test_log_shows_single_parent_line |
| 162 | # --------------------------------------------------------------------------- |
| 163 | |
| 164 | |
| 165 | @pytest.mark.anyio |
| 166 | async def test_log_shows_single_parent_line( |
| 167 | tmp_path: pathlib.Path, |
| 168 | muse_cli_db_session: AsyncSession, |
| 169 | capsys: pytest.CaptureFixture[str], |
| 170 | ) -> None: |
| 171 | """A commit with one parent shows exactly one ``Parent:`` line. |
| 172 | |
| 173 | Note: merge commits with two parents (``parent2_commit_id``) are |
| 174 | deferred to (``muse merge``). This test documents the |
| 175 | current single-parent behavior. |
| 176 | """ |
| 177 | _init_muse_repo(tmp_path) |
| 178 | cids = await _make_commits(tmp_path, muse_cli_db_session, ["v1", "v2"]) |
| 179 | |
| 180 | await _log_async( |
| 181 | root=tmp_path, session=muse_cli_db_session, limit=1000, graph=False |
| 182 | ) |
| 183 | out = capsys.readouterr().out |
| 184 | |
| 185 | # Only one Parent: line in the entire output (the second commit references the first) |
| 186 | assert out.count("Parent:") == 1 |
| 187 | assert f"Parent: {cids[0][:8]}" in out |
| 188 | |
| 189 | |
| 190 | # --------------------------------------------------------------------------- |
| 191 | # test_log_no_commits_exits_zero |
| 192 | # --------------------------------------------------------------------------- |
| 193 | |
| 194 | |
| 195 | @pytest.mark.anyio |
| 196 | async def test_log_no_commits_exits_zero( |
| 197 | tmp_path: pathlib.Path, |
| 198 | muse_cli_db_session: AsyncSession, |
| 199 | capsys: pytest.CaptureFixture[str], |
| 200 | ) -> None: |
| 201 | """``muse log`` on a repo with no commits exits 0 with a friendly message.""" |
| 202 | import typer |
| 203 | |
| 204 | _init_muse_repo(tmp_path) |
| 205 | # Deliberately do NOT commit anything |
| 206 | |
| 207 | with pytest.raises(typer.Exit) as exc_info: |
| 208 | await _log_async( |
| 209 | root=tmp_path, session=muse_cli_db_session, limit=1000, graph=False |
| 210 | ) |
| 211 | |
| 212 | assert exc_info.value.exit_code == ExitCode.SUCCESS |
| 213 | out = capsys.readouterr().out |
| 214 | assert "No commits yet" in out |
| 215 | assert "main" in out |
| 216 | |
| 217 | |
| 218 | # --------------------------------------------------------------------------- |
| 219 | # test_log_outside_repo_exits_2 |
| 220 | # --------------------------------------------------------------------------- |
| 221 | |
| 222 | |
| 223 | def test_log_outside_repo_exits_2(tmp_path: pathlib.Path) -> None: |
| 224 | """``muse log`` outside a .muse/ directory exits with code 2.""" |
| 225 | import os |
| 226 | from typer.testing import CliRunner |
| 227 | from maestro.muse_cli.app import cli |
| 228 | |
| 229 | runner = CliRunner() |
| 230 | prev = os.getcwd() |
| 231 | try: |
| 232 | os.chdir(tmp_path) |
| 233 | result = runner.invoke(cli, ["log"], catch_exceptions=False) |
| 234 | finally: |
| 235 | os.chdir(prev) |
| 236 | |
| 237 | assert result.exit_code == ExitCode.REPO_NOT_FOUND |
| 238 | |
| 239 | |
| 240 | # --------------------------------------------------------------------------- |
| 241 | # test_log_graph_produces_ascii_output |
| 242 | # --------------------------------------------------------------------------- |
| 243 | |
| 244 | |
| 245 | @pytest.mark.anyio |
| 246 | async def test_log_graph_produces_ascii_output( |
| 247 | tmp_path: pathlib.Path, |
| 248 | muse_cli_db_session: AsyncSession, |
| 249 | capsys: pytest.CaptureFixture[str], |
| 250 | ) -> None: |
| 251 | """``--graph`` output contains ASCII graph characters (* and |).""" |
| 252 | _init_muse_repo(tmp_path) |
| 253 | await _make_commits(tmp_path, muse_cli_db_session, ["beat 1", "beat 2", "beat 3"]) |
| 254 | |
| 255 | await _log_async( |
| 256 | root=tmp_path, session=muse_cli_db_session, limit=1000, graph=True |
| 257 | ) |
| 258 | out = capsys.readouterr().out |
| 259 | |
| 260 | assert "*" in out |
| 261 | # Each commit message should appear in the graph output |
| 262 | assert "beat 1" in out |
| 263 | assert "beat 2" in out |
| 264 | assert "beat 3" in out |
| 265 | |
| 266 | |
| 267 | # --------------------------------------------------------------------------- |
| 268 | # test_log_head_marker_on_newest_commit |
| 269 | # --------------------------------------------------------------------------- |
| 270 | |
| 271 | |
| 272 | @pytest.mark.anyio |
| 273 | async def test_log_head_marker_on_newest_commit( |
| 274 | tmp_path: pathlib.Path, |
| 275 | muse_cli_db_session: AsyncSession, |
| 276 | capsys: pytest.CaptureFixture[str], |
| 277 | ) -> None: |
| 278 | """The most recent commit is labelled ``(HEAD -> main)``.""" |
| 279 | _init_muse_repo(tmp_path) |
| 280 | cids = await _make_commits(tmp_path, muse_cli_db_session, ["first", "second"]) |
| 281 | |
| 282 | await _log_async( |
| 283 | root=tmp_path, session=muse_cli_db_session, limit=1000, graph=False |
| 284 | ) |
| 285 | out = capsys.readouterr().out |
| 286 | |
| 287 | # Only the newest commit (cids[1]) carries the HEAD marker |
| 288 | assert f"{cids[1]} (HEAD -> main)" in out |
| 289 | # Older commit does NOT carry it |
| 290 | assert f"{cids[0]} (HEAD -> main)" not in out |
| 291 | |
| 292 | |
| 293 | # --------------------------------------------------------------------------- |
| 294 | # test_log_limit_one_shows_only_head |
| 295 | # --------------------------------------------------------------------------- |
| 296 | |
| 297 | |
| 298 | @pytest.mark.anyio |
| 299 | async def test_log_limit_one_shows_only_head( |
| 300 | tmp_path: pathlib.Path, |
| 301 | muse_cli_db_session: AsyncSession, |
| 302 | capsys: pytest.CaptureFixture[str], |
| 303 | ) -> None: |
| 304 | """``--limit 1`` shows only the HEAD commit, regardless of chain length.""" |
| 305 | _init_muse_repo(tmp_path) |
| 306 | cids = await _make_commits(tmp_path, muse_cli_db_session, ["a", "b", "c"]) |
| 307 | |
| 308 | await _log_async( |
| 309 | root=tmp_path, session=muse_cli_db_session, limit=1, graph=False |
| 310 | ) |
| 311 | out = capsys.readouterr().out |
| 312 | |
| 313 | assert out.count("commit ") == 1 |
| 314 | assert cids[2] in out |
| 315 | assert cids[1] not in out |
| 316 | assert cids[0] not in out |