test_tempo.py
python
| 1 | """Tests for ``muse tempo``. |
| 2 | |
| 3 | All async tests call the async core functions directly with an in-memory |
| 4 | SQLite session and a ``tmp_path`` repo root — no real Postgres or running |
| 5 | process required. Commits are seeded via ``_commit_async`` so tempo and |
| 6 | commit commands are tested as an integrated pair. |
| 7 | """ |
| 8 | from __future__ import annotations |
| 9 | |
| 10 | import json |
| 11 | import pathlib |
| 12 | import struct |
| 13 | import uuid |
| 14 | |
| 15 | import pytest |
| 16 | import typer |
| 17 | from sqlalchemy.ext.asyncio import AsyncSession |
| 18 | |
| 19 | from maestro.muse_cli.commands.commit import _commit_async |
| 20 | from maestro.muse_cli.commands.tempo import ( |
| 21 | _tempo_history_async, |
| 22 | _tempo_read_async, |
| 23 | _tempo_set_async, |
| 24 | ) |
| 25 | from maestro.muse_cli.db import set_commit_tempo_bpm |
| 26 | from maestro.muse_cli.errors import ExitCode |
| 27 | from maestro.services.muse_tempo import ( |
| 28 | MuseTempoHistoryEntry, |
| 29 | MuseTempoResult, |
| 30 | build_tempo_history, |
| 31 | detect_all_tempos_from_midi, |
| 32 | extract_bpm_from_midi, |
| 33 | ) |
| 34 | |
| 35 | |
| 36 | # --------------------------------------------------------------------------- |
| 37 | # Repo + workdir helpers (mirrors test_log.py pattern) |
| 38 | # --------------------------------------------------------------------------- |
| 39 | |
| 40 | |
def _init_muse_repo(root: pathlib.Path, repo_id: str | None = None) -> str:
    """Create a bare-bones ``.muse/`` layout under *root* and return the repo id.

    Writes ``repo.json``, a ``HEAD`` file pointing at ``refs/heads/main`` and
    an empty ``main`` branch ref.  When *repo_id* is omitted (or falsy) a
    random UUID4 string is used instead.
    """
    resolved_id = repo_id if repo_id else str(uuid.uuid4())

    muse_dir = root / ".muse"
    heads_dir = muse_dir / "refs" / "heads"
    heads_dir.mkdir(parents=True)

    repo_meta = {"repo_id": resolved_id, "schema_version": "1"}
    muse_dir.joinpath("repo.json").write_text(json.dumps(repo_meta))
    muse_dir.joinpath("HEAD").write_text("refs/heads/main")
    heads_dir.joinpath("main").write_text("")
    return resolved_id
| 52 | |
| 53 | |
def _write_workdir(root: pathlib.Path, files: dict[str, bytes]) -> None:
    """Write every ``name -> bytes`` entry of *files* into ``<root>/muse-work``.

    The directory is created on first use; a file that already exists under
    the same name is overwritten.
    """
    target_dir = root / "muse-work"
    target_dir.mkdir(exist_ok=True)
    for filename, payload in files.items():
        target_dir.joinpath(filename).write_bytes(payload)
| 59 | |
| 60 | |
async def _make_commit(
    root: pathlib.Path,
    session: AsyncSession,
    message: str,
    file_seed: int = 0,
) -> str:
    """Drop one seeded placeholder file into ``muse-work/`` and commit it.

    The file is named ``track_<file_seed>.mid`` and contains the bytes
    ``MIDI-<file_seed>``.  The value returned by ``_commit_async`` is passed
    straight through; callers in this module use it as the commit id.
    """
    filename = f"track_{file_seed}.mid"
    payload = f"MIDI-{file_seed}".encode()
    _write_workdir(root, {filename: payload})
    commit_id = await _commit_async(message=message, root=root, session=session)
    return commit_id
| 69 | |
| 70 | |
def _build_midi_with_tempo(uspb: int) -> bytes:
    """Return a minimal MIDI file byte string with one Set Tempo event.

    The result is an ``MThd`` header chunk (type 0, one track, division 480)
    followed by a single ``MTrk`` chunk that holds a Set Tempo meta event
    (``FF 51 03``) and an End-of-Track meta event (``FF 2F 00``), both at
    delta time 0.

    Args:
        uspb: Tempo in microseconds per beat.  Must fit in the event's
            3-byte big-endian field, i.e. ``0 <= uspb <= 0xFFFFFF``.

    Returns:
        The bytes of the complete MIDI file.

    Raises:
        ValueError: If *uspb* is negative or wider than 24 bits.  Masking
            such a value would silently encode a different tempo than the
            one the calling test asked for.
    """
    if not 0 <= uspb <= 0xFFFFFF:
        raise ValueError(
            f"uspb must fit in 24 bits (0..{0xFFFFFF}), got {uspb}"
        )

    # delta_time(0) + FF 51 03 + 3-byte big-endian microseconds-per-beat
    set_tempo = b"\x00\xFF\x51\x03" + uspb.to_bytes(3, "big")
    end_of_track = b"\x00\xFF\x2F\x00"
    track_data = set_tempo + end_of_track

    # MThd: chunk length 6, type=0, ntrks=1, division=480
    header = b"MThd" + struct.pack(">IHHH", 6, 0, 1, 480)
    # MTrk: 4-byte big-endian payload length, then the events
    track = b"MTrk" + struct.pack(">I", len(track_data)) + track_data
    return header + track
| 86 | |
| 87 | |
| 88 | # --------------------------------------------------------------------------- |
| 89 | # Unit tests: extract_bpm_from_midi |
| 90 | # --------------------------------------------------------------------------- |
| 91 | |
| 92 | |
def test_extract_bpm_from_midi_returns_correct_bpm() -> None:
    """A Set Tempo of 500000 µs/beat is reported as 120 BPM (within 0.1)."""
    detected = extract_bpm_from_midi(_build_midi_with_tempo(500_000))
    assert detected is not None
    assert -0.1 < detected - 120.0 < 0.1
| 99 | |
| 100 | |
def test_extract_bpm_from_midi_140_bpm() -> None:
    """A Set Tempo of 428571 µs/beat is reported as 140 BPM (within 0.5)."""
    micros_per_beat = 60_000_000 // 140  # 428571 after truncation
    detected = extract_bpm_from_midi(_build_midi_with_tempo(micros_per_beat))
    assert detected is not None
    assert -0.5 < detected - 140.0 < 0.5
| 108 | |
| 109 | |
def test_extract_bpm_from_midi_no_header_returns_none() -> None:
    """Bytes that are not a MIDI file yield ``None``."""
    not_midi = b"not midi data"
    detected = extract_bpm_from_midi(not_midi)
    assert detected is None
| 113 | |
| 114 | |
def test_extract_bpm_from_midi_no_tempo_event_returns_none() -> None:
    """A MIDI file whose only event is End-of-Track yields ``None``."""
    end_of_track = b"\x00\xFF\x2F\x00"
    midi_bytes = b"".join(
        [
            # Header chunk: type=0, ntrks=1, division=480
            b"MThd",
            struct.pack(">IHHH", 6, 0, 1, 480),
            # Track chunk holding nothing but the end-of-track marker
            b"MTrk",
            struct.pack(">I", len(end_of_track)),
            end_of_track,
        ]
    )
    assert extract_bpm_from_midi(midi_bytes) is None
| 122 | |
| 123 | |
def test_extract_bpm_from_midi_empty_bytes_returns_none() -> None:
    """Zero-length input is handled without raising and yields ``None``."""
    detected = extract_bpm_from_midi(b"")
    assert detected is None
| 127 | |
| 128 | |
def test_detect_all_tempos_returns_multiple_events() -> None:
    """Two Set Tempo events in one track are both reported, in file order."""
    set_tempo_prefix = b"\x00\xFF\x51\x03"  # delta 0 + FF 51 03
    end_of_track = b"\x00\xFF\x2F\x00"

    # 500000 µs/beat ≈ 120 BPM, 428571 µs/beat ≈ 140 BPM
    tempo_events = b"".join(
        set_tempo_prefix + uspb.to_bytes(3, "big") for uspb in (500_000, 428_571)
    )
    track_data = tempo_events + end_of_track

    midi_bytes = (
        b"MThd"
        + struct.pack(">IHHH", 6, 0, 1, 480)
        + b"MTrk"
        + struct.pack(">I", len(track_data))
        + track_data
    )

    found = detect_all_tempos_from_midi(midi_bytes)
    assert len(found) == 2
    first_bpm, second_bpm = found[0], found[1]
    assert -0.1 < first_bpm - 120.0 < 0.1
    assert -0.5 < second_bpm - 140.0 < 0.5
| 142 | |
| 143 | |
| 144 | # --------------------------------------------------------------------------- |
| 145 | # Integration tests: _tempo_read_async |
| 146 | # --------------------------------------------------------------------------- |
| 147 | |
| 148 | |
@pytest.mark.anyio
async def test_tempo_read_no_annotation_no_midi_shows_dash(
    tmp_path: pathlib.Path,
    muse_cli_db_session: AsyncSession,
    capsys: pytest.CaptureFixture[str],
) -> None:
    """With no annotation and no detected tempo, the printed output contains '--'."""
    _init_muse_repo(tmp_path)
    await _make_commit(tmp_path, muse_cli_db_session, "take 1")
    capsys.readouterr()  # discard anything captured so far

    tempo = await _tempo_read_async(
        root=tmp_path, session=muse_cli_db_session, commit_ref=None, as_json=False
    )

    stdout = capsys.readouterr().out
    assert tempo.tempo_bpm is None
    assert tempo.detected_bpm is None
    assert "--" in stdout
| 170 | |
| 171 | |
@pytest.mark.anyio
async def test_tempo_read_shows_annotated_bpm(
    tmp_path: pathlib.Path,
    muse_cli_db_session: AsyncSession,
    capsys: pytest.CaptureFixture[str],
) -> None:
    """A commit annotated with 128.0 BPM is read back and printed as annotated."""
    _init_muse_repo(tmp_path)
    head_id = await _make_commit(tmp_path, muse_cli_db_session, "boom bap take 1")
    await set_commit_tempo_bpm(muse_cli_db_session, head_id, 128.0)
    capsys.readouterr()  # discard anything captured so far

    tempo = await _tempo_read_async(
        root=tmp_path, session=muse_cli_db_session, commit_ref=None, as_json=False
    )

    stdout = capsys.readouterr().out
    assert tempo.tempo_bpm == 128.0
    for expected_text in ("128.0", "annotated"):
        assert expected_text in stdout
| 194 | |
| 195 | |
@pytest.mark.anyio
async def test_tempo_read_midi_detection(
    tmp_path: pathlib.Path,
    muse_cli_db_session: AsyncSession,
    capsys: pytest.CaptureFixture[str],
) -> None:
    """A committed ``muse-work/`` MIDI file carrying a Set Tempo event is detected."""
    _init_muse_repo(tmp_path)
    # 500000 µs/beat corresponds to 120 BPM
    _write_workdir(tmp_path, {"groove.mid": _build_midi_with_tempo(500_000)})
    await _commit_async(message="groove", root=tmp_path, session=muse_cli_db_session)
    capsys.readouterr()  # discard anything captured so far

    tempo = await _tempo_read_async(
        root=tmp_path, session=muse_cli_db_session, commit_ref=None, as_json=False
    )

    stdout = capsys.readouterr().out
    assert tempo.detected_bpm is not None
    assert -0.5 < tempo.detected_bpm - 120.0 < 0.5
    assert "120" in stdout
    assert "detected" in stdout
| 220 | |
| 221 | |
@pytest.mark.anyio
async def test_tempo_read_json_output(
    tmp_path: pathlib.Path,
    muse_cli_db_session: AsyncSession,
    capsys: pytest.CaptureFixture[str],
) -> None:
    """With ``as_json=True`` the output parses as JSON and carries the tempo keys."""
    _init_muse_repo(tmp_path)
    head_id = await _make_commit(tmp_path, muse_cli_db_session, "samba")
    await set_commit_tempo_bpm(muse_cli_db_session, head_id, 100.0)
    capsys.readouterr()  # discard anything captured so far

    await _tempo_read_async(
        root=tmp_path, session=muse_cli_db_session, commit_ref=None, as_json=True
    )

    payload = json.loads(capsys.readouterr().out)
    assert payload["tempo_bpm"] == 100.0
    for key in ("commit_id", "effective_bpm"):
        assert key in payload
| 245 | |
| 246 | |
@pytest.mark.anyio
async def test_tempo_read_abbreviated_commit_ref(
    tmp_path: pathlib.Path,
    muse_cli_db_session: AsyncSession,
    capsys: pytest.CaptureFixture[str],
) -> None:
    """The first eight characters of a commit id resolve to that commit."""
    _init_muse_repo(tmp_path)
    full_id = await _make_commit(tmp_path, muse_cli_db_session, "reggae")
    await set_commit_tempo_bpm(muse_cli_db_session, full_id, 76.0)
    capsys.readouterr()  # discard anything captured so far

    short_ref = full_id[:8]
    tempo = await _tempo_read_async(
        root=tmp_path,
        session=muse_cli_db_session,
        commit_ref=short_ref,
        as_json=False,
    )

    assert tempo.commit_id == full_id
    assert tempo.tempo_bpm == 76.0
| 267 | |
| 268 | |
@pytest.mark.anyio
async def test_tempo_read_invalid_ref_exits_user_error(
    tmp_path: pathlib.Path,
    muse_cli_db_session: AsyncSession,
) -> None:
    """A ref matching no commit raises ``typer.Exit`` with ``ExitCode.USER_ERROR``."""
    _init_muse_repo(tmp_path)
    await _make_commit(tmp_path, muse_cli_db_session, "bossa")

    unknown_ref = "deadbeef"
    with pytest.raises(typer.Exit) as raised:
        await _tempo_read_async(
            root=tmp_path,
            session=muse_cli_db_session,
            commit_ref=unknown_ref,
            as_json=False,
        )

    # Checked after the context manager so the assertion actually runs.
    assert raised.value.exit_code == ExitCode.USER_ERROR
| 286 | |
| 287 | |
| 288 | # --------------------------------------------------------------------------- |
| 289 | # Integration tests: _tempo_set_async |
| 290 | # --------------------------------------------------------------------------- |
| 291 | |
| 292 | |
@pytest.mark.anyio
async def test_tempo_set_stores_bpm_in_metadata(
    tmp_path: pathlib.Path,
    muse_cli_db_session: AsyncSession,
    capsys: pytest.CaptureFixture[str],
) -> None:
    """Setting 128.0 BPM prints the value and a later read returns it."""
    _init_muse_repo(tmp_path)
    head_id = await _make_commit(tmp_path, muse_cli_db_session, "hip hop beat")
    capsys.readouterr()  # discard anything captured so far

    await _tempo_set_async(
        root=tmp_path, session=muse_cli_db_session, commit_ref=None, bpm=128.0
    )
    stdout = capsys.readouterr().out
    assert "128.0" in stdout

    # Round-trip through the read path to confirm the value was persisted.
    read_back = await _tempo_read_async(
        root=tmp_path,
        session=muse_cli_db_session,
        commit_ref=head_id,
        as_json=False,
    )
    assert read_back.tempo_bpm == 128.0
| 321 | |
| 322 | |
@pytest.mark.anyio
async def test_tempo_set_preserves_existing_metadata(
    tmp_path: pathlib.Path,
    muse_cli_db_session: AsyncSession,
) -> None:
    """Metadata keys already on the commit survive a tempo update."""
    from maestro.muse_cli.models import MuseCliCommit

    _init_muse_repo(tmp_path)
    head_id = await _make_commit(tmp_path, muse_cli_db_session, "funk")

    # Seed an unrelated metadata key before touching the tempo.
    seeded = await muse_cli_db_session.get(MuseCliCommit, head_id)
    assert seeded is not None
    seeded.commit_metadata = {"some_other_key": "value"}
    muse_cli_db_session.add(seeded)
    await muse_cli_db_session.flush()

    await _tempo_set_async(
        root=tmp_path, session=muse_cli_db_session, commit_ref=None, bpm=95.0
    )

    reloaded = await muse_cli_db_session.get(MuseCliCommit, head_id)
    assert reloaded is not None
    assert reloaded.commit_metadata is not None
    metadata = reloaded.commit_metadata
    assert metadata["tempo_bpm"] == 95.0
    assert metadata["some_other_key"] == "value"
| 353 | |
| 354 | |
| 355 | # --------------------------------------------------------------------------- |
| 356 | # Integration tests: _tempo_history_async |
| 357 | # --------------------------------------------------------------------------- |
| 358 | |
| 359 | |
@pytest.mark.anyio
async def test_tempo_history_shows_all_commits(
    tmp_path: pathlib.Path,
    muse_cli_db_session: AsyncSession,
    capsys: pytest.CaptureFixture[str],
) -> None:
    """Three commits produce three history entries and three printed messages."""
    _init_muse_repo(tmp_path)
    for seed in (0, 1, 2):
        await _make_commit(
            tmp_path, muse_cli_db_session, f"take {seed + 1}", file_seed=seed
        )
    capsys.readouterr()  # discard anything captured so far

    entries = await _tempo_history_async(
        root=tmp_path, session=muse_cli_db_session, commit_ref=None, as_json=False
    )

    assert len(entries) == 3
    stdout = capsys.readouterr().out
    for message in ("take 1", "take 2", "take 3"):
        assert message in stdout
| 383 | |
| 384 | |
@pytest.mark.anyio
async def test_tempo_history_delta_computed_correctly(
    tmp_path: pathlib.Path,
    muse_cli_db_session: AsyncSession,
) -> None:
    """An 80 → 140 BPM pair yields a +60 delta on the newer commit, none on the older."""
    _init_muse_repo(tmp_path)
    slow_id = await _make_commit(tmp_path, muse_cli_db_session, "slow", file_seed=0)
    fast_id = await _make_commit(tmp_path, muse_cli_db_session, "fast", file_seed=1)

    for commit_id, bpm in ((slow_id, 80.0), (fast_id, 140.0)):
        await set_commit_tempo_bpm(muse_cli_db_session, commit_id, bpm)

    entries = await _tempo_history_async(
        root=tmp_path, session=muse_cli_db_session, commit_ref=None, as_json=False
    )

    # History is newest-first, so the "fast" commit leads.
    newest = entries[0]
    assert newest.commit_id == fast_id
    assert newest.effective_bpm == 140.0
    assert newest.delta_bpm == pytest.approx(60.0)

    # The oldest commit has no ancestor to diff against.
    oldest = entries[1]
    assert oldest.commit_id == slow_id
    assert oldest.effective_bpm == 80.0
    assert oldest.delta_bpm is None
| 412 | |
| 413 | |
@pytest.mark.anyio
async def test_tempo_history_json_output(
    tmp_path: pathlib.Path,
    muse_cli_db_session: AsyncSession,
    capsys: pytest.CaptureFixture[str],
) -> None:
    """With ``as_json=True`` the history prints as a JSON list, one object per commit."""
    _init_muse_repo(tmp_path)
    await _make_commit(tmp_path, muse_cli_db_session, "jazz")
    capsys.readouterr()  # discard anything captured so far

    await _tempo_history_async(
        root=tmp_path, session=muse_cli_db_session, commit_ref=None, as_json=True
    )

    rows = json.loads(capsys.readouterr().out)
    assert isinstance(rows, list)
    assert len(rows) == 1
    only_row = rows[0]
    for key in ("commit_id", "effective_bpm"):
        assert key in only_row
| 437 | |
| 438 | |
| 439 | # --------------------------------------------------------------------------- |
| 440 | # Unit tests: build_tempo_history |
| 441 | # --------------------------------------------------------------------------- |
| 442 | |
| 443 | |
def test_build_tempo_history_oldest_has_no_delta() -> None:
    """The oldest commit has ``delta_bpm`` None; newer ones diff against the next-older.

    Feeds ``build_tempo_history`` a newest-first chain ccc → bbb → aaa
    annotated with 140, 120 and 80 BPM, then checks that the order is kept,
    that the deltas are +20 and +40, and that ``aaa`` has no delta.
    """
    import datetime

    from maestro.muse_cli.models import MuseCliCommit

    def _fake_commit(cid: str, parent: str | None, bpm: float | None) -> MuseCliCommit:
        """Build an in-memory commit whose metadata carries *bpm*, if given."""
        c = MuseCliCommit(
            commit_id=cid,
            repo_id="r",
            branch="main",
            parent_commit_id=parent,
            snapshot_id="s",
            message=f"take {cid[:4]}",
            author="test",
            committed_at=datetime.datetime.now(datetime.timezone.utc),
        )
        c.commit_metadata = {"tempo_bpm": bpm} if bpm is not None else None
        return c

    # Newest first: ccc → bbb → aaa
    commits = [
        _fake_commit("ccc", "bbb", 140.0),
        _fake_commit("bbb", "aaa", 120.0),
        _fake_commit("aaa", None, 80.0),
    ]
    history = build_tempo_history(commits)

    # Still newest-first after build_tempo_history
    assert history[0].commit_id == "ccc"
    assert history[0].delta_bpm == pytest.approx(20.0)
    assert history[1].commit_id == "bbb"
    assert history[1].delta_bpm == pytest.approx(40.0)
    assert history[2].commit_id == "aaa"
    assert history[2].delta_bpm is None