cgcardona / muse public
test_muse_dynamics.py python
530 lines 15.2 KB
12901c5a Initial extraction from tellurstori/maestro cgcardona <gabriel@tellurstori.com> 4d ago
1 """Tests for ``muse dynamics`` — CLI interface, flag parsing, and stub output format.
2
3 All CLI-level tests use ``typer.testing.CliRunner`` against the full ``muse``
4 app so that argument parsing, flag handling, and exit codes are exercised end-to-end.
5
6 Async core tests call ``_dynamics_async`` directly with an in-memory SQLite session
7 (defined as a local fixture — the stub does not query the DB, so the session is
8 injected only to satisfy the signature contract).
9 """
10 from __future__ import annotations
11
12 import json
13 import os
14 import pathlib
15 import uuid
16 from collections.abc import AsyncGenerator
17 from unittest.mock import AsyncMock
18
19 import pytest
20 import pytest_asyncio
21 from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
22 from sqlalchemy.pool import StaticPool
23 from typer.testing import CliRunner
24
25 from maestro.db.database import Base
26 import maestro.muse_cli.models # noqa: F401 — registers MuseCli* with Base.metadata
27 from maestro.muse_cli.app import cli
28 from maestro.muse_cli.commands.dynamics import (
29 TrackDynamics,
30 _ARC_LABELS,
31 _VALID_ARCS,
32 _dynamics_async,
33 _render_json,
34 _render_table,
35 _stub_profiles,
36 )
37 from maestro.muse_cli.errors import ExitCode
38
39 runner = CliRunner()
40
41 # ---------------------------------------------------------------------------
42 # Fixtures
43 # ---------------------------------------------------------------------------
44
45
46 def _init_muse_repo(root: pathlib.Path, branch: str = "main") -> str:
47 """Create a minimal .muse/ layout with one empty commit ref."""
48 rid = str(uuid.uuid4())
49 muse = root / ".muse"
50 (muse / "refs" / "heads").mkdir(parents=True)
51 (muse / "repo.json").write_text(json.dumps({"repo_id": rid, "schema_version": "1"}))
52 (muse / "HEAD").write_text(f"refs/heads/{branch}")
53 # Leave the ref file empty (no commits yet) — dynamics handles this gracefully.
54 (muse / "refs" / "heads" / branch).write_text("")
55 return rid
56
57
58 def _commit_ref(root: pathlib.Path, branch: str = "main") -> None:
59 """Write a fake commit ID into the branch ref so HEAD is non-empty."""
60 muse = root / ".muse"
61 (muse / "refs" / "heads" / branch).write_text("a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2")
62
63
64 @pytest_asyncio.fixture
65 async def db_session() -> AsyncGenerator[AsyncSession, None]:
66 """In-memory SQLite session (stub dynamics does not actually query it)."""
67 engine = create_async_engine(
68 "sqlite+aiosqlite:///:memory:",
69 connect_args={"check_same_thread": False},
70 poolclass=StaticPool,
71 )
72 async with engine.begin() as conn:
73 await conn.run_sync(Base.metadata.create_all)
74 factory = async_sessionmaker(bind=engine, expire_on_commit=False)
75 async with factory() as session:
76 yield session
77 async with engine.begin() as conn:
78 await conn.run_sync(Base.metadata.drop_all)
79 await engine.dispose()
80
81
82 # ---------------------------------------------------------------------------
83 # Unit — stub data
84 # ---------------------------------------------------------------------------
85
86
87 def test_stub_profiles_returns_four_tracks() -> None:
88 """Stub produces exactly four tracks."""
89 profiles = _stub_profiles()
90 assert len(profiles) == 4
91
92
93 def test_stub_profiles_have_valid_arcs() -> None:
94 """Every stub track has a valid arc label."""
95 for p in _stub_profiles():
96 assert p.arc in _VALID_ARCS, f"Unexpected arc '{p.arc}' for track '{p.name}'"
97
98
99 def test_stub_profiles_velocity_constraints() -> None:
100 """Stub profiles have sensible velocity values (0–127 MIDI range)."""
101 for p in _stub_profiles():
102 assert 0 <= p.avg_velocity <= 127, f"{p.name}: avg_velocity out of range"
103 assert 0 <= p.peak_velocity <= 127, f"{p.name}: peak_velocity out of range"
104 assert p.peak_velocity >= p.avg_velocity, f"{p.name}: peak < avg"
105 assert p.velocity_range > 0, f"{p.name}: velocity_range must be positive"
106
107
108 def test_track_dynamics_to_dict() -> None:
109 """TrackDynamics.to_dict returns the expected keys and types."""
110 td = TrackDynamics(
111 name="drums", avg_velocity=88, peak_velocity=110, velocity_range=42, arc="terraced"
112 )
113 d = td.to_dict()
114 assert d["track"] == "drums"
115 assert d["avg_velocity"] == 88
116 assert d["peak_velocity"] == 110
117 assert d["velocity_range"] == 42
118 assert d["arc"] == "terraced"
119
120
121 def test_arc_labels_constant_matches_valid_arcs() -> None:
122 """_ARC_LABELS tuple and _VALID_ARCS frozenset are in sync."""
123 assert set(_ARC_LABELS) == _VALID_ARCS
124
125
126 # ---------------------------------------------------------------------------
127 # Unit — renderers (capsys)
128 # ---------------------------------------------------------------------------
129
130
131 def test_render_table_outputs_header(capsys: pytest.CaptureFixture[str]) -> None:
132 """_render_table includes commit ref and column headers."""
133 profiles = _stub_profiles()
134 _render_table(profiles, commit_ref="a1b2c3d4", branch="main")
135 out = capsys.readouterr().out
136 assert "Dynamic profile" in out
137 assert "a1b2c3d4" in out
138 assert "Track" in out
139 assert "Avg Vel" in out
140 assert "Arc" in out
141
142
143 def test_render_table_shows_all_tracks(capsys: pytest.CaptureFixture[str]) -> None:
144 """_render_table emits one row per profile."""
145 profiles = _stub_profiles()
146 _render_table(profiles, commit_ref="a1b2c3d4", branch="main")
147 out = capsys.readouterr().out
148 for p in profiles:
149 assert p.name in out
150 assert p.arc in out
151
152
153 def test_render_json_is_valid(capsys: pytest.CaptureFixture[str]) -> None:
154 """_render_json emits parseable JSON with the expected top-level keys."""
155 profiles = _stub_profiles()
156 _render_json(profiles, commit_ref="a1b2c3d4", branch="main")
157 raw = capsys.readouterr().out
158 payload = json.loads(raw)
159 assert payload["commit"] == "a1b2c3d4"
160 assert payload["branch"] == "main"
161 assert isinstance(payload["tracks"], list)
162 assert len(payload["tracks"]) == len(profiles)
163 for entry in payload["tracks"]:
164 assert "track" in entry
165 assert "avg_velocity" in entry
166 assert "arc" in entry
167
168
169 # ---------------------------------------------------------------------------
170 # Async core — _dynamics_async
171 # ---------------------------------------------------------------------------
172
173
174 @pytest.mark.anyio
175 async def test_dynamics_async_default_output(
176 tmp_path: pathlib.Path,
177 db_session: AsyncSession,
178 capsys: pytest.CaptureFixture[str],
179 ) -> None:
180 """_dynamics_async with no filters shows all stub tracks."""
181 _init_muse_repo(tmp_path)
182 _commit_ref(tmp_path)
183
184 await _dynamics_async(
185 root=tmp_path,
186 session=db_session,
187 commit=None,
188 track=None,
189 section=None,
190 compare=None,
191 history=False,
192 peak=False,
193 range_flag=False,
194 arc=False,
195 as_json=False,
196 )
197
198 out = capsys.readouterr().out
199 assert "Dynamic profile" in out
200 assert "drums" in out
201 assert "bass" in out
202 assert "keys" in out
203 assert "lead" in out
204
205
206 @pytest.mark.anyio
207 async def test_dynamics_async_json_mode(
208 tmp_path: pathlib.Path,
209 db_session: AsyncSession,
210 capsys: pytest.CaptureFixture[str],
211 ) -> None:
212 """_dynamics_async --json emits valid JSON with all four tracks."""
213 _init_muse_repo(tmp_path)
214 _commit_ref(tmp_path)
215
216 await _dynamics_async(
217 root=tmp_path,
218 session=db_session,
219 commit=None,
220 track=None,
221 section=None,
222 compare=None,
223 history=False,
224 peak=False,
225 range_flag=False,
226 arc=False,
227 as_json=True,
228 )
229
230 raw = capsys.readouterr().out
231 payload = json.loads(raw)
232 assert len(payload["tracks"]) == 4
233
234
235 @pytest.mark.anyio
236 async def test_dynamics_async_track_filter(
237 tmp_path: pathlib.Path,
238 db_session: AsyncSession,
239 capsys: pytest.CaptureFixture[str],
240 ) -> None:
241 """--track filters output to prefix-matched tracks only."""
242 _init_muse_repo(tmp_path)
243 _commit_ref(tmp_path)
244
245 await _dynamics_async(
246 root=tmp_path,
247 session=db_session,
248 commit=None,
249 track="drum",
250 section=None,
251 compare=None,
252 history=False,
253 peak=False,
254 range_flag=False,
255 arc=False,
256 as_json=False,
257 )
258
259 out = capsys.readouterr().out
260 assert "drums" in out
261 assert "bass" not in out
262 assert "keys" not in out
263
264
265 @pytest.mark.anyio
266 async def test_dynamics_async_arc_filter_valid(
267 tmp_path: pathlib.Path,
268 db_session: AsyncSession,
269 capsys: pytest.CaptureFixture[str],
270 ) -> None:
271 """--arc with a valid arc label filters tracks to that arc only."""
272 _init_muse_repo(tmp_path)
273 _commit_ref(tmp_path)
274
275 await _dynamics_async(
276 root=tmp_path,
277 session=db_session,
278 commit=None,
279 track="flat",
280 section=None,
281 compare=None,
282 history=False,
283 peak=False,
284 range_flag=False,
285 arc=True,
286 as_json=False,
287 )
288
289 out = capsys.readouterr().out
290 # "bass" has arc "flat"
291 assert "bass" in out
292 # "drums" has arc "terraced" — should be absent
293 assert "drums" not in out
294
295
296 @pytest.mark.anyio
297 async def test_dynamics_async_arc_filter_invalid_exits(
298 tmp_path: pathlib.Path,
299 db_session: AsyncSession,
300 ) -> None:
301 """--arc with an invalid arc label exits with USER_ERROR."""
302 _init_muse_repo(tmp_path)
303 _commit_ref(tmp_path)
304
305 import typer
306
307 with pytest.raises(typer.Exit) as exc_info:
308 await _dynamics_async(
309 root=tmp_path,
310 session=db_session,
311 commit=None,
312 track="notanarc",
313 section=None,
314 compare=None,
315 history=False,
316 peak=False,
317 range_flag=False,
318 arc=True,
319 as_json=False,
320 )
321 assert exc_info.value.exit_code == int(ExitCode.USER_ERROR)
322
323
324 @pytest.mark.anyio
325 async def test_dynamics_async_no_commits_exits_success(
326 tmp_path: pathlib.Path,
327 db_session: AsyncSession,
328 capsys: pytest.CaptureFixture[str],
329 ) -> None:
330 """With no commits and no explicit commit arg, exits 0 with informative message."""
331 _init_muse_repo(tmp_path)
332 # No _commit_ref call — branch ref is empty.
333
334 import typer
335
336 with pytest.raises(typer.Exit) as exc_info:
337 await _dynamics_async(
338 root=tmp_path,
339 session=db_session,
340 commit=None,
341 track=None,
342 section=None,
343 compare=None,
344 history=False,
345 peak=False,
346 range_flag=False,
347 arc=False,
348 as_json=False,
349 )
350 assert exc_info.value.exit_code == int(ExitCode.SUCCESS)
351 out = capsys.readouterr().out
352 assert "No commits yet" in out
353
354
355 @pytest.mark.anyio
356 async def test_dynamics_async_peak_filter(
357 tmp_path: pathlib.Path,
358 db_session: AsyncSession,
359 capsys: pytest.CaptureFixture[str],
360 ) -> None:
361 """--peak removes tracks whose peak is at or below the branch average peak."""
362 _init_muse_repo(tmp_path)
363 _commit_ref(tmp_path)
364
365 await _dynamics_async(
366 root=tmp_path,
367 session=db_session,
368 commit=None,
369 track=None,
370 section=None,
371 compare=None,
372 history=False,
373 peak=True,
374 range_flag=False,
375 arc=False,
376 as_json=False,
377 )
378
379 out = capsys.readouterr().out
380 # At least some tracks should appear.
381 assert "Dynamic profile" in out
382
383
384 @pytest.mark.anyio
385 async def test_dynamics_async_range_sort_json(
386 tmp_path: pathlib.Path,
387 db_session: AsyncSession,
388 capsys: pytest.CaptureFixture[str],
389 ) -> None:
390 """--range sorts tracks by velocity_range descending."""
391 _init_muse_repo(tmp_path)
392 _commit_ref(tmp_path)
393
394 await _dynamics_async(
395 root=tmp_path,
396 session=db_session,
397 commit=None,
398 track=None,
399 section=None,
400 compare=None,
401 history=False,
402 peak=False,
403 range_flag=True,
404 arc=False,
405 as_json=True,
406 )
407
408 raw = capsys.readouterr().out
409 payload = json.loads(raw)
410 ranges = [t["velocity_range"] for t in payload["tracks"]]
411 assert ranges == sorted(ranges, reverse=True)
412
413
414 @pytest.mark.anyio
415 async def test_dynamics_async_explicit_commit_ref(
416 tmp_path: pathlib.Path,
417 db_session: AsyncSession,
418 capsys: pytest.CaptureFixture[str],
419 ) -> None:
420 """An explicit commit ref appears in the output header."""
421 _init_muse_repo(tmp_path)
422 _commit_ref(tmp_path)
423
424 await _dynamics_async(
425 root=tmp_path,
426 session=db_session,
427 commit="deadbeef",
428 track=None,
429 section=None,
430 compare=None,
431 history=False,
432 peak=False,
433 range_flag=False,
434 arc=False,
435 as_json=False,
436 )
437
438 out = capsys.readouterr().out
439 assert "deadbeef" in out
440
441
442 @pytest.mark.anyio
443 async def test_dynamics_async_history_flag_warns(
444 tmp_path: pathlib.Path,
445 db_session: AsyncSession,
446 capsys: pytest.CaptureFixture[str],
447 ) -> None:
448 """--history emits a stub boundary warning but still renders the table."""
449 _init_muse_repo(tmp_path)
450 _commit_ref(tmp_path)
451
452 await _dynamics_async(
453 root=tmp_path,
454 session=db_session,
455 commit=None,
456 track=None,
457 section=None,
458 compare=None,
459 history=True,
460 peak=False,
461 range_flag=False,
462 arc=False,
463 as_json=False,
464 )
465
466 out = capsys.readouterr().out
467 assert "--history" in out
468 assert "Dynamic profile" in out
469
470
471 @pytest.mark.anyio
472 async def test_dynamics_async_compare_flag_warns(
473 tmp_path: pathlib.Path,
474 db_session: AsyncSession,
475 capsys: pytest.CaptureFixture[str],
476 ) -> None:
477 """--compare emits a stub boundary warning but still renders the table."""
478 _init_muse_repo(tmp_path)
479 _commit_ref(tmp_path)
480
481 await _dynamics_async(
482 root=tmp_path,
483 session=db_session,
484 commit=None,
485 track=None,
486 section=None,
487 compare="abc123",
488 history=False,
489 peak=False,
490 range_flag=False,
491 arc=False,
492 as_json=False,
493 )
494
495 out = capsys.readouterr().out
496 assert "--compare" in out
497 assert "Dynamic profile" in out
498
499
500 # ---------------------------------------------------------------------------
501 # CLI integration — CliRunner
502 # ---------------------------------------------------------------------------
503
504
505 def test_cli_dynamics_outside_repo_exits_2(tmp_path: pathlib.Path) -> None:
506 """``muse dynamics`` exits 2 when invoked outside a Muse repository."""
507 prev = os.getcwd()
508 try:
509 os.chdir(tmp_path)
510 result = runner.invoke(cli, ["dynamics"], catch_exceptions=False)
511 finally:
512 os.chdir(prev)
513
514 assert result.exit_code == int(ExitCode.REPO_NOT_FOUND)
515 assert "not a muse repository" in result.output.lower()
516
517
518 def test_cli_dynamics_help_lists_flags(tmp_path: pathlib.Path) -> None:
519 """``muse dynamics --help`` shows all documented flags."""
520 result = runner.invoke(cli, ["dynamics", "--help"])
521 assert result.exit_code == 0
522 for flag in ("--track", "--section", "--compare", "--history", "--peak", "--range", "--arc", "--json"):
523 assert flag in result.output, f"Flag '{flag}' not found in help output"
524
525
526 def test_cli_dynamics_appears_in_muse_help() -> None:
527 """``muse --help`` lists the dynamics subcommand."""
528 result = runner.invoke(cli, ["--help"])
529 assert result.exit_code == 0
530 assert "dynamics" in result.output