cgcardona / muse public
dynamics.py python
370 lines 11.5 KB
12901c5a Initial extraction from tellurstori/maestro cgcardona <gabriel@tellurstori.com> 4d ago
1 """muse dynamics — analyze the dynamic profile of a commit.
2
3 Examines velocity data across tracks for a given commit (defaults to HEAD)
4 and reports per-track statistics with an arc classification.
5
6 Output (default tabular)::
7
8 Dynamic profile — commit a1b2c3d4 (HEAD -> main)
9
10 Track Avg Vel Peak Range Arc
11 --------- ------- ---- ----- -----------
12 drums 88 110 42 terraced
13 bass 72 85 28 flat
14 keys 64 95 56 crescendo
15 lead 79 105 38 swell
16
17 Arc vocabulary
18 --------------
19 - flat — velocity variance < 10; steady throughout
20 - crescendo — monotonically rising from start to end
21 - decrescendo — monotonically falling from start to end
22 - terraced — step-wise plateaus; sudden jumps between stable levels
23 - swell — rises then falls (arch shape)
24
25 Flags
26 -----
27 --track TEXT Filter to a single track (case-insensitive prefix match).
28 --section TEXT Restrict analysis to a named section/region.
29 --compare COMMIT Compare dynamics of <commit> against <COMMIT>.
30 --history Print dynamics for every commit in the branch history.
31 --peak Show only tracks whose peak velocity exceeds the branch average.
32 --range Sort output by velocity range (descending).
33 --arc Filter to only tracks matching the arc label given via --track.
34 --json Emit results as JSON instead of the ASCII table.
35 """
36 from __future__ import annotations
37
38 import asyncio
39 import json
40 import logging
41 import pathlib
42 from typing import Optional
43
44 import typer
45 from sqlalchemy.ext.asyncio import AsyncSession
46
47 from maestro.muse_cli._repo import require_repo
48 from maestro.muse_cli.db import open_session
49 from maestro.muse_cli.errors import ExitCode
50
51 logger = logging.getLogger(__name__)
52
53 app = typer.Typer()
54
55 # ---------------------------------------------------------------------------
56 # Types
57 # ---------------------------------------------------------------------------
58
59 ArcLabel = str # one of: flat | crescendo | decrescendo | terraced | swell
60
61 _ARC_LABELS: tuple[str, ...] = ("flat", "crescendo", "decrescendo", "terraced", "swell")
62
63 _VALID_ARCS: frozenset[str] = frozenset(_ARC_LABELS)
64
65
66 class TrackDynamics:
67 """Dynamic profile for a single track."""
68
69 __slots__ = ("name", "avg_velocity", "peak_velocity", "velocity_range", "arc")
70
71 def __init__(
72 self,
73 name: str,
74 avg_velocity: int,
75 peak_velocity: int,
76 velocity_range: int,
77 arc: ArcLabel,
78 ) -> None:
79 self.name = name
80 self.avg_velocity = avg_velocity
81 self.peak_velocity = peak_velocity
82 self.velocity_range = velocity_range
83 self.arc = arc
84
85 def to_dict(self) -> dict[str, object]:
86 return {
87 "track": self.name,
88 "avg_velocity": self.avg_velocity,
89 "peak_velocity": self.peak_velocity,
90 "velocity_range": self.velocity_range,
91 "arc": self.arc,
92 }
93
94
95 # ---------------------------------------------------------------------------
96 # Stub data — realistic placeholder until MIDI note data is queryable
97 # ---------------------------------------------------------------------------
98
99 _STUB_TRACKS: list[tuple[str, int, int, int, ArcLabel]] = [
100 ("drums", 88, 110, 42, "terraced"),
101 ("bass", 72, 85, 28, "flat"),
102 ("keys", 64, 95, 56, "crescendo"),
103 ("lead", 79, 105, 38, "swell"),
104 ]
105
106
107 def _stub_profiles() -> list[TrackDynamics]:
108 """Return stub TrackDynamics rows (placeholder for real DB query)."""
109 return [
110 TrackDynamics(
111 name=name,
112 avg_velocity=avg,
113 peak_velocity=peak,
114 velocity_range=rng,
115 arc=arc,
116 )
117 for name, avg, peak, rng, arc in _STUB_TRACKS
118 ]
119
120
121 # ---------------------------------------------------------------------------
122 # Rendering
123 # ---------------------------------------------------------------------------
124
125 _COL_WIDTHS = (9, 7, 4, 5, 11) # track, avg, peak, range, arc
126
127
128 def _render_table(rows: list[TrackDynamics], commit_ref: str, branch: str) -> None:
129 """Print a human-readable ASCII table of dynamics."""
130 head_label = f" (HEAD -> {branch})" if branch else ""
131 typer.echo(f"Dynamic profile — commit {commit_ref}{head_label}")
132 typer.echo("")
133
134 # Header
135 header = (
136 f"{'Track':<{_COL_WIDTHS[0]}} "
137 f"{'Avg Vel':>{_COL_WIDTHS[1]}} "
138 f"{'Peak':>{_COL_WIDTHS[2]}} "
139 f"{'Range':>{_COL_WIDTHS[3]}} "
140 f"{'Arc':<{_COL_WIDTHS[4]}}"
141 )
142 sep = (
143 f"{'-' * _COL_WIDTHS[0]} "
144 f"{'-' * _COL_WIDTHS[1]} "
145 f"{'-' * _COL_WIDTHS[2]} "
146 f"{'-' * _COL_WIDTHS[3]} "
147 f"{'-' * _COL_WIDTHS[4]}"
148 )
149 typer.echo(header)
150 typer.echo(sep)
151
152 for row in rows:
153 typer.echo(
154 f"{row.name:<{_COL_WIDTHS[0]}} "
155 f"{row.avg_velocity:>{_COL_WIDTHS[1]}} "
156 f"{row.peak_velocity:>{_COL_WIDTHS[2]}} "
157 f"{row.velocity_range:>{_COL_WIDTHS[3]}} "
158 f"{row.arc:<{_COL_WIDTHS[4]}}"
159 )
160 typer.echo("")
161
162
163 def _render_json(
164 rows: list[TrackDynamics],
165 commit_ref: str,
166 branch: str,
167 ) -> None:
168 """Emit dynamics as a JSON object."""
169 payload = {
170 "commit": commit_ref,
171 "branch": branch,
172 "tracks": [r.to_dict() for r in rows],
173 }
174 typer.echo(json.dumps(payload, indent=2))
175
176
177 # ---------------------------------------------------------------------------
178 # Testable async core
179 # ---------------------------------------------------------------------------
180
181
182 async def _dynamics_async(
183 *,
184 root: pathlib.Path,
185 session: AsyncSession,
186 commit: Optional[str],
187 track: Optional[str],
188 section: Optional[str],
189 compare: Optional[str],
190 history: bool,
191 peak: bool,
192 range_flag: bool,
193 arc: bool,
194 as_json: bool,
195 ) -> None:
196 """Core dynamics analysis logic — fully injectable for tests.
197
198 Stub implementation: reads branch/commit metadata from ``.muse/``,
199 applies flag-driven filters to placeholder velocity data, and emits
200 a formatted table or JSON payload.
201
202 Args:
203 root: Repository root (directory containing ``.muse/``).
204 session: Open async DB session (reserved for full implementation).
205 commit: Commit ref to analyse; defaults to HEAD.
206 track: Case-insensitive prefix filter; only matching tracks shown.
207 section: Restrict analysis to a named region (future: pass to query).
208 compare: Second commit ref for side-by-side comparison (stub: noted).
209 history: If True, show dynamics for every commit in branch history.
210 peak: If True, show only tracks whose peak exceeds branch average.
211 range_flag: If True, sort by velocity range descending.
212 arc: If True, filter tracks to the arc matching the --track value.
213 as_json: Emit JSON instead of ASCII table.
214 """
215 muse_dir = root / ".muse"
216
217 # -- Resolve branch / commit ref --
218 head_ref = (muse_dir / "HEAD").read_text().strip() # "refs/heads/main"
219 branch = head_ref.rsplit("/", 1)[-1] if "/" in head_ref else head_ref
220 ref_path = muse_dir / pathlib.Path(head_ref)
221
222 head_commit_id = ""
223 if ref_path.exists():
224 head_commit_id = ref_path.read_text().strip()
225
226 commit_ref = commit or (head_commit_id[:8] if head_commit_id else "HEAD")
227
228 if not head_commit_id and not commit:
229 typer.echo(f"No commits yet on branch {branch} — nothing to analyse.")
230 raise typer.Exit(code=ExitCode.SUCCESS)
231
232 # -- Stub: produce placeholder profiles --
233 profiles = _stub_profiles()
234
235 # -- Apply --track filter --
236 if track:
237 prefix = track.lower()
238 if arc:
239 # --arc mode: filter to tracks whose arc matches the --track value as arc label
240 if prefix not in _VALID_ARCS:
241 typer.echo(
242 f"⚠️ '{track}' is not a valid arc label. "
243 f"Valid arcs: {', '.join(sorted(_VALID_ARCS))}"
244 )
245 raise typer.Exit(code=ExitCode.USER_ERROR)
246 profiles = [p for p in profiles if p.arc == prefix]
247 else:
248 profiles = [p for p in profiles if p.name.lower().startswith(prefix)]
249
250 # -- Apply --peak filter (show only above-average peak tracks) --
251 if peak and profiles:
252 avg_peak = sum(p.peak_velocity for p in profiles) / len(profiles)
253 profiles = [p for p in profiles if p.peak_velocity > avg_peak]
254
255 # -- Apply --range sort --
256 if range_flag:
257 profiles = sorted(profiles, key=lambda p: p.velocity_range, reverse=True)
258
259 # -- --history mode: note the stub boundary --
260 if history:
261 typer.echo(
262 f"⚠️ --history: full commit-chain dynamics not yet implemented. "
263 f"Showing HEAD ({commit_ref}) only."
264 )
265
266 # -- --compare note --
267 if compare:
268 typer.echo(
269 f"⚠️ --compare {compare}: side-by-side comparison not yet implemented."
270 )
271
272 # -- --section note --
273 if section:
274 typer.echo(f"⚠️ --section {section}: region filtering not yet implemented.")
275
276 # -- Render --
277 if not profiles:
278 typer.echo("No tracks match the specified filters.")
279 return
280
281 if as_json:
282 _render_json(profiles, commit_ref=commit_ref, branch=branch)
283 else:
284 _render_table(profiles, commit_ref=commit_ref, branch=branch)
285
286
287 # ---------------------------------------------------------------------------
288 # Typer command
289 # ---------------------------------------------------------------------------
290
291
292 @app.callback(invoke_without_command=True)
293 def dynamics(
294 ctx: typer.Context,
295 commit: Optional[str] = typer.Argument(
296 None,
297 help="Commit ref to analyse (default: HEAD).",
298 metavar="COMMIT",
299 ),
300 track: Optional[str] = typer.Option(
301 None,
302 "--track",
303 help="Filter to a single track (case-insensitive prefix match).",
304 metavar="TEXT",
305 ),
306 section: Optional[str] = typer.Option(
307 None,
308 "--section",
309 help="Restrict analysis to a named section/region.",
310 metavar="TEXT",
311 ),
312 compare: Optional[str] = typer.Option(
313 None,
314 "--compare",
315 help="Compare dynamics against a second commit.",
316 metavar="COMMIT",
317 ),
318 history: bool = typer.Option(
319 False,
320 "--history",
321 help="Show dynamics for every commit in branch history.",
322 ),
323 peak: bool = typer.Option(
324 False,
325 "--peak",
326 help="Show only tracks whose peak velocity exceeds the branch average.",
327 ),
328 range_flag: bool = typer.Option(
329 False,
330 "--range",
331 help="Sort output by velocity range (descending).",
332 ),
333 arc: bool = typer.Option(
334 False,
335 "--arc",
336 help="When combined with --track, treat the value as an arc label filter.",
337 ),
338 as_json: bool = typer.Option(
339 False,
340 "--json",
341 help="Emit results as JSON instead of the ASCII table.",
342 ),
343 ) -> None:
344 """Analyse the dynamic (velocity) profile of a commit."""
345 root = require_repo()
346
347 async def _run() -> None:
348 async with open_session() as session:
349 await _dynamics_async(
350 root=root,
351 session=session,
352 commit=commit,
353 track=track,
354 section=section,
355 compare=compare,
356 history=history,
357 peak=peak,
358 range_flag=range_flag,
359 arc=arc,
360 as_json=as_json,
361 )
362
363 try:
364 asyncio.run(_run())
365 except typer.Exit:
366 raise
367 except Exception as exc:
368 typer.echo(f"❌ muse dynamics failed: {exc}")
369 logger.error("❌ muse dynamics error: %s", exc, exc_info=True)
370 raise typer.Exit(code=ExitCode.INTERNAL_ERROR)