cgcardona / muse public
test_tempo.py python
476 lines 15.3 KB
12901c5a Initial extraction from tellurstori/maestro cgcardona <gabriel@tellurstori.com> 4d ago
1 """Tests for ``muse tempo``.
2
3 All async tests call the async core functions directly with an in-memory
4 SQLite session and a ``tmp_path`` repo root — no real Postgres or running
5 process required. Commits are seeded via ``_commit_async`` so tempo and
6 commit commands are tested as an integrated pair.
7 """
8 from __future__ import annotations
9
10 import json
11 import pathlib
12 import struct
13 import uuid
14
15 import pytest
16 import typer
17 from sqlalchemy.ext.asyncio import AsyncSession
18
19 from maestro.muse_cli.commands.commit import _commit_async
20 from maestro.muse_cli.commands.tempo import (
21 _tempo_history_async,
22 _tempo_read_async,
23 _tempo_set_async,
24 )
25 from maestro.muse_cli.db import set_commit_tempo_bpm
26 from maestro.muse_cli.errors import ExitCode
27 from maestro.services.muse_tempo import (
28 MuseTempoHistoryEntry,
29 MuseTempoResult,
30 build_tempo_history,
31 detect_all_tempos_from_midi,
32 extract_bpm_from_midi,
33 )
34
35
36 # ---------------------------------------------------------------------------
37 # Repo + workdir helpers (mirrors test_log.py pattern)
38 # ---------------------------------------------------------------------------
39
40
41 def _init_muse_repo(root: pathlib.Path, repo_id: str | None = None) -> str:
42 """Initialise a minimal .muse/ repo structure for tests."""
43 rid = repo_id or str(uuid.uuid4())
44 muse = root / ".muse"
45 (muse / "refs" / "heads").mkdir(parents=True)
46 (muse / "repo.json").write_text(
47 json.dumps({"repo_id": rid, "schema_version": "1"})
48 )
49 (muse / "HEAD").write_text("refs/heads/main")
50 (muse / "refs" / "heads" / "main").write_text("")
51 return rid
52
53
54 def _write_workdir(root: pathlib.Path, files: dict[str, bytes]) -> None:
55 workdir = root / "muse-work"
56 workdir.mkdir(exist_ok=True)
57 for name, content in files.items():
58 (workdir / name).write_bytes(content)
59
60
async def _make_commit(
    root: pathlib.Path,
    session: AsyncSession,
    message: str,
    file_seed: int = 0,
) -> str:
    """Seed the workdir with one placeholder track and commit it.

    The file is named ``track_<file_seed>.mid`` and holds the bytes
    ``MIDI-<file_seed>``. The value returned by ``_commit_async`` is passed
    straight back to the caller.
    """
    track_name = f"track_{file_seed}.mid"
    payload = f"MIDI-{file_seed}".encode()
    _write_workdir(root, {track_name: payload})
    commit_id = await _commit_async(message=message, root=root, session=session)
    return commit_id
69
70
71 def _build_midi_with_tempo(uspb: int) -> bytes:
72 """Return a minimal MIDI file byte string with one Set Tempo event.
73
74 The tempo event is embedded in a Type-0 MIDI file header followed by
75 a single track containing only the FF 51 03 event.
76 """
77 # Set Tempo event: delta_time(0) + FF 51 03 + 3-byte big-endian uspb
78 tempo_bytes = bytes([(uspb >> 16) & 0xFF, (uspb >> 8) & 0xFF, uspb & 0xFF])
79 track_event = b"\x00\xFF\x51\x03" + tempo_bytes + b"\x00\xFF\x2F\x00" # + end-of-track
80
81 # MThd header: type=0, ntrks=1, division=480
82 header = b"MThd" + struct.pack(">IHHH", 6, 0, 1, 480)
83 # MTrk chunk
84 track = b"MTrk" + struct.pack(">I", len(track_event)) + track_event
85 return header + track
86
87
88 # ---------------------------------------------------------------------------
89 # Unit tests: extract_bpm_from_midi
90 # ---------------------------------------------------------------------------
91
92
def test_extract_bpm_from_midi_returns_correct_bpm() -> None:
    """A 500000 µs/beat Set Tempo event is read back as 120 BPM."""
    detected = extract_bpm_from_midi(_build_midi_with_tempo(500_000))
    assert detected is not None
    assert abs(detected - 120.0) < 0.1
99
100
def test_extract_bpm_from_midi_140_bpm() -> None:
    """140 BPM, encoded as 428571 µs/beat, round-trips within 0.5 BPM."""
    micros_per_beat = 60_000_000 // 140  # truncates to 428571
    detected = extract_bpm_from_midi(_build_midi_with_tempo(micros_per_beat))
    assert detected is not None
    assert abs(detected - 140.0) < 0.5
108
109
def test_extract_bpm_from_midi_no_header_returns_none() -> None:
    """Bytes that are not a MIDI file yield ``None``."""
    not_midi = b"not midi data"
    assert extract_bpm_from_midi(not_midi) is None
113
114
def test_extract_bpm_from_midi_no_tempo_event_returns_none() -> None:
    """A MIDI file whose track has no Set Tempo event yields ``None``."""
    # The track's only event is end-of-track (FF 2F 00) at delta-time 0.
    end_of_track = b"\x00\xFF\x2F\x00"
    midi = b"".join(
        (
            b"MThd",
            struct.pack(">IHHH", 6, 0, 1, 480),
            b"MTrk",
            struct.pack(">I", len(end_of_track)),
            end_of_track,
        )
    )
    assert extract_bpm_from_midi(midi) is None
122
123
def test_extract_bpm_from_midi_empty_bytes_returns_none() -> None:
    """Zero-length input yields ``None`` rather than raising."""
    nothing = b""
    assert extract_bpm_from_midi(nothing) is None
127
128
def test_detect_all_tempos_returns_multiple_events() -> None:
    """Two Set Tempo events in one track are both reported, in file order."""

    def _set_tempo(micros_per_beat: int) -> bytes:
        # delta-time 0, FF 51 03, then the 24-bit big-endian tempo value.
        payload = bytes(
            [
                (micros_per_beat >> 16) & 0xFF,
                (micros_per_beat >> 8) & 0xFF,
                micros_per_beat & 0xFF,
            ]
        )
        return b"\x00\xFF\x51\x03" + payload

    track_body = b"".join(_set_tempo(value) for value in (500_000, 428_571))
    track_body += b"\x00\xFF\x2F\x00"  # end-of-track
    midi = (
        b"MThd"
        + struct.pack(">IHHH", 6, 0, 1, 480)
        + b"MTrk"
        + struct.pack(">I", len(track_body))
        + track_body
    )

    found = detect_all_tempos_from_midi(midi)
    assert len(found) == 2
    assert abs(found[0] - 120.0) < 0.1
    assert abs(found[1] - 140.0) < 0.5
142
143
144 # ---------------------------------------------------------------------------
145 # Integration tests: _tempo_read_async
146 # ---------------------------------------------------------------------------
147
148
@pytest.mark.anyio
async def test_tempo_read_no_annotation_no_midi_shows_dash(
    tmp_path: pathlib.Path,
    muse_cli_db_session: AsyncSession,
    capsys: pytest.CaptureFixture[str],
) -> None:
    """With no annotation and no detected tempo, the output contains '--'."""
    session = muse_cli_db_session
    _init_muse_repo(tmp_path)
    await _make_commit(tmp_path, session, "take 1")

    # Reset captured output so only the tempo command's text is inspected.
    capsys.readouterr()
    tempo = await _tempo_read_async(
        root=tmp_path, session=session, commit_ref=None, as_json=False
    )
    printed = capsys.readouterr().out

    assert tempo.tempo_bpm is None
    assert tempo.detected_bpm is None
    assert "--" in printed
170
171
@pytest.mark.anyio
async def test_tempo_read_shows_annotated_bpm(
    tmp_path: pathlib.Path,
    muse_cli_db_session: AsyncSession,
    capsys: pytest.CaptureFixture[str],
) -> None:
    """A commit annotated with 128.0 BPM is reported as 128.0 and 'annotated'."""
    session = muse_cli_db_session
    _init_muse_repo(tmp_path)
    head = await _make_commit(tmp_path, session, "boom bap take 1")
    await set_commit_tempo_bpm(session, head, 128.0)

    # Reset captured output so only the tempo command's text is inspected.
    capsys.readouterr()
    tempo = await _tempo_read_async(
        root=tmp_path, session=session, commit_ref=None, as_json=False
    )
    printed = capsys.readouterr().out

    assert tempo.tempo_bpm == 128.0
    for fragment in ("128.0", "annotated"):
        assert fragment in printed
194
195
@pytest.mark.anyio
async def test_tempo_read_midi_detection(
    tmp_path: pathlib.Path,
    muse_cli_db_session: AsyncSession,
    capsys: pytest.CaptureFixture[str],
) -> None:
    """A committed MIDI file carrying a Set Tempo event is auto-detected."""
    session = muse_cli_db_session
    _init_muse_repo(tmp_path)
    # 500000 µs/beat corresponds to 120 BPM.
    _write_workdir(tmp_path, {"groove.mid": _build_midi_with_tempo(500_000)})
    await _commit_async(message="groove", root=tmp_path, session=session)

    # Reset captured output so only the tempo command's text is inspected.
    capsys.readouterr()
    tempo = await _tempo_read_async(
        root=tmp_path, session=session, commit_ref=None, as_json=False
    )
    printed = capsys.readouterr().out

    assert tempo.detected_bpm is not None
    assert abs(tempo.detected_bpm - 120.0) < 0.5
    for fragment in ("120", "detected"):
        assert fragment in printed
220
221
@pytest.mark.anyio
async def test_tempo_read_json_output(
    tmp_path: pathlib.Path,
    muse_cli_db_session: AsyncSession,
    capsys: pytest.CaptureFixture[str],
) -> None:
    """With ``as_json=True`` the output parses as JSON with the expected keys."""
    session = muse_cli_db_session
    _init_muse_repo(tmp_path)
    head = await _make_commit(tmp_path, session, "samba")
    await set_commit_tempo_bpm(session, head, 100.0)

    # Reset captured output so only the JSON document remains to be parsed.
    capsys.readouterr()
    await _tempo_read_async(
        root=tmp_path, session=session, commit_ref=None, as_json=True
    )
    payload = json.loads(capsys.readouterr().out)

    assert payload["tempo_bpm"] == 100.0
    for key in ("commit_id", "effective_bpm"):
        assert key in payload
245
246
@pytest.mark.anyio
async def test_tempo_read_abbreviated_commit_ref(
    tmp_path: pathlib.Path,
    muse_cli_db_session: AsyncSession,
    capsys: pytest.CaptureFixture[str],
) -> None:
    """The first eight characters of a commit id resolve to the full commit."""
    session = muse_cli_db_session
    _init_muse_repo(tmp_path)
    full_id = await _make_commit(tmp_path, session, "reggae")
    await set_commit_tempo_bpm(session, full_id, 76.0)
    short_ref = full_id[:8]

    capsys.readouterr()
    tempo = await _tempo_read_async(
        root=tmp_path, session=session, commit_ref=short_ref, as_json=False
    )

    assert tempo.commit_id == full_id
    assert tempo.tempo_bpm == 76.0
267
268
@pytest.mark.anyio
async def test_tempo_read_invalid_ref_exits_user_error(
    tmp_path: pathlib.Path,
    muse_cli_db_session: AsyncSession,
) -> None:
    """A ref matching no commit raises ``typer.Exit`` with ``USER_ERROR``."""
    session = muse_cli_db_session
    _init_muse_repo(tmp_path)
    await _make_commit(tmp_path, session, "bossa")

    with pytest.raises(typer.Exit) as raised:
        await _tempo_read_async(
            root=tmp_path, session=session, commit_ref="deadbeef", as_json=False
        )

    assert raised.value.exit_code == ExitCode.USER_ERROR
286
287
288 # ---------------------------------------------------------------------------
289 # Integration tests: _tempo_set_async
290 # ---------------------------------------------------------------------------
291
292
@pytest.mark.anyio
async def test_tempo_set_stores_bpm_in_metadata(
    tmp_path: pathlib.Path,
    muse_cli_db_session: AsyncSession,
    capsys: pytest.CaptureFixture[str],
) -> None:
    """Setting 128.0 BPM prints the value and a later read returns it."""
    session = muse_cli_db_session
    _init_muse_repo(tmp_path)
    head = await _make_commit(tmp_path, session, "hip hop beat")

    # Reset captured output so only the set command's text is inspected.
    capsys.readouterr()
    await _tempo_set_async(
        root=tmp_path, session=session, commit_ref=None, bpm=128.0
    )
    printed = capsys.readouterr().out
    assert "128.0" in printed

    # Read back through the explicit commit id to confirm persistence.
    stored = await _tempo_read_async(
        root=tmp_path, session=session, commit_ref=head, as_json=False
    )
    assert stored.tempo_bpm == 128.0
321
322
@pytest.mark.anyio
async def test_tempo_set_preserves_existing_metadata(
    tmp_path: pathlib.Path,
    muse_cli_db_session: AsyncSession,
) -> None:
    """Writing the tempo keeps unrelated keys already in commit metadata."""
    from maestro.muse_cli.models import MuseCliCommit

    session = muse_cli_db_session
    _init_muse_repo(tmp_path)
    head = await _make_commit(tmp_path, session, "funk")

    # Plant an unrelated key before the tempo is written.
    row = await session.get(MuseCliCommit, head)
    assert row is not None
    row.commit_metadata = {"some_other_key": "value"}
    session.add(row)
    await session.flush()

    await _tempo_set_async(
        root=tmp_path, session=session, commit_ref=None, bpm=95.0
    )

    refreshed = await session.get(MuseCliCommit, head)
    assert refreshed is not None
    metadata = refreshed.commit_metadata
    assert metadata is not None
    assert metadata["tempo_bpm"] == 95.0
    assert metadata["some_other_key"] == "value"
353
354
355 # ---------------------------------------------------------------------------
356 # Integration tests: _tempo_history_async
357 # ---------------------------------------------------------------------------
358
359
@pytest.mark.anyio
async def test_tempo_history_shows_all_commits(
    tmp_path: pathlib.Path,
    muse_cli_db_session: AsyncSession,
    capsys: pytest.CaptureFixture[str],
) -> None:
    """History over three commits returns three entries and prints each message."""
    session = muse_cli_db_session
    _init_muse_repo(tmp_path)
    for take in range(1, 4):
        await _make_commit(tmp_path, session, f"take {take}", file_seed=take - 1)

    # Reset captured output so only the history text is inspected.
    capsys.readouterr()
    history = await _tempo_history_async(
        root=tmp_path, session=session, commit_ref=None, as_json=False
    )
    assert len(history) == 3

    printed = capsys.readouterr().out
    for label in ("take 1", "take 2", "take 3"):
        assert label in printed
383
384
@pytest.mark.anyio
async def test_tempo_history_delta_computed_correctly(
    tmp_path: pathlib.Path,
    muse_cli_db_session: AsyncSession,
) -> None:
    """Two annotated commits (80 then 140 BPM) give a +60 delta on the newer one."""
    session = muse_cli_db_session
    _init_muse_repo(tmp_path)
    older = await _make_commit(tmp_path, session, "slow", file_seed=0)
    newer = await _make_commit(tmp_path, session, "fast", file_seed=1)

    await set_commit_tempo_bpm(session, older, 80.0)
    await set_commit_tempo_bpm(session, newer, 140.0)

    history = await _tempo_history_async(
        root=tmp_path, session=session, commit_ref=None, as_json=False
    )

    # Entries are expected newest-first: "fast" precedes "slow".
    first = history[0]
    assert first.commit_id == newer
    assert first.effective_bpm == 140.0
    assert first.delta_bpm == pytest.approx(60.0)

    second = history[1]
    assert second.commit_id == older
    assert second.effective_bpm == 80.0
    assert second.delta_bpm is None  # oldest commit has nothing to diff against
412
413
@pytest.mark.anyio
async def test_tempo_history_json_output(
    tmp_path: pathlib.Path,
    muse_cli_db_session: AsyncSession,
    capsys: pytest.CaptureFixture[str],
) -> None:
    """History with ``as_json=True`` prints a JSON list with one entry per commit."""
    session = muse_cli_db_session
    _init_muse_repo(tmp_path)
    await _make_commit(tmp_path, session, "jazz")

    # Reset captured output so only the JSON document remains to be parsed.
    capsys.readouterr()
    await _tempo_history_async(
        root=tmp_path, session=session, commit_ref=None, as_json=True
    )
    rows = json.loads(capsys.readouterr().out)

    assert isinstance(rows, list)
    assert len(rows) == 1
    only_row = rows[0]
    assert "commit_id" in only_row
    assert "effective_bpm" in only_row
437
438
439 # ---------------------------------------------------------------------------
440 # Unit tests: build_tempo_history
441 # ---------------------------------------------------------------------------
442
443
def test_build_tempo_history_oldest_has_no_delta() -> None:
    """The oldest commit in the chain is expected to have ``delta_bpm`` of None.

    Builds three unsaved commit objects (140, 120 and 80 BPM, newest first)
    and checks that ``build_tempo_history`` keeps that order, reports each
    delta against the next-older commit, and leaves the root's delta unset.
    """
    import datetime

    # Only MuseCliCommit is used here; the previously imported
    # MuseCliSnapshot was never referenced.
    from maestro.muse_cli.models import MuseCliCommit

    def _fake_commit(cid: str, parent: str | None, bpm: float | None) -> MuseCliCommit:
        """Build an in-memory commit whose metadata carries *bpm*, if given."""
        c = MuseCliCommit(
            commit_id=cid,
            repo_id="r",
            branch="main",
            parent_commit_id=parent,
            snapshot_id="s",
            message=f"take {cid[:4]}",
            author="test",
            committed_at=datetime.datetime.now(datetime.timezone.utc),
        )
        c.commit_metadata = {"tempo_bpm": bpm} if bpm is not None else None
        return c

    # Newest first: ccc → bbb → aaa
    commits = [
        _fake_commit("ccc", "bbb", 140.0),
        _fake_commit("bbb", "aaa", 120.0),
        _fake_commit("aaa", None, 80.0),
    ]
    history = build_tempo_history(commits)

    # Order is expected to stay newest-first after build_tempo_history.
    assert history[0].commit_id == "ccc"
    assert history[0].delta_bpm == pytest.approx(20.0)
    assert history[1].commit_id == "bbb"
    assert history[1].delta_bpm == pytest.approx(40.0)
    assert history[2].commit_id == "aaa"
    assert history[2].delta_bpm is None