rev_parse.py
python
| 1 | """muse rev-parse — resolve a revision expression to a commit ID. |
| 2 | |
| 3 | Translates a symbolic revision expression into a concrete commit ID, mirroring |
| 4 | ``git rev-parse`` semantics for the Muse VCS. Designed to be used both |
| 5 | interactively and as a plumbing primitive that other commands can call |
| 6 | internally to resolve user-supplied refs. |
| 7 | |
| 8 | Supported revision expressions |
| 9 | -------------------------------- |
| 10 | - ``HEAD`` — current branch tip |
| 11 | - ``<branch>`` — tip of a named branch |
| 12 | - ``<commit_id>`` — full or abbreviated (prefix) commit ID |
| 13 | - ``HEAD~N`` — N parents back from HEAD |
| 14 | - ``<branch>~N`` — N parents back from branch tip |
| 15 | |
| 16 | Flags |
| 17 | ------ |
| 18 | - ``--short`` — print only the first 8 characters of the resolved ID |
| 19 | - ``--verify`` — exit 1 when the expression does not resolve (default: |
| 20 | print nothing and exit 0) |
| 21 | - ``--abbrev-ref`` — print the branch name rather than the commit ID |
| 22 | (meaningful for HEAD and branch refs; for a raw commit ID |
| 23 | the branch of that commit is printed) |
| 24 | |
| 25 | Result type: ``RevParseResult`` — see ``docs/reference/type_contracts.md``. |
| 26 | """ |
| 27 | from __future__ import annotations |
| 28 | |
| 29 | import asyncio |
| 30 | import json |
| 31 | import logging |
| 32 | import pathlib |
| 33 | import re |
| 34 | |
| 35 | import typer |
| 36 | from sqlalchemy.ext.asyncio import AsyncSession |
| 37 | from sqlalchemy.future import select |
| 38 | |
| 39 | from maestro.muse_cli._repo import require_repo |
| 40 | from maestro.muse_cli.db import open_session |
| 41 | from maestro.muse_cli.errors import ExitCode |
| 42 | from maestro.muse_cli.models import MuseCliCommit |
| 43 | |
# Logger for this module, named after the module's import path.
logger = logging.getLogger(__name__)

# Typer application object that the ``rev_parse`` callback is attached to.
app = typer.Typer()
| 47 | |
# Matches "<base>~<N>" expressions such as "HEAD~3", "main~1" or "a1b2c3~2".
# Group 1 captures the base token; group 2 captures the decimal hop count.
_TILDE_RE = re.compile(r"^(.+?)~(\d+)$")
| 50 | |
| 51 | |
| 52 | # --------------------------------------------------------------------------- |
| 53 | # Named result type (registered in docs/reference/type_contracts.md) |
| 54 | # --------------------------------------------------------------------------- |
| 55 | |
| 56 | |
class RevParseResult:
    """Resolved output of a revision expression.

    Returned by ``resolve_revision`` so that callers have access to both the
    full commit ID and its branch without re-querying the database.  Treat as
    an immutable value object — all fields are set in ``__init__`` and never
    mutated.  Two results compare equal, and hash alike, when all three fields
    are equal.

    Fields
    ------
    commit_id : str
        Full 64-character hex commit ID.
    branch : str
        Branch that the commit lives on (may differ from the expression when a
        raw commit ID spanning multiple branches is resolved).
    revision_expr : str
        The original expression that was resolved (useful for error messages).
    """

    __slots__ = ("commit_id", "branch", "revision_expr")

    def __init__(self, commit_id: str, branch: str, revision_expr: str) -> None:
        self.commit_id = commit_id
        self.branch = branch
        self.revision_expr = revision_expr

    def _key(self) -> tuple:
        """Return the field tuple used for equality and hashing."""
        return (self.commit_id, self.branch, self.revision_expr)

    def __eq__(self, other: object) -> bool:
        # Defer to the other operand for foreign types so ``==`` stays symmetric.
        if not isinstance(other, RevParseResult):
            return NotImplemented
        return self._key() == other._key()

    def __hash__(self) -> int:
        # Defined alongside ``__eq__`` so equal results hash identically.
        return hash(self._key())

    def __repr__(self) -> str:
        # Only the first 8 characters of the commit ID are shown to keep the
        # representation short.
        return (
            f"RevParseResult(commit_id={self.commit_id[:8]!r},"
            f" branch={self.branch!r},"
            f" revision_expr={self.revision_expr!r})"
        )
| 89 | |
| 90 | |
| 91 | # --------------------------------------------------------------------------- |
| 92 | # Testable async core |
| 93 | # --------------------------------------------------------------------------- |
| 94 | |
| 95 | |
async def _get_branch_tip(
    session: AsyncSession,
    repo_id: str,
    branch: str,
) -> MuseCliCommit | None:
    """Fetch the newest commit recorded for *branch* in repository *repo_id*.

    "Newest" is decided by ordering on ``committed_at`` descending and taking
    a single row.  Returns ``None`` when no commit matches.
    """
    newest_first = (
        select(MuseCliCommit)
        .where(MuseCliCommit.repo_id == repo_id)
        .where(MuseCliCommit.branch == branch)
        .order_by(MuseCliCommit.committed_at.desc())
        .limit(1)
    )
    rows = await session.execute(newest_first)
    return rows.scalar_one_or_none()
| 109 | |
| 110 | |
async def _resolve_commit_by_id(
    session: AsyncSession,
    repo_id: str,
    ref: str,
) -> MuseCliCommit | None:
    """Resolve *ref* as an exact or prefix-matched commit ID within *repo_id*.

    Tries an exact primary-key lookup first; falls back to a prefix scan
    (acceptable for CLI latency — commit tables are shallow in typical usage).

    Returns ``None`` when:

    - *ref* is empty (an empty prefix would match every commit),
    - nothing matches, or
    - the prefix is ambiguous, i.e. more than one commit in the repository
      starts with it.
    """
    if not ref:
        return None

    exact = await session.get(MuseCliCommit, ref)
    # The primary-key lookup is not filtered by repository, so check the
    # owner explicitly to stay consistent with the repo-scoped prefix scan.
    if exact is not None and exact.repo_id == repo_id:
        return exact

    # Abbreviated prefix match.  ``autoescape`` keeps "%" and "_" in user
    # input from acting as LIKE wildcards; two rows are enough to detect
    # ambiguity.
    prefix_query = (
        select(MuseCliCommit)
        .where(
            MuseCliCommit.repo_id == repo_id,
            MuseCliCommit.commit_id.startswith(ref, autoescape=True),
        )
        .limit(2)
    )
    matches = (await session.execute(prefix_query)).scalars().all()
    if len(matches) > 1:
        logger.warning(
            "⚠️ Ambiguous commit prefix %r — matches more than one commit", ref
        )
        return None
    return matches[0] if matches else None
| 132 | |
| 133 | |
async def _walk_parents(
    session: AsyncSession,
    start: MuseCliCommit,
    steps: int,
) -> MuseCliCommit | None:
    """Walk *steps* parent hops from *start*, returning the ancestor or None.

    ``steps=0`` returns *start* unchanged.  Each step follows
    ``parent_commit_id``.  ``None`` is returned when the chain ends before
    *steps* hops have been taken (a commit has no parent) or when a
    referenced parent is missing from the DB.
    """
    current: MuseCliCommit = start
    for hops_done in range(steps):
        parent_id = current.parent_commit_id
        if parent_id is None:
            # Report how far the walk actually got, not just what was asked.
            logger.debug(
                "⚠️ Parent chain exhausted after %d of %d step(s)",
                hops_done,
                steps,
            )
            return None
        parent = await session.get(MuseCliCommit, parent_id)
        if parent is None:
            logger.warning(
                "⚠️ Parent commit %s not found in DB — chain broken",
                parent_id[:8],
            )
            return None
        current = parent
    return current
| 159 | |
| 160 | |
async def _branch_exists_on_disk(muse_dir: pathlib.Path, name: str) -> bool:
    """Return True when a ref file exists for *name* under ``refs/heads/``.

    *muse_dir* is the repository's ``.muse`` directory.  Only regular files
    count: an empty name, ``"."`` or a namespace directory such as
    ``feature`` (when only ``feature/x`` is a branch) is not a branch.
    Names that are absolute or contain ``..`` segments are rejected so the
    lookup cannot leave ``refs/heads/``.
    """
    if not name:
        return False
    relative = pathlib.PurePath(name)
    # A non-empty anchor means an absolute or drive-qualified path, which
    # would replace the heads directory entirely when joined.
    if relative.anchor or ".." in relative.parts:
        return False
    return (muse_dir / "refs" / "heads" / relative).is_file()
| 164 | |
| 165 | |
async def resolve_revision(
    session: AsyncSession,
    repo_id: str,
    current_branch: str,
    muse_dir: pathlib.Path,
    revision_expr: str,
) -> RevParseResult | None:
    """Resolve *revision_expr* to a ``RevParseResult``, or return ``None``.

    This is the public plumbing primitive used by ``muse rev-parse`` and
    intended for reuse by other commands that accept revision arguments.

    Resolution order
    ----------------
    1. Strip a ``~N`` suffix and record *steps*.
    2. Resolve the base token:
       a. ``HEAD`` (compared case-insensitively) → tip of *current_branch*
       b. Named branch (ref file exists) → tip of that branch
       c. Commit ID / prefix → exact or prefix match
    3. Walk *steps* parent hops from the resolved base.
    4. Return ``RevParseResult`` or ``None`` when unresolvable.

    An empty expression never resolves.  The ``branch`` of the result is the
    branch recorded on the final (ancestor) commit.
    """
    if not revision_expr:
        # Nothing to resolve; avoids treating "" as a match-everything prefix.
        return None

    # Step 1 — parse tilde suffix
    tilde = _TILDE_RE.match(revision_expr)
    if tilde:
        base = tilde.group(1)
        steps = int(tilde.group(2))
    else:
        base = revision_expr
        steps = 0

    # Step 2 — resolve base token to a commit
    commit: MuseCliCommit | None
    if base.upper() == "HEAD":
        commit = await _get_branch_tip(session, repo_id, current_branch)
    elif await _branch_exists_on_disk(muse_dir, base):
        commit = await _get_branch_tip(session, repo_id, base)
    else:
        commit = await _resolve_commit_by_id(session, repo_id, base)

    if commit is None:
        return None

    # Step 3 — walk parent chain
    ancestor = await _walk_parents(session, commit, steps)
    if ancestor is None:
        return None

    return RevParseResult(
        commit_id=ancestor.commit_id,
        branch=ancestor.branch,
        revision_expr=revision_expr,
    )
| 222 | |
| 223 | |
async def _rev_parse_async(
    *,
    root: pathlib.Path,
    session: AsyncSession,
    revision: str,
    short: bool,
    verify: bool,
    abbrev_ref: bool,
) -> None:
    """Core rev-parse logic — fully injectable for tests.

    Reads repo state from ``.muse/`` (``repo.json`` for the repo ID, ``HEAD``
    for the current branch), resolves *revision*, and writes output via
    ``typer.echo``.

    Output selection: ``abbrev_ref`` prints the branch name and takes
    precedence over ``short``; ``short`` prints the first 8 characters of the
    commit ID; otherwise the full commit ID is printed.

    Raises ``typer.Exit`` on resolution failure when ``--verify`` is set;
    without ``--verify`` an unresolvable revision prints nothing.
    """
    muse_dir = root / ".muse"
    repo_data: dict[str, str] = json.loads((muse_dir / "repo.json").read_text())
    repo_id = repo_data["repo_id"]

    head_ref = (muse_dir / "HEAD").read_text().strip()  # "refs/heads/main"
    heads_prefix = "refs/heads/"
    if head_ref.startswith(heads_prefix):
        # Keep the whole remainder so hierarchical names survive:
        # "refs/heads/feature/x" → "feature/x", not just "x".
        current_branch = head_ref[len(heads_prefix):]
    else:
        # Unexpected HEAD layout: fall back to the last path component.
        current_branch = head_ref.rsplit("/", 1)[-1]

    result = await resolve_revision(
        session=session,
        repo_id=repo_id,
        current_branch=current_branch,
        muse_dir=muse_dir,
        revision_expr=revision,
    )

    if result is None:
        if verify:
            typer.echo(f"fatal: Not a valid revision: {revision!r}", err=True)
            raise typer.Exit(code=ExitCode.USER_ERROR)
        # --verify not set: print nothing, exit 0
        return

    if abbrev_ref:
        typer.echo(result.branch)
    elif short:
        typer.echo(result.commit_id[:8])
    else:
        typer.echo(result.commit_id)
| 267 | |
| 268 | |
| 269 | # --------------------------------------------------------------------------- |
| 270 | # Typer command |
| 271 | # --------------------------------------------------------------------------- |
| 272 | |
| 273 | |
@app.callback(invoke_without_command=True)
def rev_parse(
    ctx: typer.Context,
    revision: str = typer.Argument(..., help="Revision expression to resolve."),
    short: bool = typer.Option(
        False,
        "--short",
        help="Print only the first 8 characters of the commit ID.",
    ),
    verify: bool = typer.Option(
        False,
        "--verify",
        help="Exit 1 if the revision does not resolve (default: print nothing).",
    ),
    abbrev_ref: bool = typer.Option(
        False,
        "--abbrev-ref",
        help="Print the branch name instead of the commit ID.",
    ),
) -> None:
    """Resolve a revision expression to a commit ID.

    Examples::

        muse rev-parse HEAD
        muse rev-parse HEAD~2
        muse rev-parse --short HEAD
        muse rev-parse --abbrev-ref HEAD
        muse rev-parse --verify nonexistent
    """
    # ``ctx`` is accepted for the Typer callback signature but is not used.
    root = require_repo()

    async def _run() -> None:
        # Open one DB session for the whole command and delegate to the
        # injectable async core.
        async with open_session() as session:
            await _rev_parse_async(
                root=root,
                session=session,
                revision=revision,
                short=short,
                verify=verify,
                abbrev_ref=abbrev_ref,
            )

    try:
        asyncio.run(_run())
    except typer.Exit:
        # Deliberate exits (e.g. --verify failure) pass through untouched.
        raise
    except Exception as exc:
        # Diagnostics go to stderr so stdout stays clean for callers that
        # capture the resolved ID, matching the --verify failure path.
        typer.echo(f"❌ muse rev-parse failed: {exc}", err=True)
        logger.error("❌ muse rev-parse error: %s", exc, exc_info=True)
        raise typer.Exit(code=ExitCode.INTERNAL_ERROR) from exc