cgcardona / muse public
blame.py python
378 lines 12.9 KB
12901c5a Initial extraction from tellurstori/maestro cgcardona <gabriel@tellurstori.com> 4d ago
1 """muse blame <path> — annotate a file with the commit that last changed it.
2
3 For each file path in the current HEAD snapshot (filtered by the positional
4 ``<path>`` argument and optional ``--track``/``--section`` flags), walks the
5 commit graph to find the most recent commit that touched that file.
6
7 In music production, blame answers:
8
9 - "Whose idea was this bass line?"
10 - "Which take introduced this change?"
11 - "Which commit first added the bridge strings?"
12
13 Output is per-file (not per-line) because MIDI/audio files are binary — the
14 meaningful unit of change is a whole file, not a byte offset.
15
16 **Algorithm:**
17
18 1. Load all commits from HEAD, following ``parent_commit_id`` links.
19 2. For each adjacent pair ``(C_i, C_{i-1})`` (newest to oldest), load their
20 snapshot manifests and compare ``object_id`` values per path.
21 3. The first pair where a path differs (object_id changed, added, or removed)
22 identifies the most recent commit to have touched that path.
23 4. Paths present in the initial commit (no parent) are attributed to it.
24
25 This is O(N × F) in commits × files, which is acceptable for DAW session
26 history (typically <1 000 commits, <100 files per snapshot).
27
28 Flags
29 -----
30 PATH TEXT Positional — relative path within muse-work/ to annotate.
31 Omit to blame all tracked files.
32 --track TEXT Filter to paths whose last component matches this pattern
33 (fnmatch-style glob, e.g. ``bass*`` or ``*.mid``).
34 --section TEXT Filter to paths whose first directory component equals this
35 section name (e.g. ``chorus`` or ``bridge``).
36 --line-range N,M Note: MIDI/audio are binary; line-range is recorded in the
37 output for annotation purposes but does not slice the file.
38 --json Emit structured JSON for agent consumption.
39 """
40 from __future__ import annotations
41
42 import asyncio
43 import fnmatch
44 import json
45 import logging
46 import pathlib
47 from typing import Optional
48
49 import typer
50 from sqlalchemy.ext.asyncio import AsyncSession
51 from typing_extensions import TypedDict
52
53 from maestro.muse_cli._repo import require_repo
54 from maestro.muse_cli.db import open_session
55 from maestro.muse_cli.errors import ExitCode
56 from maestro.muse_cli.models import MuseCliCommit, MuseCliSnapshot
57
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Typer application for this command; the ``blame`` callback is registered on it.
app = typer.Typer()
61
62
63 # ---------------------------------------------------------------------------
64 # Result types
65 # ---------------------------------------------------------------------------
66
67
class BlameEntry(TypedDict):
    """One row of blame output: a tracked path and the commit credited with it.

    ``change_type`` records what ``commit_id`` did to the path:

    - ``"added"``     — first commit to include this path
    - ``"modified"``  — object_id changed compared to the parent snapshot
    - ``"unchanged"`` — fallback when the graph walk finds no modification
      (should not occur in a consistent database)
    """

    path: str  # path exactly as keyed in the snapshot manifest
    commit_id: str  # full identifier of the attributed commit
    commit_short: str  # first 8 characters of ``commit_id``
    author: str  # commit author, or "(unknown)" when the commit has none
    committed_at: str  # commit timestamp formatted as ``YYYY-MM-DD HH:MM:SS``
    message: str  # commit message of the attributed commit
    change_type: str  # one of the values listed in the class docstring
86
87
class BlameResult(TypedDict):
    """Full output of ``muse blame``.

    The filter fields echo the arguments the command was invoked with;
    ``entries`` is ordered by path (ascending alphabetical).
    """

    path_filter: Optional[str]  # positional PATH argument, if one was given
    track_filter: Optional[str]  # ``--track`` pattern, if one was given
    section_filter: Optional[str]  # ``--section`` name, if one was given
    line_range: Optional[str]  # ``--line-range`` value, passed through as-is
    entries: list[BlameEntry]  # one entry per matching path in the HEAD snapshot
99
100
101 # ---------------------------------------------------------------------------
102 # Core helpers
103 # ---------------------------------------------------------------------------
104
105
async def _load_commit_chain(
    session: AsyncSession,
    head_commit_id: str,
    limit: int = 10_000,
) -> list[MuseCliCommit]:
    """Collect the ancestry of *head_commit_id*, newest commit first.

    Follows ``parent_commit_id`` links one commit at a time.  The walk ends
    when a commit has no parent, when a referenced commit cannot be loaded
    (a warning is logged and the partial chain is returned), or once *limit*
    commits have been collected.
    """
    chain: list[MuseCliCommit] = []
    cursor: str | None = head_commit_id
    while True:
        # Stop at the end of the chain or when the safety cap is reached.
        if not cursor or len(chain) >= limit:
            return chain
        node = await session.get(MuseCliCommit, cursor)
        if node is None:
            logger.warning("⚠️ Commit %s not found — chain broken", cursor[:8])
            return chain
        chain.append(node)
        cursor = node.parent_commit_id
125
126
async def _load_snapshot_manifest(
    session: AsyncSession,
    snapshot_id: str,
) -> dict[str, str]:
    """Fetch the manifest stored for *snapshot_id* as a new dict.

    A snapshot that cannot be loaded is logged as a warning and yields an
    empty dict, so callers can carry on without special-casing the miss.
    """
    row = await session.get(MuseCliSnapshot, snapshot_id)
    if row is not None:
        # Hand back a copy of the stored mapping rather than the mapping itself.
        return dict(row.manifest)
    logger.warning("⚠️ Snapshot %s not found in DB", snapshot_id[:8])
    return {}
137
138
139 def _matches_filters(
140 path: str,
141 path_filter: str | None,
142 track_filter: str | None,
143 section_filter: str | None,
144 ) -> bool:
145 """Return True when *path* passes all active filter criteria.
146
147 Filters are AND-combined: all supplied filters must match.
148
149 - *path_filter*: substring / exact match on the full path string.
150 - *track_filter*: fnmatch pattern applied to the basename.
151 - *section_filter*: exact match on the first directory component.
152 """
153 if path_filter is not None:
154 # Accept both exact match and sub-path match (e.g. "bass" matches
155 # "muse-work/bass/bassline.mid" as a substring)
156 if path_filter not in path and not path.endswith(path_filter):
157 return False
158
159 if track_filter is not None:
160 basename = pathlib.PurePosixPath(path).name
161 if not fnmatch.fnmatch(basename, track_filter):
162 return False
163
164 if section_filter is not None:
165 parts = pathlib.PurePosixPath(path).parts
166 # parts might be ("muse-work", "chorus", "piano.mid") or ("chorus", "piano.mid")
167 # We match the first non-"muse-work" directory component
168 dirs = [p for p in parts[:-1] if p != "muse-work"]
169 if not dirs or dirs[0] != section_filter:
170 return False
171
172 return True
173
174
async def _blame_async(
    *,
    root: pathlib.Path,
    session: AsyncSession,
    path_filter: str | None,
    track_filter: str | None,
    section_filter: str | None,
    line_range: str | None,
) -> BlameResult:
    """Compute blame annotations for every HEAD path that passes the filters.

    Resolves the current branch from ``.muse/HEAD``, loads the commit chain
    from its tip, and compares each snapshot manifest with its parent's to
    find the newest commit in which a path's ``object_id`` appeared or
    changed.  Only paths present in the HEAD snapshot are reported.

    Snapshot manifests are loaded one at a time while walking towards older
    commits, and the walk stops as soon as every wanted path has been
    attributed, so recently changed files do not cost a full history scan.

    Args:
        root: Repository root containing the ``.muse`` directory.
        session: Open database session used to load commits and snapshots.
        path_filter: Optional substring the path must contain.
        track_filter: Optional fnmatch pattern for the basename.
        section_filter: Optional section (first directory) name.
        line_range: Passed through unchanged into the result.

    Returns:
        A :class:`BlameResult` whose ``entries`` are sorted by path.

    Raises:
        typer.Exit: With ``ExitCode.SUCCESS``, after printing a notice, when
            the branch has no commits or no path matches the filters.
    """
    muse_dir = root / ".muse"
    repo_data: dict[str, str] = json.loads((muse_dir / "repo.json").read_text())
    # The value is not used below; the lookup is kept so that a repo.json
    # without a repo_id still fails here rather than being silently accepted.
    repo_id = repo_data["repo_id"]  # noqa: F841

    head_ref = (muse_dir / "HEAD").read_text().strip()
    branch = head_ref.rsplit("/", 1)[-1]
    ref_path = muse_dir / pathlib.Path(head_ref)

    # A missing ref file, an empty ref file and an unloadable tip commit all
    # mean there is nothing to blame on this branch.
    head_commit_id = ref_path.read_text().strip() if ref_path.exists() else ""
    commits = await _load_commit_chain(session, head_commit_id) if head_commit_id else []
    if not commits:
        typer.echo(f"No commits yet on branch {branch}")
        raise typer.Exit(code=ExitCode.SUCCESS)

    # The HEAD snapshot defines which paths exist right now.
    head_manifest = await _load_snapshot_manifest(session, commits[0].snapshot_id)

    # Filter before walking history so only requested paths are attributed.
    wanted = sorted(
        path
        for path in head_manifest
        if _matches_filters(path, path_filter, track_filter, section_filter)
    )
    if not wanted:
        typer.echo("No matching paths found.")
        raise typer.Exit(code=ExitCode.SUCCESS)

    # path → (commit, change_type) for the newest commit that touched the path
    blame_map: dict[str, tuple[MuseCliCommit, str]] = {}
    pending = set(wanted)

    # Walk pairs newest→oldest.  A path that is still pending has carried the
    # same object_id from HEAD down to ``newer_commit``, so comparing the
    # parent's manifest against the HEAD object_id is sufficient.
    for newer_commit, older_commit in zip(commits, commits[1:]):
        if not pending:
            break  # everything attributed; older snapshots are irrelevant
        older_manifest = await _load_snapshot_manifest(session, older_commit.snapshot_id)
        resolved: set[str] = set()
        for path in pending:
            older_oid = older_manifest.get(path)
            if older_oid is None:
                change_type = "added"
            elif older_oid != head_manifest[path]:
                change_type = "modified"
            else:
                continue
            blame_map[path] = (newer_commit, change_type)
            resolved.add(path)
        pending -= resolved

    # Paths never seen changing belong to the oldest loaded commit.  When
    # that commit is a true root it introduced them ("added").  When it still
    # names a parent, the chain was cut short (limit reached or a commit is
    # missing), so the honest label is the "unchanged" fallback.
    oldest_commit = commits[-1]
    fallback_type = "unchanged" if oldest_commit.parent_commit_id else "added"
    for path in pending:
        blame_map[path] = (oldest_commit, fallback_type)

    entries: list[BlameEntry] = []
    for path in wanted:
        commit, change_type = blame_map[path]
        entries.append(
            BlameEntry(
                path=path,
                commit_id=commit.commit_id,
                commit_short=commit.commit_id[:8],
                author=commit.author or "(unknown)",
                committed_at=commit.committed_at.strftime("%Y-%m-%d %H:%M:%S"),
                message=commit.message,
                change_type=change_type,
            )
        )

    return BlameResult(
        path_filter=path_filter,
        track_filter=track_filter,
        section_filter=section_filter,
        line_range=line_range,
        entries=entries,
    )
280
281
282 # ---------------------------------------------------------------------------
283 # Renderers
284 # ---------------------------------------------------------------------------
285
286
287 def _render_blame(result: BlameResult) -> str:
288 """Format blame output as a human-readable annotated file list.
289
290 Each line shows the short commit ID, author, date, change type, and
291 the file path — analogous to ``git blame`` but per-file rather than
292 per-line, since MIDI and audio files are binary.
293 """
294 lines: list[str] = []
295 if result["line_range"]:
296 lines.append(f"(line-range: {result['line_range']} — informational only for binary files)")
297 lines.append("")
298 for entry in result["entries"]:
299 lines.append(
300 f"{entry['commit_short']} {entry['author']:<20} "
301 f"{entry['committed_at']} ({entry['change_type']:>10}) {entry['path']}"
302 )
303 lines.append(f" {entry['message']}")
304 return "\n".join(lines)
305
306
307 # ---------------------------------------------------------------------------
308 # Typer command
309 # ---------------------------------------------------------------------------
310
311
@app.callback(invoke_without_command=True)
def blame(
    ctx: typer.Context,
    path: Optional[str] = typer.Argument(
        None,
        help="Relative path within muse-work/ to annotate. Omit to blame all tracked files.",
        metavar="PATH",
    ),
    track: Optional[str] = typer.Option(
        None,
        "--track",
        help="Filter to files whose basename matches this fnmatch pattern (e.g. 'bass*' or '*.mid').",
    ),
    section: Optional[str] = typer.Option(
        None,
        "--section",
        help="Filter to files within this section directory (first directory component).",
    ),
    line_range: Optional[str] = typer.Option(
        None,
        "--line-range",
        help="Annotate sub-range N,M (informational for binary MIDI/audio files).",
        metavar="N,M",
    ),
    as_json: bool = typer.Option(
        False,
        "--json",
        help="Emit structured JSON for agent consumption.",
    ),
) -> None:
    """Annotate files with the commit that last changed each one.

    Walks the commit graph from HEAD to find the most recent commit that
    touched each file, answering "whose idea was this bass line?" or
    "which take introduced this change?"

    Output is per-file (not per-line) because MIDI and audio files are
    binary — the meaningful unit of change is a whole file.
    """
    # The callback also fires when a subcommand is invoked; do nothing then.
    if ctx.invoked_subcommand is not None:
        return

    root = require_repo()

    async def _emit() -> None:
        # Compute the blame inside one DB session and print it in the
        # requested format.
        async with open_session() as session:
            result = await _blame_async(
                root=root,
                session=session,
                path_filter=path,
                track_filter=track,
                section_filter=section,
                line_range=line_range,
            )
            rendered = (
                json.dumps(dict(result), indent=2)
                if as_json
                else _render_blame(result)
            )
            typer.echo(rendered)

    try:
        asyncio.run(_emit())
    except typer.Exit:
        # Deliberate exits (e.g. "No commits yet") pass through untouched.
        raise
    except Exception as exc:
        # Anything else is reported, logged with its traceback, and mapped
        # to the internal-error exit code.
        typer.echo(f"❌ muse blame failed: {exc}")
        logger.error("❌ muse blame error: %s", exc, exc_info=True)
        raise typer.Exit(code=ExitCode.INTERNAL_ERROR)