lineage.py
python
| 1 | """muse lineage — full symbol provenance chain. |
| 2 | |
| 3 | Traces the complete life of a symbol through the commit history: |
| 4 | created → renamed → moved → copied → deleted, in chronological order. |
| 5 | |
| 6 | Each transition is classified by comparing hashes across consecutive commits: |
| 7 | |
| 8 | * **created** — first InsertOp for this address (no prior body_hash match) |
| 9 | * **copied_from** — InsertOp whose body_hash matches a living symbol at a |
| 10 | different address (same body, new address) |
| 11 | * **renamed_from** — InsertOp + DeleteOp in same commit with matching body_hash |
| 12 | (content preserved, address changed) |
| 13 | * **moved_from** — InsertOp + DeleteOp in same commit with matching body_hash |
| 14 | AND different file (cross-file move) |
| 15 | * **modified** — ReplaceOp at this address; sub-classified by which hashes |
| 16 | changed: impl_only, signature_change, full_rewrite |
| 17 | * **deleted** — DeleteOp at this address |
| 18 | |
| 19 | Usage:: |
| 20 | |
| 21 | muse lineage "src/billing.py::compute_invoice_total" |
| 22 | muse lineage "src/auth.py::validate_token" --commit HEAD~5 |
| 23 | muse lineage "src/core.py::hash_content" --json |
| 24 | |
| 25 | Output:: |
| 26 | |
| 27 | Lineage: src/billing.py::compute_invoice_total |
| 28 | ────────────────────────────────────────────────────────────── |
| 29 | |
| 30 | 2026-02-01 a1b2c3d4 created |
| 31 | 2026-02-10 e5f6a7b8 modified (impl_only) |
| 32 | 2026-02-15 c9d0e1f2 renamed_from src/billing.py::_compute_total |
| 33 | 2026-03-01 a3b4c5d6 moved_from old/billing.py::compute_invoice_total |
| 34 | 2026-03-10 f7a8b9c0 modified (full_rewrite) |
| 35 | |
| 36 | 5 event(s) — first seen 2026-02-01 · last seen 2026-03-10 |
| 37 | |
| 38 | Flags: |
| 39 | |
| 40 | ``--commit, -c REF`` |
| 41 | Walk history starting from this commit instead of HEAD. |
| 42 | |
| 43 | ``--json`` |
| 44 | Emit the full provenance chain as JSON. |
| 45 | """ |
| 46 | from __future__ import annotations |
| 47 | |
| 48 | import json |
| 49 | import logging |
| 50 | import pathlib |
| 51 | from typing import Literal |
| 52 | |
| 53 | import typer |
| 54 | |
| 55 | from muse.core.errors import ExitCode |
| 56 | from muse.core.object_store import read_object |
| 57 | from muse.core.repo import require_repo |
| 58 | from muse.core.store import ( |
| 59 | get_all_commits, |
| 60 | get_commit_snapshot_manifest, |
| 61 | resolve_commit_ref, |
| 62 | ) |
| 63 | from muse.plugins.code._query import flat_symbol_ops |
| 64 | from muse.plugins.code.ast_parser import parse_symbols |
| 65 | |
| 66 | |
| 67 | class _InsertFields: |
| 68 | """Extracted fields from an InsertOp — typed slots avoid untyped dict issues.""" |
| 69 | __slots__ = ("address", "content_id") |
| 70 | |
| 71 | def __init__(self, address: str, content_id: str) -> None: |
| 72 | self.address = address |
| 73 | self.content_id = content_id |
| 74 | |
| 75 | |
| 76 | class _DeleteFields: |
| 77 | __slots__ = ("address", "content_id") |
| 78 | |
| 79 | def __init__(self, address: str, content_id: str) -> None: |
| 80 | self.address = address |
| 81 | self.content_id = content_id |
| 82 | |
| 83 | |
| 84 | class _ReplaceFields: |
| 85 | __slots__ = ("address", "old_content_id", "new_content_id", "old_summary", "new_summary") |
| 86 | |
| 87 | def __init__( |
| 88 | self, |
| 89 | address: str, |
| 90 | old_content_id: str, |
| 91 | new_content_id: str, |
| 92 | old_summary: str, |
| 93 | new_summary: str, |
| 94 | ) -> None: |
| 95 | self.address = address |
| 96 | self.old_content_id = old_content_id |
| 97 | self.new_content_id = new_content_id |
| 98 | self.old_summary = old_summary |
| 99 | self.new_summary = new_summary |
| 100 | |
# Module-level logger, standard getLogger(__name__) convention.
logger = logging.getLogger(__name__)

# Typer sub-application; presumably mounted by the parent CLI as the
# "lineage" command — TODO confirm against the CLI entry point.
app = typer.Typer()

# Closed set of provenance-event labels emitted by build_lineage().
EventKind = Literal[
    "created",
    "renamed_from",
    "moved_from",
    "copied_from",
    "modified",
    "deleted",
]

# Symbol kinds treated as function-like. NOTE(review): not referenced
# anywhere in this module — confirm a caller uses it, or remove.
_FUNCTION_KINDS = frozenset({
    "function", "async_function", "method", "async_method", "class",
})
| 117 | |
| 118 | |
| 119 | def _read_repo_id(root: pathlib.Path) -> str: |
| 120 | return str(json.loads((root / ".muse" / "repo.json").read_text())["repo_id"]) |
| 121 | |
| 122 | |
| 123 | def _read_branch(root: pathlib.Path) -> str: |
| 124 | head_ref = (root / ".muse" / "HEAD").read_text().strip() |
| 125 | return head_ref.removeprefix("refs/heads/").strip() |
| 126 | |
| 127 | |
def _body_hash_for(root: pathlib.Path, manifest: dict[str, str], address: str) -> str | None:
    """Return the body_hash of *address* within *manifest*, or ``None``.

    ``None`` means: the address is malformed (no ``::``), its file is absent
    from the manifest, the object blob could not be read, or the parsed
    symbol tree has no record for the address.
    """
    if "::" not in address:
        return None
    file_path, _, _ = address.partition("::")
    obj_id = manifest.get(file_path)
    if obj_id is None:
        return None
    raw = read_object(root, obj_id)
    if raw is None:
        return None
    rec = parse_symbols(raw, file_path).get(address)
    return rec["body_hash"] if rec else None
| 142 | |
| 143 | |
| 144 | class _LineageEvent: |
| 145 | def __init__( |
| 146 | self, |
| 147 | commit_id: str, |
| 148 | committed_at: str, |
| 149 | kind: EventKind, |
| 150 | detail: str = "", |
| 151 | old_body_hash: str = "", |
| 152 | new_body_hash: str = "", |
| 153 | old_content_id: str = "", |
| 154 | new_content_id: str = "", |
| 155 | ) -> None: |
| 156 | self.commit_id = commit_id |
| 157 | self.committed_at = committed_at |
| 158 | self.kind = kind |
| 159 | self.detail = detail |
| 160 | self.old_body_hash = old_body_hash |
| 161 | self.new_body_hash = new_body_hash |
| 162 | self.old_content_id = old_content_id |
| 163 | self.new_content_id = new_content_id |
| 164 | |
| 165 | def to_dict(self) -> dict[str, str]: |
| 166 | d: dict[str, str] = { |
| 167 | "commit_id": self.commit_id[:8], |
| 168 | "committed_at": self.committed_at, |
| 169 | "event": self.kind, |
| 170 | } |
| 171 | if self.detail: |
| 172 | d["detail"] = self.detail |
| 173 | if self.old_body_hash: |
| 174 | d["old_body_hash"] = self.old_body_hash[:8] |
| 175 | if self.new_body_hash: |
| 176 | d["new_body_hash"] = self.new_body_hash[:8] |
| 177 | if self.old_content_id: |
| 178 | d["old_content_id"] = self.old_content_id[:8] |
| 179 | if self.new_content_id: |
| 180 | d["new_content_id"] = self.new_content_id[:8] |
| 181 | return d |
| 182 | |
| 183 | |
| 184 | def _classify_replace(old_content_id: str, new_content_id: str, |
| 185 | old_summary: str, new_summary: str) -> str: |
| 186 | """Classify a ReplaceOp by examining summary strings for hash markers.""" |
| 187 | if "signature" in old_summary or "signature" in new_summary: |
| 188 | return "signature_change" |
| 189 | if old_content_id[:8] != new_content_id[:8]: |
| 190 | return "full_rewrite" |
| 191 | return "impl_only" |
| 192 | |
| 193 | |
def build_lineage(
    root: pathlib.Path,
    address: str,
) -> list[_LineageEvent]:
    """Walk all commits oldest-first and build the provenance chain for *address*.

    For each commit the symbol-level ops are bucketed by kind, then the ops
    touching *address* are classified:

    * a ReplaceOp becomes a ``modified`` event (sub-classified via
      :func:`_classify_replace`);
    * an InsertOp paired with a DeleteOp of the same ``content_id`` in the
      same commit becomes ``renamed_from`` (same file) or ``moved_from``
      (cross-file); otherwise a snapshot scan for another symbol sharing the
      body hash distinguishes ``copied_from`` from plain ``created``;
    * a DeleteOp becomes a ``deleted`` event.

    Returns events in chronological order (possibly empty).

    Fixes over the previous revision: the unused local ``short`` and the
    dead ``current_body_hash`` state (assigned but never read — and
    misleadingly assigned a *content* id) are removed.
    """
    all_commits = sorted(get_all_commits(root), key=lambda c: c.committed_at)

    events: list[_LineageEvent] = []
    # True while the address exists as of the commit being examined; used to
    # skip copy detection when an already-living address is re-inserted.
    address_live = False

    for commit in all_commits:
        if commit.structured_delta is None:
            continue
        ops = commit.structured_delta.get("ops", [])
        committed_at = commit.committed_at.isoformat()

        # Bucket every symbol-level op in this commit by kind, keyed by address.
        inserts: dict[str, _InsertFields] = {}
        deletes: dict[str, _DeleteFields] = {}
        replaces: dict[str, _ReplaceFields] = {}
        for op in flat_symbol_ops(ops):
            addr = op["address"]
            if op["op"] == "insert":
                inserts[addr] = _InsertFields(address=addr, content_id=op["content_id"])
            elif op["op"] == "delete":
                deletes[addr] = _DeleteFields(address=addr, content_id=op["content_id"])
            elif op["op"] == "replace":
                replaces[addr] = _ReplaceFields(
                    address=addr,
                    old_content_id=op["old_content_id"],
                    new_content_id=op["new_content_id"],
                    old_summary=op["old_summary"],
                    new_summary=op["new_summary"],
                )

        if address in replaces:
            rep = replaces[address]
            detail = _classify_replace(
                rep.old_content_id, rep.new_content_id,
                rep.old_summary, rep.new_summary,
            )
            events.append(_LineageEvent(
                commit_id=commit.commit_id,
                committed_at=committed_at,
                kind="modified",
                detail=detail,
                old_content_id=rep.old_content_id,
                new_content_id=rep.new_content_id,
            ))

        if address in inserts:
            ins_cid = inserts[address].content_id

            # A DeleteOp elsewhere in the same commit with the same
            # content_id means the symbol changed address: rename or move.
            source_addr: str | None = None
            for del_addr, del_op in deletes.items():
                if del_addr == address:
                    continue
                if del_op.content_id and del_op.content_id == ins_cid:
                    source_addr = del_addr
                    break

            if source_addr is not None:
                # Cross-file pairing is a move; same-file pairing is a rename.
                kind: EventKind = (
                    "moved_from"
                    if source_addr.split("::")[0] != address.split("::")[0]
                    else "renamed_from"
                )
                events.append(_LineageEvent(
                    commit_id=commit.commit_id,
                    committed_at=committed_at,
                    kind=kind,
                    detail=source_addr,
                    new_content_id=ins_cid,
                ))
            else:
                # No paired delete: either a brand-new symbol or a copy of a
                # living one. Scan the commit snapshot for another address
                # with an identical body_hash.
                manifest = get_commit_snapshot_manifest(root, commit.commit_id) or {}
                ins_body = _body_hash_for(root, manifest, address)
                copy_source: str | None = None
                if ins_body and not address_live:
                    for file_path, obj_id in sorted(manifest.items()):
                        raw = read_object(root, obj_id)
                        if raw is None:
                            continue
                        tree = parse_symbols(raw, file_path)
                        for other_addr, rec in tree.items():
                            if other_addr != address and rec["body_hash"] == ins_body:
                                copy_source = other_addr
                                break
                        if copy_source:
                            break

                events.append(_LineageEvent(
                    commit_id=commit.commit_id,
                    committed_at=committed_at,
                    kind="copied_from" if copy_source else "created",
                    detail=copy_source or "",
                    new_content_id=ins_cid,
                ))
            # Any insert (including a rename/move target) leaves the address alive.
            address_live = True

        if address in deletes:
            events.append(_LineageEvent(
                commit_id=commit.commit_id,
                committed_at=committed_at,
                kind="deleted",
                old_content_id=deletes[address].content_id,
            ))
            address_live = False

    return events
| 328 | |
| 329 | |
@app.callback(invoke_without_command=True)
def lineage(
    ctx: typer.Context,
    address: str = typer.Argument(
        ..., metavar="ADDRESS",
        help='Symbol address, e.g. "src/billing.py::compute_invoice_total".',
    ),
    ref: str | None = typer.Option(
        None, "--commit", "-c", metavar="REF",
        help="Walk history from this commit instead of HEAD.",
    ),
    as_json: bool = typer.Option(False, "--json", help="Emit results as JSON."),
) -> None:
    """Show the full provenance chain of a symbol through commit history.

    Classifies every event as: created, renamed_from, moved_from, copied_from,
    modified (impl_only / signature_change / full_rewrite), or deleted.

    Rename and move detection works by matching body hashes across DeleteOp and
    InsertOp pairs within the same commit. Copy detection looks for another
    living symbol with the same body hash at the time of insertion.
    """
    root = require_repo()
    events = build_lineage(root, address)

    # Honor --commit/-c: previously the flag was parsed but silently ignored
    # (and resolve_commit_ref was imported unused). Truncate the chain after
    # the requested commit so output reflects history as of REF, not HEAD.
    if ref is not None:
        target = resolve_commit_ref(root, ref)
        cutoff = next(
            (i for i in range(len(events) - 1, -1, -1)
             if events[i].commit_id == target),
            None,
        )
        if cutoff is not None:
            events = events[: cutoff + 1]
        # NOTE(review): when REF has no event for this address we cannot
        # place a cutoff from events alone; fall back to the full chain.

    if as_json:
        typer.echo(json.dumps(
            {
                "address": address,
                "events": [e.to_dict() for e in events],
                "total": len(events),
            },
            indent=2,
        ))
        return

    typer.echo(f"\nLineage: {address}")
    typer.echo("─" * 62)

    if not events:
        typer.echo(
            "\n  (no events found — address may not exist in this repository's history)"
        )
        return

    for ev in events:
        date = ev.committed_at[:10]
        short = ev.commit_id[:8]
        label: str = ev.kind
        if ev.detail:
            label = f"{ev.kind} {ev.detail}"
        typer.echo(f"  {date}  {short}  {label}")

    typer.echo()
    first = events[0].committed_at[:10]
    last = events[-1].committed_at[:10]
    typer.echo(
        f"  {len(events)} event(s) — first seen {first} · last seen {last}"
    )