dynamics.py
python
| 1 | """muse dynamics — analyze the dynamic profile of a commit. |
| 2 | |
| 3 | Examines velocity data across tracks for a given commit (defaults to HEAD) |
| 4 | and reports per-track statistics with an arc classification. |
| 5 | |
| 6 | Output (default tabular):: |
| 7 | |
| 8 | Dynamic profile — commit a1b2c3d4 (HEAD -> main) |
| 9 | |
| 10 | Track Avg Vel Peak Range Arc |
| 11 | --------- ------- ---- ----- ----------- |
| 12 | drums 88 110 42 terraced |
| 13 | bass 72 85 28 flat |
| 14 | keys 64 95 56 crescendo |
| 15 | lead 79 105 38 swell |
| 16 | |
| 17 | Arc vocabulary |
| 18 | -------------- |
| 19 | - flat — velocity variance < 10; steady throughout |
| 20 | - crescendo — monotonically rising from start to end |
| 21 | - decrescendo — monotonically falling from start to end |
| 22 | - terraced — step-wise plateaus; sudden jumps between stable levels |
| 23 | - swell — rises then falls (arch shape) |
| 24 | |
| 25 | Flags |
| 26 | ----- |
| 27 | --track TEXT Filter to a single track (case-insensitive prefix match). |
| 28 | --section TEXT Restrict analysis to a named section/region. |
| 29 | --compare COMMIT Compare dynamics of <commit> against <COMMIT>. |
| 30 | --history Print dynamics for every commit in the branch history. |
| 31 | --peak Show only tracks whose peak velocity exceeds the branch average. |
| 32 | --range Sort output by velocity range (descending). |
| 33 | --arc Filter to only tracks matching the arc label given via --track. |
| 34 | --json Emit results as JSON instead of the ASCII table. |
| 35 | """ |
| 36 | from __future__ import annotations |
| 37 | |
| 38 | import asyncio |
| 39 | import json |
| 40 | import logging |
| 41 | import pathlib |
| 42 | from typing import Optional |
| 43 | |
| 44 | import typer |
| 45 | from sqlalchemy.ext.asyncio import AsyncSession |
| 46 | |
| 47 | from maestro.muse_cli._repo import require_repo |
| 48 | from maestro.muse_cli.db import open_session |
| 49 | from maestro.muse_cli.errors import ExitCode |
| 50 | |
| 51 | logger = logging.getLogger(__name__) |
| 52 | |
| 53 | app = typer.Typer() |
| 54 | |
| 55 | # --------------------------------------------------------------------------- |
| 56 | # Types |
| 57 | # --------------------------------------------------------------------------- |
| 58 | |
| 59 | ArcLabel = str # one of: flat | crescendo | decrescendo | terraced | swell |
| 60 | |
| 61 | _ARC_LABELS: tuple[str, ...] = ("flat", "crescendo", "decrescendo", "terraced", "swell") |
| 62 | |
| 63 | _VALID_ARCS: frozenset[str] = frozenset(_ARC_LABELS) |
| 64 | |
| 65 | |
| 66 | class TrackDynamics: |
| 67 | """Dynamic profile for a single track.""" |
| 68 | |
| 69 | __slots__ = ("name", "avg_velocity", "peak_velocity", "velocity_range", "arc") |
| 70 | |
| 71 | def __init__( |
| 72 | self, |
| 73 | name: str, |
| 74 | avg_velocity: int, |
| 75 | peak_velocity: int, |
| 76 | velocity_range: int, |
| 77 | arc: ArcLabel, |
| 78 | ) -> None: |
| 79 | self.name = name |
| 80 | self.avg_velocity = avg_velocity |
| 81 | self.peak_velocity = peak_velocity |
| 82 | self.velocity_range = velocity_range |
| 83 | self.arc = arc |
| 84 | |
| 85 | def to_dict(self) -> dict[str, object]: |
| 86 | return { |
| 87 | "track": self.name, |
| 88 | "avg_velocity": self.avg_velocity, |
| 89 | "peak_velocity": self.peak_velocity, |
| 90 | "velocity_range": self.velocity_range, |
| 91 | "arc": self.arc, |
| 92 | } |
| 93 | |
| 94 | |
| 95 | # --------------------------------------------------------------------------- |
| 96 | # Stub data — realistic placeholder until MIDI note data is queryable |
| 97 | # --------------------------------------------------------------------------- |
| 98 | |
| 99 | _STUB_TRACKS: list[tuple[str, int, int, int, ArcLabel]] = [ |
| 100 | ("drums", 88, 110, 42, "terraced"), |
| 101 | ("bass", 72, 85, 28, "flat"), |
| 102 | ("keys", 64, 95, 56, "crescendo"), |
| 103 | ("lead", 79, 105, 38, "swell"), |
| 104 | ] |
| 105 | |
| 106 | |
| 107 | def _stub_profiles() -> list[TrackDynamics]: |
| 108 | """Return stub TrackDynamics rows (placeholder for real DB query).""" |
| 109 | return [ |
| 110 | TrackDynamics( |
| 111 | name=name, |
| 112 | avg_velocity=avg, |
| 113 | peak_velocity=peak, |
| 114 | velocity_range=rng, |
| 115 | arc=arc, |
| 116 | ) |
| 117 | for name, avg, peak, rng, arc in _STUB_TRACKS |
| 118 | ] |
| 119 | |
| 120 | |
| 121 | # --------------------------------------------------------------------------- |
| 122 | # Rendering |
| 123 | # --------------------------------------------------------------------------- |
| 124 | |
| 125 | _COL_WIDTHS = (9, 7, 4, 5, 11) # track, avg, peak, range, arc |
| 126 | |
| 127 | |
| 128 | def _render_table(rows: list[TrackDynamics], commit_ref: str, branch: str) -> None: |
| 129 | """Print a human-readable ASCII table of dynamics.""" |
| 130 | head_label = f" (HEAD -> {branch})" if branch else "" |
| 131 | typer.echo(f"Dynamic profile — commit {commit_ref}{head_label}") |
| 132 | typer.echo("") |
| 133 | |
| 134 | # Header |
| 135 | header = ( |
| 136 | f"{'Track':<{_COL_WIDTHS[0]}} " |
| 137 | f"{'Avg Vel':>{_COL_WIDTHS[1]}} " |
| 138 | f"{'Peak':>{_COL_WIDTHS[2]}} " |
| 139 | f"{'Range':>{_COL_WIDTHS[3]}} " |
| 140 | f"{'Arc':<{_COL_WIDTHS[4]}}" |
| 141 | ) |
| 142 | sep = ( |
| 143 | f"{'-' * _COL_WIDTHS[0]} " |
| 144 | f"{'-' * _COL_WIDTHS[1]} " |
| 145 | f"{'-' * _COL_WIDTHS[2]} " |
| 146 | f"{'-' * _COL_WIDTHS[3]} " |
| 147 | f"{'-' * _COL_WIDTHS[4]}" |
| 148 | ) |
| 149 | typer.echo(header) |
| 150 | typer.echo(sep) |
| 151 | |
| 152 | for row in rows: |
| 153 | typer.echo( |
| 154 | f"{row.name:<{_COL_WIDTHS[0]}} " |
| 155 | f"{row.avg_velocity:>{_COL_WIDTHS[1]}} " |
| 156 | f"{row.peak_velocity:>{_COL_WIDTHS[2]}} " |
| 157 | f"{row.velocity_range:>{_COL_WIDTHS[3]}} " |
| 158 | f"{row.arc:<{_COL_WIDTHS[4]}}" |
| 159 | ) |
| 160 | typer.echo("") |
| 161 | |
| 162 | |
| 163 | def _render_json( |
| 164 | rows: list[TrackDynamics], |
| 165 | commit_ref: str, |
| 166 | branch: str, |
| 167 | ) -> None: |
| 168 | """Emit dynamics as a JSON object.""" |
| 169 | payload = { |
| 170 | "commit": commit_ref, |
| 171 | "branch": branch, |
| 172 | "tracks": [r.to_dict() for r in rows], |
| 173 | } |
| 174 | typer.echo(json.dumps(payload, indent=2)) |
| 175 | |
| 176 | |
| 177 | # --------------------------------------------------------------------------- |
| 178 | # Testable async core |
| 179 | # --------------------------------------------------------------------------- |
| 180 | |
| 181 | |
| 182 | async def _dynamics_async( |
| 183 | *, |
| 184 | root: pathlib.Path, |
| 185 | session: AsyncSession, |
| 186 | commit: Optional[str], |
| 187 | track: Optional[str], |
| 188 | section: Optional[str], |
| 189 | compare: Optional[str], |
| 190 | history: bool, |
| 191 | peak: bool, |
| 192 | range_flag: bool, |
| 193 | arc: bool, |
| 194 | as_json: bool, |
| 195 | ) -> None: |
| 196 | """Core dynamics analysis logic — fully injectable for tests. |
| 197 | |
| 198 | Stub implementation: reads branch/commit metadata from ``.muse/``, |
| 199 | applies flag-driven filters to placeholder velocity data, and emits |
| 200 | a formatted table or JSON payload. |
| 201 | |
| 202 | Args: |
| 203 | root: Repository root (directory containing ``.muse/``). |
| 204 | session: Open async DB session (reserved for full implementation). |
| 205 | commit: Commit ref to analyse; defaults to HEAD. |
| 206 | track: Case-insensitive prefix filter; only matching tracks shown. |
| 207 | section: Restrict analysis to a named region (future: pass to query). |
| 208 | compare: Second commit ref for side-by-side comparison (stub: noted). |
| 209 | history: If True, show dynamics for every commit in branch history. |
| 210 | peak: If True, show only tracks whose peak exceeds branch average. |
| 211 | range_flag: If True, sort by velocity range descending. |
| 212 | arc: If True, filter tracks to the arc matching the --track value. |
| 213 | as_json: Emit JSON instead of ASCII table. |
| 214 | """ |
| 215 | muse_dir = root / ".muse" |
| 216 | |
| 217 | # -- Resolve branch / commit ref -- |
| 218 | head_ref = (muse_dir / "HEAD").read_text().strip() # "refs/heads/main" |
| 219 | branch = head_ref.rsplit("/", 1)[-1] if "/" in head_ref else head_ref |
| 220 | ref_path = muse_dir / pathlib.Path(head_ref) |
| 221 | |
| 222 | head_commit_id = "" |
| 223 | if ref_path.exists(): |
| 224 | head_commit_id = ref_path.read_text().strip() |
| 225 | |
| 226 | commit_ref = commit or (head_commit_id[:8] if head_commit_id else "HEAD") |
| 227 | |
| 228 | if not head_commit_id and not commit: |
| 229 | typer.echo(f"No commits yet on branch {branch} — nothing to analyse.") |
| 230 | raise typer.Exit(code=ExitCode.SUCCESS) |
| 231 | |
| 232 | # -- Stub: produce placeholder profiles -- |
| 233 | profiles = _stub_profiles() |
| 234 | |
| 235 | # -- Apply --track filter -- |
| 236 | if track: |
| 237 | prefix = track.lower() |
| 238 | if arc: |
| 239 | # --arc mode: filter to tracks whose arc matches the --track value as arc label |
| 240 | if prefix not in _VALID_ARCS: |
| 241 | typer.echo( |
| 242 | f"⚠️ '{track}' is not a valid arc label. " |
| 243 | f"Valid arcs: {', '.join(sorted(_VALID_ARCS))}" |
| 244 | ) |
| 245 | raise typer.Exit(code=ExitCode.USER_ERROR) |
| 246 | profiles = [p for p in profiles if p.arc == prefix] |
| 247 | else: |
| 248 | profiles = [p for p in profiles if p.name.lower().startswith(prefix)] |
| 249 | |
| 250 | # -- Apply --peak filter (show only above-average peak tracks) -- |
| 251 | if peak and profiles: |
| 252 | avg_peak = sum(p.peak_velocity for p in profiles) / len(profiles) |
| 253 | profiles = [p for p in profiles if p.peak_velocity > avg_peak] |
| 254 | |
| 255 | # -- Apply --range sort -- |
| 256 | if range_flag: |
| 257 | profiles = sorted(profiles, key=lambda p: p.velocity_range, reverse=True) |
| 258 | |
| 259 | # -- --history mode: note the stub boundary -- |
| 260 | if history: |
| 261 | typer.echo( |
| 262 | f"⚠️ --history: full commit-chain dynamics not yet implemented. " |
| 263 | f"Showing HEAD ({commit_ref}) only." |
| 264 | ) |
| 265 | |
| 266 | # -- --compare note -- |
| 267 | if compare: |
| 268 | typer.echo( |
| 269 | f"⚠️ --compare {compare}: side-by-side comparison not yet implemented." |
| 270 | ) |
| 271 | |
| 272 | # -- --section note -- |
| 273 | if section: |
| 274 | typer.echo(f"⚠️ --section {section}: region filtering not yet implemented.") |
| 275 | |
| 276 | # -- Render -- |
| 277 | if not profiles: |
| 278 | typer.echo("No tracks match the specified filters.") |
| 279 | return |
| 280 | |
| 281 | if as_json: |
| 282 | _render_json(profiles, commit_ref=commit_ref, branch=branch) |
| 283 | else: |
| 284 | _render_table(profiles, commit_ref=commit_ref, branch=branch) |
| 285 | |
| 286 | |
| 287 | # --------------------------------------------------------------------------- |
| 288 | # Typer command |
| 289 | # --------------------------------------------------------------------------- |
| 290 | |
| 291 | |
| 292 | @app.callback(invoke_without_command=True) |
| 293 | def dynamics( |
| 294 | ctx: typer.Context, |
| 295 | commit: Optional[str] = typer.Argument( |
| 296 | None, |
| 297 | help="Commit ref to analyse (default: HEAD).", |
| 298 | metavar="COMMIT", |
| 299 | ), |
| 300 | track: Optional[str] = typer.Option( |
| 301 | None, |
| 302 | "--track", |
| 303 | help="Filter to a single track (case-insensitive prefix match).", |
| 304 | metavar="TEXT", |
| 305 | ), |
| 306 | section: Optional[str] = typer.Option( |
| 307 | None, |
| 308 | "--section", |
| 309 | help="Restrict analysis to a named section/region.", |
| 310 | metavar="TEXT", |
| 311 | ), |
| 312 | compare: Optional[str] = typer.Option( |
| 313 | None, |
| 314 | "--compare", |
| 315 | help="Compare dynamics against a second commit.", |
| 316 | metavar="COMMIT", |
| 317 | ), |
| 318 | history: bool = typer.Option( |
| 319 | False, |
| 320 | "--history", |
| 321 | help="Show dynamics for every commit in branch history.", |
| 322 | ), |
| 323 | peak: bool = typer.Option( |
| 324 | False, |
| 325 | "--peak", |
| 326 | help="Show only tracks whose peak velocity exceeds the branch average.", |
| 327 | ), |
| 328 | range_flag: bool = typer.Option( |
| 329 | False, |
| 330 | "--range", |
| 331 | help="Sort output by velocity range (descending).", |
| 332 | ), |
| 333 | arc: bool = typer.Option( |
| 334 | False, |
| 335 | "--arc", |
| 336 | help="When combined with --track, treat the value as an arc label filter.", |
| 337 | ), |
| 338 | as_json: bool = typer.Option( |
| 339 | False, |
| 340 | "--json", |
| 341 | help="Emit results as JSON instead of the ASCII table.", |
| 342 | ), |
| 343 | ) -> None: |
| 344 | """Analyse the dynamic (velocity) profile of a commit.""" |
| 345 | root = require_repo() |
| 346 | |
| 347 | async def _run() -> None: |
| 348 | async with open_session() as session: |
| 349 | await _dynamics_async( |
| 350 | root=root, |
| 351 | session=session, |
| 352 | commit=commit, |
| 353 | track=track, |
| 354 | section=section, |
| 355 | compare=compare, |
| 356 | history=history, |
| 357 | peak=peak, |
| 358 | range_flag=range_flag, |
| 359 | arc=arc, |
| 360 | as_json=as_json, |
| 361 | ) |
| 362 | |
| 363 | try: |
| 364 | asyncio.run(_run()) |
| 365 | except typer.Exit: |
| 366 | raise |
| 367 | except Exception as exc: |
| 368 | typer.echo(f"❌ muse dynamics failed: {exc}") |
| 369 | logger.error("❌ muse dynamics error: %s", exc, exc_info=True) |
| 370 | raise typer.Exit(code=ExitCode.INTERNAL_ERROR) |