test_muse_contour.py
python
| 1 | """Tests for ``muse contour`` — CLI interface, flag parsing, and stub output format. |
| 2 | |
| 3 | All CLI-level tests use ``typer.testing.CliRunner`` against the full ``muse`` |
| 4 | app so that argument parsing, flag handling, and exit codes are exercised end-to-end. |
| 5 | |
| 6 | Async core tests call the internal async functions directly with an in-memory |
| 7 | SQLite session (the stub does not query the DB, so the session is injected only |
| 8 | to satisfy the signature contract). |
| 9 | """ |
| 10 | from __future__ import annotations |
| 11 | |
| 12 | import json |
| 13 | import os |
| 14 | import pathlib |
| 15 | import uuid |
| 16 | from collections.abc import AsyncGenerator |
| 17 | |
| 18 | import pytest |
| 19 | import pytest_asyncio |
| 20 | from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine |
| 21 | from sqlalchemy.pool import StaticPool |
| 22 | from typer.testing import CliRunner |
| 23 | |
| 24 | from maestro.db.database import Base |
| 25 | import maestro.muse_cli.models # noqa: F401 — registers MuseCli* with Base.metadata |
| 26 | from maestro.muse_cli.app import cli |
| 27 | from maestro.muse_cli.commands.contour import ( |
| 28 | SHAPE_LABELS, |
| 29 | VALID_SHAPES, |
| 30 | ContourCompareResult, |
| 31 | ContourResult, |
| 32 | _contour_compare_async, |
| 33 | _contour_detect_async, |
| 34 | _contour_history_async, |
| 35 | _format_compare, |
| 36 | _format_detect, |
| 37 | _format_history, |
| 38 | ) |
| 39 | from maestro.muse_cli.errors import ExitCode |
| 40 | |
| 41 | runner = CliRunner() |
| 42 | |
| 43 | # --------------------------------------------------------------------------- |
| 44 | # Fixtures |
| 45 | # --------------------------------------------------------------------------- |
| 46 | |
| 47 | |
| 48 | def _init_muse_repo(root: pathlib.Path, branch: str = "main") -> str: |
| 49 | """Create a minimal .muse/ layout with one empty commit ref.""" |
| 50 | rid = str(uuid.uuid4()) |
| 51 | muse = root / ".muse" |
| 52 | (muse / "refs" / "heads").mkdir(parents=True) |
| 53 | (muse / "repo.json").write_text(json.dumps({"repo_id": rid, "schema_version": "1"})) |
| 54 | (muse / "HEAD").write_text(f"refs/heads/{branch}") |
| 55 | (muse / "refs" / "heads" / branch).write_text("") |
| 56 | return rid |
| 57 | |
| 58 | |
| 59 | def _commit_ref(root: pathlib.Path, branch: str = "main") -> None: |
| 60 | """Write a fake commit ID into the branch ref so HEAD is non-empty.""" |
| 61 | muse = root / ".muse" |
| 62 | (muse / "refs" / "heads" / branch).write_text("a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2") |
| 63 | |
| 64 | |
| 65 | @pytest_asyncio.fixture |
| 66 | async def db_session() -> AsyncGenerator[AsyncSession, None]: |
| 67 | """In-memory SQLite session (stub contour does not actually query it).""" |
| 68 | engine = create_async_engine( |
| 69 | "sqlite+aiosqlite:///:memory:", |
| 70 | connect_args={"check_same_thread": False}, |
| 71 | poolclass=StaticPool, |
| 72 | ) |
| 73 | async with engine.begin() as conn: |
| 74 | await conn.run_sync(Base.metadata.create_all) |
| 75 | factory = async_sessionmaker(bind=engine, expire_on_commit=False) |
| 76 | async with factory() as session: |
| 77 | yield session |
| 78 | async with engine.begin() as conn: |
| 79 | await conn.run_sync(Base.metadata.drop_all) |
| 80 | await engine.dispose() |
| 81 | |
| 82 | |
| 83 | # --------------------------------------------------------------------------- |
| 84 | # Unit — constants |
| 85 | # --------------------------------------------------------------------------- |
| 86 | |
| 87 | |
| 88 | def test_shape_labels_constant_matches_valid_shapes() -> None: |
| 89 | """SHAPE_LABELS tuple and VALID_SHAPES frozenset are in sync.""" |
| 90 | assert set(SHAPE_LABELS) == VALID_SHAPES |
| 91 | |
| 92 | |
| 93 | def test_valid_shapes_contains_expected_labels() -> None: |
| 94 | """All six canonical shape labels are present in VALID_SHAPES.""" |
| 95 | expected = {"ascending", "descending", "arch", "inverted-arch", "wave", "static"} |
| 96 | assert expected == VALID_SHAPES |
| 97 | |
| 98 | |
| 99 | # --------------------------------------------------------------------------- |
| 100 | # Unit — formatters |
| 101 | # --------------------------------------------------------------------------- |
| 102 | |
| 103 | |
| 104 | def _make_result( |
| 105 | *, |
| 106 | shape: str = "arch", |
| 107 | tessitura: int = 24, |
| 108 | avg_interval: float = 2.5, |
| 109 | phrase_count: int = 4, |
| 110 | avg_phrase_bars: float = 8.0, |
| 111 | commit: str = "a1b2c3d4", |
| 112 | branch: str = "main", |
| 113 | track: str = "all", |
| 114 | section: str = "all", |
| 115 | source: str = "stub", |
| 116 | ) -> ContourResult: |
| 117 | return ContourResult( |
| 118 | shape=shape, |
| 119 | tessitura=tessitura, |
| 120 | avg_interval=avg_interval, |
| 121 | phrase_count=phrase_count, |
| 122 | avg_phrase_bars=avg_phrase_bars, |
| 123 | commit=commit, |
| 124 | branch=branch, |
| 125 | track=track, |
| 126 | section=section, |
| 127 | source=source, |
| 128 | ) |
| 129 | |
| 130 | |
| 131 | def test_format_detect_human_readable_contains_shape() -> None: |
| 132 | """_format_detect (human mode) includes shape, range, phrase info.""" |
| 133 | result = _make_result() |
| 134 | out = _format_detect(result, as_json=False, shape_only=False) |
| 135 | assert "Shape: arch" in out |
| 136 | assert "Phrases: 4" in out |
| 137 | assert "Angularity:" in out |
| 138 | assert "stub" in out |
| 139 | |
| 140 | |
| 141 | def test_format_detect_shape_only() -> None: |
| 142 | """_format_detect with shape_only=True returns just the shape line.""" |
| 143 | result = _make_result(shape="wave") |
| 144 | out = _format_detect(result, as_json=False, shape_only=True) |
| 145 | assert out == "Shape: wave" |
| 146 | |
| 147 | |
| 148 | def test_format_detect_json_is_valid() -> None: |
| 149 | """_format_detect with as_json=True returns valid parseable JSON.""" |
| 150 | result = _make_result() |
| 151 | raw = _format_detect(result, as_json=True, shape_only=False) |
| 152 | payload = json.loads(raw) |
| 153 | assert payload["shape"] == "arch" |
| 154 | assert payload["tessitura"] == 24 |
| 155 | assert payload["phrase_count"] == 4 |
| 156 | assert payload["source"] == "stub" |
| 157 | |
| 158 | |
| 159 | def test_format_detect_range_octaves() -> None: |
| 160 | """_format_detect converts tessitura semitones to octave string correctly.""" |
| 161 | result = _make_result(tessitura=24) |
| 162 | out = _format_detect(result, as_json=False, shape_only=False) |
| 163 | assert "2 octaves" in out |
| 164 | |
| 165 | result_one_octave = _make_result(tessitura=12) |
| 166 | out2 = _format_detect(result_one_octave, as_json=False, shape_only=False) |
| 167 | assert "1 octave" in out2 |
| 168 | |
| 169 | |
| 170 | def test_format_history_human_readable() -> None: |
| 171 | """_format_history renders commit, shape, range, and angularity per entry.""" |
| 172 | entries = [_make_result(commit="deadbeef")] |
| 173 | out = _format_history(entries, as_json=False) |
| 174 | assert "deadbeef" in out |
| 175 | assert "arch" in out |
| 176 | assert "24 st" in out |
| 177 | |
| 178 | |
| 179 | def test_format_history_empty() -> None: |
| 180 | """_format_history returns a helpful message when entries list is empty.""" |
| 181 | out = _format_history([], as_json=False) |
| 182 | assert "no contour history" in out.lower() |
| 183 | |
| 184 | |
| 185 | def test_format_history_json() -> None: |
| 186 | """_format_history with as_json=True emits a JSON array.""" |
| 187 | entries = [_make_result()] |
| 188 | raw = _format_history(entries, as_json=True) |
| 189 | payload = json.loads(raw) |
| 190 | assert isinstance(payload, list) |
| 191 | assert len(payload) == 1 |
| 192 | assert payload[0]["shape"] == "arch" |
| 193 | |
| 194 | |
| 195 | def test_format_compare_human_readable() -> None: |
| 196 | """_format_compare renders commit refs, shapes, and delta line.""" |
| 197 | result = ContourCompareResult( |
| 198 | commit_a=_make_result(commit="aaaa1111"), |
| 199 | commit_b=_make_result(commit="bbbb2222", shape="ascending"), |
| 200 | shape_changed=True, |
| 201 | angularity_delta=0.5, |
| 202 | tessitura_delta=4, |
| 203 | ) |
| 204 | out = _format_compare(result, as_json=False) |
| 205 | assert "aaaa1111" in out |
| 206 | assert "bbbb2222" in out |
| 207 | assert "shape changed" in out |
| 208 | assert "Delta" in out |
| 209 | |
| 210 | |
| 211 | def test_format_compare_json() -> None: |
| 212 | """_format_compare with as_json=True emits parseable JSON.""" |
| 213 | result = ContourCompareResult( |
| 214 | commit_a=_make_result(commit="aaa"), |
| 215 | commit_b=_make_result(commit="bbb"), |
| 216 | shape_changed=False, |
| 217 | angularity_delta=0.0, |
| 218 | tessitura_delta=0, |
| 219 | ) |
| 220 | raw = _format_compare(result, as_json=True) |
| 221 | payload = json.loads(raw) |
| 222 | assert "commit_a" in payload |
| 223 | assert "commit_b" in payload |
| 224 | assert payload["shape_changed"] is False |
| 225 | assert payload["angularity_delta"] == 0.0 |
| 226 | |
| 227 | |
| 228 | # --------------------------------------------------------------------------- |
| 229 | # Async core — _contour_detect_async |
| 230 | # --------------------------------------------------------------------------- |
| 231 | |
| 232 | |
| 233 | @pytest.mark.anyio |
| 234 | async def test_contour_detect_async_returns_contour_result( |
| 235 | tmp_path: pathlib.Path, |
| 236 | db_session: AsyncSession, |
| 237 | ) -> None: |
| 238 | """_contour_detect_async returns a ContourResult with all expected keys.""" |
| 239 | _init_muse_repo(tmp_path) |
| 240 | _commit_ref(tmp_path) |
| 241 | |
| 242 | result = await _contour_detect_async( |
| 243 | root=tmp_path, session=db_session, commit=None, track=None, section=None |
| 244 | ) |
| 245 | |
| 246 | assert result["shape"] in VALID_SHAPES |
| 247 | assert isinstance(result["tessitura"], int) |
| 248 | assert result["tessitura"] > 0 |
| 249 | assert isinstance(result["avg_interval"], float) |
| 250 | assert result["phrase_count"] > 0 |
| 251 | assert result["branch"] == "main" |
| 252 | assert result["track"] == "all" |
| 253 | assert result["section"] == "all" |
| 254 | |
| 255 | |
| 256 | @pytest.mark.anyio |
| 257 | async def test_contour_detect_async_uses_explicit_commit( |
| 258 | tmp_path: pathlib.Path, |
| 259 | db_session: AsyncSession, |
| 260 | ) -> None: |
| 261 | """When commit is provided, it appears as the commit field in the result.""" |
| 262 | _init_muse_repo(tmp_path) |
| 263 | _commit_ref(tmp_path) |
| 264 | |
| 265 | result = await _contour_detect_async( |
| 266 | root=tmp_path, session=db_session, commit="deadbeef", track=None, section=None |
| 267 | ) |
| 268 | assert result["commit"] == "deadbeef" |
| 269 | |
| 270 | |
| 271 | @pytest.mark.anyio |
| 272 | async def test_contour_detect_async_track_filter( |
| 273 | tmp_path: pathlib.Path, |
| 274 | db_session: AsyncSession, |
| 275 | ) -> None: |
| 276 | """Track name is propagated into the result when specified.""" |
| 277 | _init_muse_repo(tmp_path) |
| 278 | _commit_ref(tmp_path) |
| 279 | |
| 280 | result = await _contour_detect_async( |
| 281 | root=tmp_path, session=db_session, commit=None, track="keys", section=None |
| 282 | ) |
| 283 | assert result["track"] == "keys" |
| 284 | |
| 285 | |
| 286 | @pytest.mark.anyio |
| 287 | async def test_contour_detect_async_section_filter( |
| 288 | tmp_path: pathlib.Path, |
| 289 | db_session: AsyncSession, |
| 290 | ) -> None: |
| 291 | """Section name is propagated into the result when specified.""" |
| 292 | _init_muse_repo(tmp_path) |
| 293 | _commit_ref(tmp_path) |
| 294 | |
| 295 | result = await _contour_detect_async( |
| 296 | root=tmp_path, session=db_session, commit=None, track=None, section="verse" |
| 297 | ) |
| 298 | assert result["section"] == "verse" |
| 299 | |
| 300 | |
| 301 | @pytest.mark.anyio |
| 302 | async def test_contour_classifies_arch_shape( |
| 303 | tmp_path: pathlib.Path, |
| 304 | db_session: AsyncSession, |
| 305 | ) -> None: |
| 306 | """Stub contour returns 'arch' as the default shape label.""" |
| 307 | _init_muse_repo(tmp_path) |
| 308 | _commit_ref(tmp_path) |
| 309 | |
| 310 | result = await _contour_detect_async( |
| 311 | root=tmp_path, session=db_session, commit=None, track=None, section=None |
| 312 | ) |
| 313 | assert result["shape"] == "arch" |
| 314 | |
| 315 | |
| 316 | # --------------------------------------------------------------------------- |
| 317 | # Async core — _contour_compare_async |
| 318 | # --------------------------------------------------------------------------- |
| 319 | |
| 320 | |
| 321 | @pytest.mark.anyio |
| 322 | async def test_contour_compare_detects_angularity_change( |
| 323 | tmp_path: pathlib.Path, |
| 324 | db_session: AsyncSession, |
| 325 | ) -> None: |
| 326 | """_contour_compare_async returns a ContourCompareResult with delta fields.""" |
| 327 | _init_muse_repo(tmp_path) |
| 328 | _commit_ref(tmp_path) |
| 329 | |
| 330 | result = await _contour_compare_async( |
| 331 | root=tmp_path, |
| 332 | session=db_session, |
| 333 | commit_a=None, |
| 334 | commit_b="HEAD~10", |
| 335 | track=None, |
| 336 | section=None, |
| 337 | ) |
| 338 | |
| 339 | assert "commit_a" in result |
| 340 | assert "commit_b" in result |
| 341 | assert "angularity_delta" in result |
| 342 | assert "tessitura_delta" in result |
| 343 | assert isinstance(result["shape_changed"], bool) |
| 344 | assert result["commit_b"]["commit"] == "HEAD~10" |
| 345 | |
| 346 | |
| 347 | @pytest.mark.anyio |
| 348 | async def test_contour_compare_shape_changed_flag( |
| 349 | tmp_path: pathlib.Path, |
| 350 | db_session: AsyncSession, |
| 351 | ) -> None: |
| 352 | """shape_changed is False when both sides return the same stub shape.""" |
| 353 | _init_muse_repo(tmp_path) |
| 354 | _commit_ref(tmp_path) |
| 355 | |
| 356 | result = await _contour_compare_async( |
| 357 | root=tmp_path, |
| 358 | session=db_session, |
| 359 | commit_a=None, |
| 360 | commit_b="some-ref", |
| 361 | track=None, |
| 362 | section=None, |
| 363 | ) |
| 364 | assert result["shape_changed"] is False |
| 365 | |
| 366 | |
| 367 | # --------------------------------------------------------------------------- |
| 368 | # Async core — _contour_history_async |
| 369 | # --------------------------------------------------------------------------- |
| 370 | |
| 371 | |
| 372 | @pytest.mark.anyio |
| 373 | async def test_contour_history_returns_evolution( |
| 374 | tmp_path: pathlib.Path, |
| 375 | db_session: AsyncSession, |
| 376 | ) -> None: |
| 377 | """_contour_history_async returns a non-empty list of ContourResult entries.""" |
| 378 | _init_muse_repo(tmp_path) |
| 379 | _commit_ref(tmp_path) |
| 380 | |
| 381 | entries = await _contour_history_async( |
| 382 | root=tmp_path, session=db_session, track=None, section=None |
| 383 | ) |
| 384 | |
| 385 | assert len(entries) >= 1 |
| 386 | for entry in entries: |
| 387 | assert entry["shape"] in VALID_SHAPES |
| 388 | assert isinstance(entry["tessitura"], int) |
| 389 | |
| 390 | |
| 391 | @pytest.mark.anyio |
| 392 | async def test_contour_history_with_track( |
| 393 | tmp_path: pathlib.Path, |
| 394 | db_session: AsyncSession, |
| 395 | ) -> None: |
| 396 | """_contour_history_async propagates track into all returned entries.""" |
| 397 | _init_muse_repo(tmp_path) |
| 398 | _commit_ref(tmp_path) |
| 399 | |
| 400 | entries = await _contour_history_async( |
| 401 | root=tmp_path, session=db_session, track="lead", section=None |
| 402 | ) |
| 403 | |
| 404 | for entry in entries: |
| 405 | assert entry["track"] == "lead" |
| 406 | |
| 407 | |
| 408 | # --------------------------------------------------------------------------- |
| 409 | # CLI integration — CliRunner |
| 410 | # --------------------------------------------------------------------------- |
| 411 | |
| 412 | |
| 413 | def test_cli_contour_outside_repo_exits_2(tmp_path: pathlib.Path) -> None: |
| 414 | """``muse contour`` exits 2 when invoked outside a Muse repository.""" |
| 415 | prev = os.getcwd() |
| 416 | try: |
| 417 | os.chdir(tmp_path) |
| 418 | result = runner.invoke(cli, ["contour"], catch_exceptions=False) |
| 419 | finally: |
| 420 | os.chdir(prev) |
| 421 | |
| 422 | assert result.exit_code == int(ExitCode.REPO_NOT_FOUND) |
| 423 | assert "not a muse repository" in result.output.lower() |
| 424 | |
| 425 | |
| 426 | def test_cli_contour_help_lists_flags() -> None: |
| 427 | """``muse contour --help`` shows all documented flags.""" |
| 428 | result = runner.invoke(cli, ["contour", "--help"]) |
| 429 | assert result.exit_code == 0 |
| 430 | for flag in ("--track", "--section", "--compare", "--history", "--shape", "--json"): |
| 431 | assert flag in result.output, f"Flag '{flag}' not found in help output" |
| 432 | |
| 433 | |
| 434 | def test_cli_contour_appears_in_muse_help() -> None: |
| 435 | """``muse --help`` lists the contour subcommand.""" |
| 436 | result = runner.invoke(cli, ["--help"]) |
| 437 | assert result.exit_code == 0 |
| 438 | assert "contour" in result.output |