cgcardona / muse public
meter.py python
604 lines 22.7 KB
12901c5a Initial extraction from tellurstori/maestro cgcardona <gabriel@tellurstori.com> 4d ago
1 """muse meter — read or set the time signature of a Muse CLI commit.
2
3 Commands
4 --------
5
6 Read the stored time signature for HEAD::
7
8 muse meter
9
10 Read the stored time signature for a specific commit::
11
12 muse meter <commit-sha>
13
14 Set the time signature on HEAD::
15
16 muse meter --set 7/8
17
18 Auto-detect from MIDI files in the current working tree::
19
20 muse meter --detect
21
22 Show meter annotations across the commit history::
23
24 muse meter --history
25
26 Detect tracks with conflicting (polyrhythmic) time signatures::
27
28 muse meter --polyrhythm
29
30 Time Signature Format
31 ---------------------
32 ``<numerator>/<denominator>`` where denominator is a power of 2 (1, 2, 4, 8, 16, …).
33 Examples: ``4/4``, ``3/4``, ``7/8``, ``5/4``, ``12/8``, ``6/8``.
34
35 Storage
36 -------
37 The time signature is stored as the ``meter`` key inside the
38 ``metadata`` JSON blob on the ``muse_cli_commits`` row. No new
39 columns are added; the blob is extensible for future annotations (tempo,
40 key, etc.).
41
42 MIDI Detection
43 --------------
``--detect`` scans ``.mid`` / ``.midi`` files in ``muse-work/`` for MIDI
time-signature meta events (``0xFF 0x58``). Within each file the first event
found is used; across files the most common signature wins and is stored on
the target commit. A file with no such event is reported as unknown (``?``),
and if no file contains one nothing is stored.
48 """
from __future__ import annotations

import asyncio
import dataclasses
import json
import logging
import pathlib
import re
from collections.abc import Coroutine
from typing import Any

import typer
from sqlalchemy.ext.asyncio import AsyncSession

from maestro.muse_cli._repo import require_repo
from maestro.muse_cli.db import (
    get_commit_extra_metadata,
    open_session,
    set_commit_extra_metadata_key,
)
from maestro.muse_cli.errors import ExitCode
from maestro.muse_cli.models import MuseCliCommit
69
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Typer application object; the ``meter`` callback is registered on it.
app = typer.Typer()

# ──────────────────────────────────────────────────────────────────────────────
# Domain types (registered in docs/reference/type_contracts.md)
# ──────────────────────────────────────────────────────────────────────────────

# Key under which the time signature is stored in a commit's metadata blob.
_METADATA_KEY = "meter"

# Matches a whole ``<digits>/<digits>`` string, capturing both digit runs.
_TIME_SIG_RE = re.compile(r"^(\d+)/(\d+)$")
81
82
@dataclasses.dataclass(frozen=True)
class MuseMeterReadResult:
    """Immutable outcome of looking up the meter annotation on one commit.

    Attributes:
        commit_id: Full 64-char sha256 identifier of the commit that was read.
        time_signature: The stored time signature (for example ``"4/4"``),
            or ``None`` if the commit carries no meter annotation.
    """

    commit_id: str
    time_signature: str | None
95
96
@dataclasses.dataclass(frozen=True)
class MuseMeterHistoryEntry:
    """Immutable row of the meter history: one commit and its annotation.

    Attributes:
        commit_id: Full 64-char sha256 identifier of the commit.
        time_signature: Time signature stored on the commit, or ``None``
            when the commit has not been annotated.
        message: The commit's message text.
    """

    commit_id: str
    time_signature: str | None
    message: str
110
111
@dataclasses.dataclass(frozen=True)
class MusePolyrhythmResult:
    """Immutable outcome of a polyrhythm scan over working-tree MIDI files.

    Attributes:
        commit_id: The commit the scan is attributed to (HEAD by default).
        signatures_by_file: Relative file path mapped to the time signature
            detected in that file, with ``"?"`` standing in for files where
            none could be detected.
        is_polyrhythmic: ``True`` if at least two different known time
            signatures were found among the files.
    """

    commit_id: str
    signatures_by_file: dict[str, str]
    is_polyrhythmic: bool
127
128
129 # ──────────────────────────────────────────────────────────────────────────────
130 # Time-signature validation
131 # ──────────────────────────────────────────────────────────────────────────────
132
133
def validate_time_signature(raw: str) -> str:
    """Parse and validate a time signature string like ``"4/4"`` or ``"7/8"``.

    Surrounding whitespace is ignored. The text must consist of two runs of
    decimal digits separated by a single ``/``. The numerator must be at
    least 1 and the denominator must be a positive power of two
    (1, 2, 4, 8, 16, …); no upper bound is imposed.

    Returns:
        The canonical ``"<numerator>/<denominator>"`` string with both parts
        rendered as plain decimal integers, so ``" 04/08 "`` yields
        ``"4/8"``. Normalising here keeps stored annotations comparable as
        plain strings.

    Raises:
        ValueError: If the text is not of the expected shape, the numerator
            is zero, or the denominator is not a power of two.
    """
    raw = raw.strip()
    numerator_text, slash, denominator_text = raw.partition("/")
    # ``str.isdecimal`` accepts only decimal-digit characters, so signs,
    # inner whitespace, underscores and a second "/" are all rejected here.
    well_formed = (
        bool(slash)
        and numerator_text.isdecimal()
        and denominator_text.isdecimal()
    )
    if not well_formed:
        raise ValueError(
            f"Invalid time signature {raw!r}. "
            "Expected <numerator>/<denominator>, e.g. '4/4' or '7/8'."
        )
    numerator = int(numerator_text)
    denominator = int(denominator_text)
    if numerator < 1:
        raise ValueError(f"Numerator must be ≥ 1, got {numerator}.")
    # A positive power of two has exactly one bit set, so n & (n - 1) == 0.
    if denominator < 1 or (denominator & (denominator - 1)) != 0:
        raise ValueError(
            f"Denominator must be a power of 2 (1, 2, 4, 8, 16, …), got {denominator}."
        )
    # Re-render from the parsed integers so leading zeros do not leak through.
    return f"{numerator}/{denominator}"
156
157
158 # ──────────────────────────────────────────────────────────────────────────────
159 # MIDI time-signature detection
160 # ──────────────────────────────────────────────────────────────────────────────
161
162
def detect_midi_time_signature(midi_bytes: bytes) -> str | None:
    """Find the first usable time-signature meta event in raw MIDI bytes.

    This is a plain byte-pattern scan, not a full MIDI parser. It looks for
    the three-byte prefix ``0xFF 0x58 0x04`` followed by four data bytes:

        0xFF  meta event marker
        0x58  time signature type
        0x04  data length (always 4)
        nn    numerator
        dd    denominator exponent (denominator = 2^dd)
        cc    MIDI clocks per metronome tick
        bb    number of 32nd notes per 24 MIDI clocks

    Candidates whose numerator byte is zero are skipped and the scan moves
    on to the next candidate.

    Returns:
        ``"numerator/denominator"`` for the first acceptable event, or
        ``None`` when the data contains none.
    """
    total = len(midi_bytes)
    # When the data opens with an MThd chunk, start after its 14 bytes
    # (4-byte tag + 4-byte length + 6 bytes of header data).
    cursor = 14 if midi_bytes.startswith(b"MThd") else 0

    while True:
        cursor = midi_bytes.find(b"\xff\x58\x04", cursor)
        # All seven bytes of the event must be present. If this candidate is
        # cut short, every later one would be as well.
        if cursor == -1 or cursor + 6 >= total:
            return None
        numerator = midi_bytes[cursor + 3]
        if numerator >= 1:
            return f"{numerator}/{2 ** midi_bytes[cursor + 4]}"
        cursor += 1
195
196
def scan_workdir_for_time_signatures(
    workdir: pathlib.Path,
) -> dict[str, str]:
    """Scan all MIDI files under *workdir* for time-signature meta events.

    The tree is walked recursively. A path counts as a MIDI file when its
    name ends in ``.mid`` or ``.midi``, compared case-insensitively so that
    files named ``SONG.MID`` are not missed on case-sensitive filesystems.

    Returns:
        A dict mapping each MIDI file's POSIX-style path relative to
        *workdir* to its detected time signature, or ``"?"`` when none is
        found. ``.mid`` entries are inserted before ``.midi`` entries, each
        group in sorted path order. Paths that cannot be read are left out.
        The dict is empty when *workdir* does not exist.
    """
    results: dict[str, str] = {}
    if not workdir.exists():
        return results

    midi_paths = [
        path
        for path in workdir.rglob("*")
        if path.name.lower().endswith((".mid", ".midi"))
    ]
    # Keep ".mid" ahead of ".midi": callers that count signatures break ties
    # by insertion order, so this ordering is part of the observable result.
    midi_paths.sort(key=lambda path: (path.name.lower().endswith(".midi"), path))

    for midi_path in midi_paths:
        try:
            midi_bytes = midi_path.read_bytes()
        except OSError:
            # Best effort: unreadable entries (including directories whose
            # name happens to end in .mid) are skipped, not fatal.
            continue
        sig = detect_midi_time_signature(midi_bytes)
        rel = midi_path.relative_to(workdir).as_posix()
        results[rel] = sig if sig is not None else "?"
    return results
218
219
220 # ──────────────────────────────────────────────────────────────────────────────
221 # Repo HEAD resolution
222 # ──────────────────────────────────────────────────────────────────────────────
223
224
225 def _resolve_head_commit_id(root: pathlib.Path) -> str | None:
226 """Return the HEAD commit ID from the ``.muse/`` ref files, or ``None``."""
227 muse_dir = root / ".muse"
228 head_ref = (muse_dir / "HEAD").read_text().strip()
229 ref_path = muse_dir / pathlib.Path(head_ref)
230 if not ref_path.exists():
231 return None
232 raw = ref_path.read_text().strip()
233 return raw or None
234
235
236 # ──────────────────────────────────────────────────────────────────────────────
237 # Async core functions (fully injectable for tests)
238 # ──────────────────────────────────────────────────────────────────────────────
239
240
async def _resolve_commit_id(
    session: AsyncSession,
    root: pathlib.Path,
    commit_ref: str | None,
) -> str:
    """Turn *commit_ref* into a full commit ID.

    Three forms are handled, in this order:

    * ``None`` or ``"HEAD"`` (any letter case): the ID is read from the
      repository's ref files under *root*.
    * A 64-character string: treated as a full ID and looked up by primary
      key to confirm it exists.
    * Anything else: treated as an abbreviated ID and matched as a prefix;
      exactly one commit must match.

    Every failure prints a message and raises ``typer.Exit`` with
    ``ExitCode.USER_ERROR``.
    """
    if commit_ref is None or commit_ref.upper() == "HEAD":
        head_id = _resolve_head_commit_id(root)
        if head_id is not None:
            return head_id
        typer.echo("❌ No commits on this branch yet.")
        raise typer.Exit(code=ExitCode.USER_ERROR)

    # Full-length ID: a direct primary-key lookup is enough.
    if len(commit_ref) == 64:
        commit = await session.get(MuseCliCommit, commit_ref)
        if commit is not None:
            return commit.commit_id
        typer.echo(f"❌ Commit {commit_ref[:8]} not found.")
        raise typer.Exit(code=ExitCode.USER_ERROR)

    # Abbreviated ID: collect every commit whose ID begins with the prefix.
    from sqlalchemy.future import select

    prefix_query = select(MuseCliCommit.commit_id).where(
        MuseCliCommit.commit_id.startswith(commit_ref)
    )
    matches = (await session.execute(prefix_query)).scalars().all()
    if len(matches) == 1:
        return matches[0]

    if not matches:
        typer.echo(f"❌ Commit {commit_ref!r} not found.")
    else:
        typer.echo(
            f"❌ Ambiguous commit prefix {commit_ref!r} — matches {len(matches)} commits."
        )
    raise typer.Exit(code=ExitCode.USER_ERROR)
287
288
async def _meter_read_async(
    *,
    session: AsyncSession,
    root: pathlib.Path,
    commit_ref: str | None,
) -> MuseMeterReadResult:
    """Look up the meter annotation stored on a commit.

    Resolves *commit_ref* to a full commit ID, fetches that commit's extra
    metadata and picks out the ``meter`` entry. A missing metadata blob, a
    missing key, or a non-string value all yield ``time_signature=None``.
    Nothing is written by this function.
    """
    commit_id = await _resolve_commit_id(session, root, commit_ref)
    metadata = await get_commit_extra_metadata(session, commit_id)
    stored = metadata.get(_METADATA_KEY) if metadata else None
    return MuseMeterReadResult(
        commit_id=commit_id,
        time_signature=stored if isinstance(stored, str) else None,
    )
307
308
async def _meter_set_async(
    *,
    session: AsyncSession,
    root: pathlib.Path,
    commit_ref: str | None,
    time_signature: str,
) -> str:
    """Write *time_signature* as the meter annotation of *commit_ref*.

    The value is stored as given; validating it is the caller's job.

    Returns:
        The full ID of the commit that was annotated.

    Raises:
        typer.Exit: With ``ExitCode.INTERNAL_ERROR`` when the metadata
            update reports failure. Resolving *commit_ref* may also exit.
    """
    commit_id = await _resolve_commit_id(session, root, commit_ref)
    updated = await set_commit_extra_metadata_key(
        session, commit_id=commit_id, key=_METADATA_KEY, value=time_signature
    )
    if updated:
        return commit_id
    typer.echo(f"❌ Failed to update commit {commit_id[:8]}.")
    raise typer.Exit(code=ExitCode.INTERNAL_ERROR)
328
329
async def _meter_history_async(
    *,
    session: AsyncSession,
    root: pathlib.Path,
) -> list[MuseMeterHistoryEntry]:
    """Collect meter annotations along the parent chain starting at HEAD.

    Starting from the HEAD commit, each commit is loaded and its
    ``parent_commit_id`` followed until there is no parent or a commit
    cannot be loaded.

    Returns:
        One :class:`MuseMeterHistoryEntry` per visited commit, newest
        first. The list is empty when HEAD cannot be resolved.
    """
    history: list[MuseMeterHistoryEntry] = []
    cursor: str | None = _resolve_head_commit_id(root)
    while cursor:
        commit = await session.get(MuseCliCommit, cursor)
        if commit is None:
            # The chain points at a commit that is not in the DB; stop here.
            break
        blob = commit.commit_metadata or {}
        stored = blob.get(_METADATA_KEY) if isinstance(blob, dict) else None
        history.append(
            MuseMeterHistoryEntry(
                commit_id=commit.commit_id,
                time_signature=stored if isinstance(stored, str) else None,
                message=commit.message,
            )
        )
        cursor = commit.parent_commit_id
    return history
361
362
async def _meter_polyrhythm_async(
    *,
    session: AsyncSession,
    root: pathlib.Path,
    commit_ref: str | None,
) -> MusePolyrhythmResult:
    """Check the working tree's MIDI files for conflicting time signatures.

    The scan runs over the files currently in ``muse-work/`` under *root*,
    not over the content of a stored snapshot. *commit_ref* is resolved
    only so the result can name the commit it is attributed to.

    The result is polyrhythmic when the scanned files show more than one
    distinct signature, ignoring files reported as ``"?"``.
    """
    commit_id = await _resolve_commit_id(session, root, commit_ref)
    signatures = scan_workdir_for_time_signatures(root / "muse-work")
    distinct_known = set(signatures.values()) - {"?"}
    return MusePolyrhythmResult(
        commit_id=commit_id,
        signatures_by_file=signatures,
        is_polyrhythmic=len(distinct_known) > 1,
    )
385
386
387 # ──────────────────────────────────────────────────────────────────────────────
388 # Output renderers
389 # ──────────────────────────────────────────────────────────────────────────────
390
391
def _render_read(result: MuseMeterReadResult) -> None:
    """Echo the abbreviated commit ID and its meter, one per line.

    A missing or empty time signature is shown as ``(not set)``.
    """
    shown = result.time_signature if result.time_signature else "(not set)"
    for line in (f"commit {result.commit_id[:8]}", f"meter {shown}"):
        typer.echo(line)
397
398
399 class _UnsetType:
400 """Sentinel type for 'not yet seen' in history rendering."""
401
402
403 _UNSET = _UnsetType()
404
405
def _render_history(entries: list[MuseMeterHistoryEntry]) -> None:
    """Echo one line per history entry, in the order given.

    Each line shows the abbreviated commit ID, the time signature (or
    ``(not set)``) and the first 50 characters of the commit message. From
    the second line on, a line whose signature differs from the line printed
    just before it is suffixed with ``← changed``.
    """
    if not entries:
        typer.echo("No commits on this branch yet.")
        return

    previous: str | _UnsetType = _UNSET
    for entry in entries:
        shown = entry.time_signature or "(not set)"
        # The first row has nothing to be compared against.
        differs = previous is not _UNSET and shown != previous
        suffix = " ← changed" if differs else ""
        typer.echo(f"{entry.commit_id[:8]} {shown:<12} {entry.message[:50]}{suffix}")
        previous = shown
418
419
def _render_polyrhythm(result: MusePolyrhythmResult) -> None:
    """Echo the polyrhythm verdict followed by a per-file signature table.

    With no scanned files, only a "no MIDI files" notice is printed.
    Otherwise a headline states the verdict, then a blank line, then one
    line per file in path order.
    """
    by_file = result.signatures_by_file
    if not by_file:
        typer.echo("No MIDI files found in muse-work/.")
        return

    headline = (
        "⚠️ Polyrhythm detected — multiple time signatures in this commit:"
        if result.is_polyrhythmic
        else "✅ No polyrhythm — all MIDI files share the same time signature."
    )
    typer.echo(headline)
    typer.echo("")
    for path in sorted(by_file):
        typer.echo(f" {by_file[path]:<12} {path}")
434
435
436 # ──────────────────────────────────────────────────────────────────────────────
437 # Typer command
438 # ──────────────────────────────────────────────────────────────────────────────
439
440
def _run_meter_action(
    coro: Coroutine[Any, Any, None],
    *,
    command_label: str,
    log_label: str,
) -> None:
    """Run *coro* to completion and map unexpected failures to an exit code.

    Args:
        coro: The coroutine implementing one mode of ``muse meter``.
        command_label: Command text used in the user-facing failure message,
            e.g. ``"muse meter --set"``.
        log_label: Text used in the logged error line, e.g.
            ``"muse meter set"``.

    Raises:
        typer.Exit: Re-raised untouched when the coroutine exits on purpose;
            raised with ``ExitCode.INTERNAL_ERROR`` for any other exception,
            chained to the original error.
    """
    try:
        asyncio.run(coro)
    except typer.Exit:
        # Deliberate exits from the core functions already carry their code.
        raise
    except Exception as exc:
        typer.echo(f"❌ {command_label} failed: {exc}")
        logger.error("❌ %s error: %s", log_label, exc, exc_info=True)
        raise typer.Exit(code=ExitCode.INTERNAL_ERROR) from exc


# The docstring of ``meter`` is kept to one line because Typer uses it as the
# command's help text. Modes, in the order they are checked: --history,
# --polyrhythm, --set, --detect, and a plain read when no flag is given.
# At most one of the four flags may be passed.
@app.callback(invoke_without_command=True)
def meter(
    ctx: typer.Context,
    commit: str | None = typer.Argument(
        None,
        help="Target commit (full or abbreviated SHA, or 'HEAD'). Defaults to HEAD.",
        metavar="COMMIT",
    ),
    set_sig: str | None = typer.Option(
        None,
        "--set",
        help="Set the time signature, e.g. '4/4' or '7/8'.",
        metavar="TIME_SIG",
    ),
    detect: bool = typer.Option(
        False,
        "--detect",
        help="Auto-detect time signature from MIDI meta events in muse-work/.",
    ),
    history: bool = typer.Option(
        False,
        "--history",
        help="Show meter annotations across all commits on the current branch.",
    ),
    polyrhythm: bool = typer.Option(
        False,
        "--polyrhythm",
        help="Detect tracks with conflicting time signatures in muse-work/.",
    ),
) -> None:
    """Read or set the time signature annotation for a commit."""
    root = require_repo()

    # ── Mutual exclusion ─────────────────────────────────────────────────────
    flags_given = sum([set_sig is not None, detect, history, polyrhythm])
    if flags_given > 1:
        typer.echo(
            "❌ Only one of --set, --detect, --history, --polyrhythm may be used at a time."
        )
        raise typer.Exit(code=ExitCode.USER_ERROR)

    # ── --history (no commit arg needed) ─────────────────────────────────────
    if history:

        async def _run_history() -> None:
            async with open_session() as session:
                entries = await _meter_history_async(session=session, root=root)
            _render_history(entries)

        _run_meter_action(
            _run_history(),
            command_label="muse meter --history",
            log_label="muse meter history",
        )
        return

    # ── --polyrhythm ─────────────────────────────────────────────────────────
    if polyrhythm:

        async def _run_polyrhythm() -> None:
            async with open_session() as session:
                result = await _meter_polyrhythm_async(
                    session=session, root=root, commit_ref=commit
                )
            _render_polyrhythm(result)

        _run_meter_action(
            _run_polyrhythm(),
            command_label="muse meter --polyrhythm",
            log_label="muse meter polyrhythm",
        )
        return

    # ── --set <time-sig> ─────────────────────────────────────────────────────
    if set_sig is not None:
        # Validate before opening a DB session so bad input fails fast.
        try:
            canonical = validate_time_signature(set_sig)
        except ValueError as exc:
            typer.echo(f"❌ {exc}")
            raise typer.Exit(code=ExitCode.USER_ERROR) from exc

        async def _run_set() -> None:
            async with open_session() as session:
                commit_id = await _meter_set_async(
                    session=session,
                    root=root,
                    commit_ref=commit,
                    time_signature=canonical,
                )
            typer.echo(f"✅ Set meter={canonical!r} on commit {commit_id[:8]}")

        _run_meter_action(
            _run_set(),
            command_label="muse meter --set",
            log_label="muse meter set",
        )
        return

    # ── --detect ─────────────────────────────────────────────────────────────
    if detect:
        workdir = root / "muse-work"
        sigs = scan_workdir_for_time_signatures(workdir)
        if not sigs:
            typer.echo("⚠️ No MIDI files found in muse-work/.")
            raise typer.Exit(code=ExitCode.SUCCESS)

        known = [s for s in sigs.values() if s != "?"]
        if not known:
            typer.echo("⚠️ No MIDI time-signature meta events found in muse-work/ files.")
            raise typer.Exit(code=ExitCode.SUCCESS)

        # The most common known signature wins; ties go to the signature
        # encountered first in scan order.
        from collections import Counter

        detected = Counter(known).most_common(1)[0][0]
        typer.echo(f"✅ Detected time signature: {detected}")

        # Auto-store the detected value on the target commit.
        async def _run_detect() -> None:
            async with open_session() as session:
                commit_id = await _meter_set_async(
                    session=session,
                    root=root,
                    commit_ref=commit,
                    time_signature=detected,
                )
            typer.echo(f"✅ Stored meter={detected!r} on commit {commit_id[:8]}")

        _run_meter_action(
            _run_detect(),
            command_label="muse meter --detect",
            log_label="muse meter detect",
        )
        return

    # ── Default: read ─────────────────────────────────────────────────────────
    async def _run_read() -> None:
        async with open_session() as session:
            result = await _meter_read_async(
                session=session, root=root, commit_ref=commit
            )
        _render_read(result)

    _run_meter_action(
        _run_read(),
        command_label="muse meter",
        log_label="muse meter",
    )