cgcardona / muse public
lineage.py python
388 lines 13.0 KB
b4e8aaf2 feat(code): Phase 1 — lineage, api-surface, codemap, clones, checkout-s… Gabriel Cardona <cgcardona@gmail.com> 1d ago
1 """muse lineage — full symbol provenance chain.
2
3 Traces the complete life of a symbol through the commit history:
4 created → renamed → moved → copied → deleted, in chronological order.
5
6 Each transition is classified by comparing hashes across consecutive commits:
7
8 * **created** — first InsertOp for this address (no prior body_hash match)
9 * **copied_from** — InsertOp whose body_hash matches a living symbol at a
10 different address (same body, new address)
11 * **renamed_from** — InsertOp + DeleteOp in same commit with matching body_hash
12 (content preserved, address changed)
13 * **moved_from** — InsertOp + DeleteOp in same commit with matching body_hash
14 AND different file (cross-file move)
15 * **modified** — ReplaceOp at this address; sub-classified by which hashes
16 changed: impl_only, signature_change, full_rewrite
17 * **deleted** — DeleteOp at this address
18
19 Usage::
20
21 muse lineage "src/billing.py::compute_invoice_total"
22 muse lineage "src/auth.py::validate_token" --commit HEAD~5
23 muse lineage "src/core.py::hash_content" --json
24
25 Output::
26
27 Lineage: src/billing.py::compute_invoice_total
28 ──────────────────────────────────────────────────────────────
29
30 2026-02-01 a1b2c3d4 created
31 2026-02-10 e5f6a7b8 modified (impl_only)
32 2026-02-15 c9d0e1f2 renamed_from src/billing.py::_compute_total
33 2026-03-01 a3b4c5d6 moved_from old/billing.py::compute_invoice_total
34 2026-03-10 f7a8b9c0 modified (full_rewrite)
35
36 5 event(s) — first seen 2026-02-01 · last seen 2026-03-10
37
38 Flags:
39
40 ``--commit, -c REF``
41 Walk history starting from this commit instead of HEAD.
42
43 ``--json``
44 Emit the full provenance chain as JSON.
45 """
46 from __future__ import annotations
47
48 import json
49 import logging
50 import pathlib
51 from typing import Literal
52
53 import typer
54
55 from muse.core.errors import ExitCode
56 from muse.core.object_store import read_object
57 from muse.core.repo import require_repo
58 from muse.core.store import (
59 get_all_commits,
60 get_commit_snapshot_manifest,
61 resolve_commit_ref,
62 )
63 from muse.plugins.code._query import flat_symbol_ops
64 from muse.plugins.code.ast_parser import parse_symbols
65
66
67 class _InsertFields:
68 """Extracted fields from an InsertOp — typed slots avoid untyped dict issues."""
69 __slots__ = ("address", "content_id")
70
71 def __init__(self, address: str, content_id: str) -> None:
72 self.address = address
73 self.content_id = content_id
74
75
76 class _DeleteFields:
77 __slots__ = ("address", "content_id")
78
79 def __init__(self, address: str, content_id: str) -> None:
80 self.address = address
81 self.content_id = content_id
82
83
84 class _ReplaceFields:
85 __slots__ = ("address", "old_content_id", "new_content_id", "old_summary", "new_summary")
86
87 def __init__(
88 self,
89 address: str,
90 old_content_id: str,
91 new_content_id: str,
92 old_summary: str,
93 new_summary: str,
94 ) -> None:
95 self.address = address
96 self.old_content_id = old_content_id
97 self.new_content_id = new_content_id
98 self.old_summary = old_summary
99 self.new_summary = new_summary
100
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Typer application for this command; the ``lineage`` function in this file
# is registered on it as the callback.
app = typer.Typer()

# The closed set of labels a lineage event may carry (the ``kind`` of a
# ``_LineageEvent``).
EventKind = Literal[
    "created",
    "renamed_from",
    "moved_from",
    "copied_from",
    "modified",
    "deleted",
]

# NOTE(review): not referenced anywhere in the visible part of this file —
# confirm whether it is used elsewhere before relying on or removing it.
_FUNCTION_KINDS = frozenset({
    "function", "async_function", "method", "async_method", "class",
})
117
118
119 def _read_repo_id(root: pathlib.Path) -> str:
120 return str(json.loads((root / ".muse" / "repo.json").read_text())["repo_id"])
121
122
123 def _read_branch(root: pathlib.Path) -> str:
124 head_ref = (root / ".muse" / "HEAD").read_text().strip()
125 return head_ref.removeprefix("refs/heads/").strip()
126
127
128 def _body_hash_for(root: pathlib.Path, manifest: dict[str, str], address: str) -> str | None:
129 """Return body_hash for *address* in *manifest*, or None if not found."""
130 if "::" not in address:
131 return None
132 file_path = address.split("::")[0]
133 obj_id = manifest.get(file_path)
134 if obj_id is None:
135 return None
136 raw = read_object(root, obj_id)
137 if raw is None:
138 return None
139 tree = parse_symbols(raw, file_path)
140 rec = tree.get(address)
141 return rec["body_hash"] if rec else None
142
143
144 class _LineageEvent:
145 def __init__(
146 self,
147 commit_id: str,
148 committed_at: str,
149 kind: EventKind,
150 detail: str = "",
151 old_body_hash: str = "",
152 new_body_hash: str = "",
153 old_content_id: str = "",
154 new_content_id: str = "",
155 ) -> None:
156 self.commit_id = commit_id
157 self.committed_at = committed_at
158 self.kind = kind
159 self.detail = detail
160 self.old_body_hash = old_body_hash
161 self.new_body_hash = new_body_hash
162 self.old_content_id = old_content_id
163 self.new_content_id = new_content_id
164
165 def to_dict(self) -> dict[str, str]:
166 d: dict[str, str] = {
167 "commit_id": self.commit_id[:8],
168 "committed_at": self.committed_at,
169 "event": self.kind,
170 }
171 if self.detail:
172 d["detail"] = self.detail
173 if self.old_body_hash:
174 d["old_body_hash"] = self.old_body_hash[:8]
175 if self.new_body_hash:
176 d["new_body_hash"] = self.new_body_hash[:8]
177 if self.old_content_id:
178 d["old_content_id"] = self.old_content_id[:8]
179 if self.new_content_id:
180 d["new_content_id"] = self.new_content_id[:8]
181 return d
182
183
184 def _classify_replace(old_content_id: str, new_content_id: str,
185 old_summary: str, new_summary: str) -> str:
186 """Classify a ReplaceOp by examining summary strings for hash markers."""
187 if "signature" in old_summary or "signature" in new_summary:
188 return "signature_change"
189 if old_content_id[:8] != new_content_id[:8]:
190 return "full_rewrite"
191 return "impl_only"
192
193
def _find_rename_source(
    address: str,
    content_id: str,
    deletes: dict[str, _DeleteFields],
) -> str | None:
    """Return another address deleted in the same commit with *content_id*, if any.

    A deletion at *address* itself is ignored and a deletion with an empty
    content id never matches. The first match in *deletes*' iteration order
    wins.
    """
    for del_addr, del_op in deletes.items():
        if del_addr == address:
            continue
        if del_op.content_id and del_op.content_id == content_id:
            return del_addr
    return None


def _find_copy_source(
    root: pathlib.Path,
    manifest: dict[str, str],
    address: str,
    body_hash: str,
) -> str | None:
    """Return the first other address in the snapshot whose body_hash equals *body_hash*.

    Files are visited in sorted manifest order so the answer is deterministic;
    objects that cannot be read are skipped.
    """
    for file_path, obj_id in sorted(manifest.items()):
        raw = read_object(root, obj_id)
        if raw is None:
            continue
        for other_addr, rec in parse_symbols(raw, file_path).items():
            if other_addr != address and rec["body_hash"] == body_hash:
                return other_addr
    return None


def build_lineage(
    root: pathlib.Path,
    address: str,
) -> list[_LineageEvent]:
    """Walk all commits oldest-first and build the provenance chain for *address*.

    Commits come from ``get_all_commits(root)`` sorted by ``committed_at``;
    commits without a ``structured_delta`` are skipped. Within one commit the
    symbol-level ops are examined in a fixed order:

    1. a replace at *address* yields ``modified`` (detail from
       ``_classify_replace``);
    2. an insert at *address* yields ``moved_from`` / ``renamed_from`` when a
       different address was deleted in the same commit with the same content
       id (moved when the file part before ``::`` differs), else
       ``copied_from`` when the address was not already live and another
       symbol in the commit's snapshot has the same body hash, else
       ``created``;
    3. a delete at *address* yields ``deleted``.

    Returns:
        The events in the order they were discovered.
    """
    commits = sorted(get_all_commits(root), key=lambda c: c.committed_at)

    events: list[_LineageEvent] = []
    # True between an insert of the address and its next delete.
    address_live = False

    for commit in commits:
        if commit.structured_delta is None:
            continue
        committed_at = commit.committed_at.isoformat()

        # Index this commit's symbol-level ops by address. If an address
        # appears more than once for the same op type, the last op wins.
        inserts: dict[str, _InsertFields] = {}
        deletes: dict[str, _DeleteFields] = {}
        replaces: dict[str, _ReplaceFields] = {}
        for op in flat_symbol_ops(commit.structured_delta.get("ops", [])):
            addr = op["address"]
            if op["op"] == "insert":
                inserts[addr] = _InsertFields(
                    address=addr,
                    content_id=op["content_id"],
                )
            elif op["op"] == "delete":
                deletes[addr] = _DeleteFields(
                    address=addr,
                    content_id=op["content_id"],
                )
            elif op["op"] == "replace":
                replaces[addr] = _ReplaceFields(
                    address=addr,
                    old_content_id=op["old_content_id"],
                    new_content_id=op["new_content_id"],
                    old_summary=op["old_summary"],
                    new_summary=op["new_summary"],
                )

        if address in replaces:
            rep = replaces[address]
            events.append(_LineageEvent(
                commit_id=commit.commit_id,
                committed_at=committed_at,
                kind="modified",
                detail=_classify_replace(
                    rep.old_content_id,
                    rep.new_content_id,
                    rep.old_summary,
                    rep.new_summary,
                ),
                old_content_id=rep.old_content_id,
                new_content_id=rep.new_content_id,
            ))

        if address in inserts:
            ins_cid = inserts[address].content_id
            source_addr = _find_rename_source(address, ins_cid, deletes)

            if source_addr is not None:
                # Same content removed elsewhere in this commit: a rename, or
                # a move when the file component changed.
                crossed_files = source_addr.split("::")[0] != address.split("::")[0]
                ins_kind: EventKind = "moved_from" if crossed_files else "renamed_from"
                ins_detail = source_addr
            else:
                copy_source: str | None = None
                # Copy detection only applies to an address that was not
                # already live, so the snapshot is read and parsed only then.
                if not address_live:
                    manifest = get_commit_snapshot_manifest(root, commit.commit_id) or {}
                    ins_body = _body_hash_for(root, manifest, address)
                    if ins_body:
                        copy_source = _find_copy_source(root, manifest, address, ins_body)
                ins_kind = "copied_from" if copy_source else "created"
                ins_detail = copy_source or ""

            events.append(_LineageEvent(
                commit_id=commit.commit_id,
                committed_at=committed_at,
                kind=ins_kind,
                detail=ins_detail,
                new_content_id=ins_cid,
            ))
            address_live = True

        if address in deletes:
            events.append(_LineageEvent(
                commit_id=commit.commit_id,
                committed_at=committed_at,
                kind="deleted",
                old_content_id=deletes[address].content_id,
            ))
            address_live = False

    return events
328
329
@app.callback(invoke_without_command=True)
def lineage(
    ctx: typer.Context,
    address: str = typer.Argument(
        ..., metavar="ADDRESS",
        help='Symbol address, e.g. "src/billing.py::compute_invoice_total".',
    ),
    ref: str | None = typer.Option(
        None, "--commit", "-c", metavar="REF",
        help="Walk history from this commit instead of HEAD.",
    ),
    as_json: bool = typer.Option(False, "--json", help="Emit results as JSON."),
) -> None:
    """Show the full provenance chain of a symbol through commit history.

    Classifies every event as: created, renamed_from, moved_from, copied_from,
    modified (impl_only / signature_change / full_rewrite), or deleted.

    Rename and move detection works by matching body hashes across DeleteOp and
    InsertOp pairs within the same commit. Copy detection looks for another
    living symbol with the same body hash at the time of insertion.
    """
    # NOTE(review): `ref` (--commit) is accepted from the command line but is
    # not referenced anywhere in this body; build_lineage receives only the
    # repository root and the address. Confirm whether the flag should bound
    # the walk.
    repo_root = require_repo()
    chain = build_lineage(repo_root, address)

    if as_json:
        payload = {
            "address": address,
            "events": [event.to_dict() for event in chain],
            "total": len(chain),
        }
        typer.echo(json.dumps(payload, indent=2))
        return

    typer.echo(f"\nLineage: {address}")
    typer.echo("─" * 62)

    if not chain:
        typer.echo(
            "\n (no events found — address may not exist in this repository's history)"
        )
        return

    for event in chain:
        # The first 10 characters of the timestamp string are shown as the date.
        day = event.committed_at[:10]
        abbrev = event.commit_id[:8]
        text: str = f"{event.kind} {event.detail}" if event.detail else event.kind
        typer.echo(f" {day} {abbrev} {text}")

    typer.echo()
    earliest = chain[0].committed_at[:10]
    latest = chain[-1].committed_at[:10]
    typer.echo(
        f" {len(chain)} event(s) — first seen {earliest} · last seen {latest}"
    )