cgcardona / muse public
rev_parse.py python
324 lines 10.1 KB
12901c5a Initial extraction from tellurstori/maestro cgcardona <gabriel@tellurstori.com> 4d ago
1 """muse rev-parse — resolve a revision expression to a commit ID.
2
3 Translates a symbolic revision expression into a concrete commit ID, mirroring
4 ``git rev-parse`` semantics for the Muse VCS. Designed to be used both
5 interactively and as a plumbing primitive that other commands can call
6 internally to resolve user-supplied refs.
7
8 Supported revision expressions
9 --------------------------------
10 - ``HEAD`` — current branch tip
11 - ``<branch>`` — tip of a named branch
12 - ``<commit_id>`` — full or abbreviated (prefix) commit ID
13 - ``HEAD~N`` — N parents back from HEAD
14 - ``<branch>~N`` — N parents back from branch tip
15
16 Flags
17 ------
18 - ``--short`` — print only the first 8 characters of the resolved ID
19 - ``--verify`` — exit 1 when the expression does not resolve (default:
20 print nothing and exit 0)
21 - ``--abbrev-ref`` — print the branch name rather than the commit ID
22 (meaningful for HEAD and branch refs; for a raw commit ID
23 the branch of that commit is printed)
24
25 Result type: ``RevParseResult`` — see ``docs/reference/type_contracts.md``.
26 """
27 from __future__ import annotations
28
29 import asyncio
30 import json
31 import logging
32 import pathlib
33 import re
34
35 import typer
36 from sqlalchemy.ext.asyncio import AsyncSession
37 from sqlalchemy.future import select
38
39 from maestro.muse_cli._repo import require_repo
40 from maestro.muse_cli.db import open_session
41 from maestro.muse_cli.errors import ExitCode
42 from maestro.muse_cli.models import MuseCliCommit
43
44 logger = logging.getLogger(__name__)
45
46 app = typer.Typer()
47
48 # Regex: "HEAD~3", "main~1", "a1b2c3~2" etc.
49 _TILDE_RE = re.compile(r"^(.+?)~(\d+)$")
50
51
52 # ---------------------------------------------------------------------------
53 # Named result type (registered in docs/reference/type_contracts.md)
54 # ---------------------------------------------------------------------------
55
56
57 class RevParseResult:
58 """Resolved output of a revision expression.
59
60 Returned by ``_resolve_revision`` so that callers have access to both the
61 full commit ID and its branch without re-querying the database. Treat as
62 an immutable value object — all fields are set in ``__init__`` and never
63 mutated.
64
65 Fields
66 ------
67 commit_id : str
68 Full 64-character hex commit ID.
69 branch : str
70 Branch that the commit lives on (may differ from the expression when a
71 raw commit ID spanning multiple branches is resolved).
72 revision_expr : str
73 The original expression that was resolved (useful for error messages).
74 """
75
76 __slots__ = ("commit_id", "branch", "revision_expr")
77
78 def __init__(self, commit_id: str, branch: str, revision_expr: str) -> None:
79 self.commit_id = commit_id
80 self.branch = branch
81 self.revision_expr = revision_expr
82
83 def __repr__(self) -> str:
84 return (
85 f"RevParseResult(commit_id={self.commit_id[:8]!r},"
86 f" branch={self.branch!r},"
87 f" revision_expr={self.revision_expr!r})"
88 )
89
90
91 # ---------------------------------------------------------------------------
92 # Testable async core
93 # ---------------------------------------------------------------------------
94
95
96 async def _get_branch_tip(
97 session: AsyncSession,
98 repo_id: str,
99 branch: str,
100 ) -> MuseCliCommit | None:
101 """Return the most-recent commit on *branch*, or ``None`` if none exist."""
102 result = await session.execute(
103 select(MuseCliCommit)
104 .where(MuseCliCommit.repo_id == repo_id, MuseCliCommit.branch == branch)
105 .order_by(MuseCliCommit.committed_at.desc())
106 .limit(1)
107 )
108 return result.scalar_one_or_none()
109
110
111 async def _resolve_commit_by_id(
112 session: AsyncSession,
113 repo_id: str,
114 ref: str,
115 ) -> MuseCliCommit | None:
116 """Resolve *ref* as an exact or prefix-matched commit ID.
117
118 Tries an exact primary-key lookup first; falls back to a prefix scan
119 (acceptable for CLI latency — commit tables are shallow in typical usage).
120 """
121 commit = await session.get(MuseCliCommit, ref)
122 if commit is not None:
123 return commit
124 # Abbreviated prefix match
125 result = await session.execute(
126 select(MuseCliCommit).where(
127 MuseCliCommit.repo_id == repo_id,
128 MuseCliCommit.commit_id.startswith(ref),
129 )
130 )
131 return result.scalars().first()
132
133
134 async def _walk_parents(
135 session: AsyncSession,
136 start: MuseCliCommit,
137 steps: int,
138 ) -> MuseCliCommit | None:
139 """Walk *steps* parent hops from *start*, returning the ancestor or None.
140
141 ``steps=0`` returns *start* unchanged. Each step follows
142 ``parent_commit_id``; if a parent is missing from the DB the walk stops
143 and ``None`` is returned.
144 """
145 current: MuseCliCommit = start
146 for _ in range(steps):
147 if current.parent_commit_id is None:
148 logger.debug("⚠️ Parent chain exhausted after %d step(s)", steps)
149 return None
150 parent = await session.get(MuseCliCommit, current.parent_commit_id)
151 if parent is None:
152 logger.warning(
153 "⚠️ Parent commit %s not found in DB — chain broken",
154 current.parent_commit_id[:8],
155 )
156 return None
157 current = parent
158 return current
159
160
161 async def _branch_exists_on_disk(muse_dir: pathlib.Path, name: str) -> bool:
162 """Return True when a ref file exists for *name* under ``.muse/refs/heads/``."""
163 return (muse_dir / "refs" / "heads" / name).exists()
164
165
166 async def resolve_revision(
167 session: AsyncSession,
168 repo_id: str,
169 current_branch: str,
170 muse_dir: pathlib.Path,
171 revision_expr: str,
172 ) -> RevParseResult | None:
173 """Resolve *revision_expr* to a ``RevParseResult``, or return ``None``.
174
175 This is the public plumbing primitive used by ``muse rev-parse`` and
176 intended for reuse by other commands that accept revision arguments.
177
178 Resolution order
179 ----------------
180 1. Strip a ``~N`` suffix and record *steps*.
181 2. Resolve the base token:
182 a. ``HEAD`` → tip of *current_branch*
183 b. Named branch (ref file exists) → tip of that branch
184 c. Commit ID / prefix → exact or prefix match
185 3. Walk *steps* parent hops from the resolved base.
186 4. Return ``RevParseResult`` or ``None`` when unresolvable.
187 """
188 # Step 1 — parse tilde suffix
189 steps = 0
190 base = revision_expr
191 m = _TILDE_RE.match(revision_expr)
192 if m:
193 base = m.group(1)
194 steps = int(m.group(2))
195
196 # Step 2 — resolve base token to a commit
197 commit: MuseCliCommit | None = None
198
199 if base.upper() == "HEAD":
200 commit = await _get_branch_tip(session, repo_id, current_branch)
201 resolved_branch = current_branch
202 elif await _branch_exists_on_disk(muse_dir, base):
203 commit = await _get_branch_tip(session, repo_id, base)
204 resolved_branch = base
205 else:
206 commit = await _resolve_commit_by_id(session, repo_id, base)
207 resolved_branch = commit.branch if commit is not None else ""
208
209 if commit is None:
210 return None
211
212 # Step 3 — walk parent chain
213 ancestor = await _walk_parents(session, commit, steps)
214 if ancestor is None:
215 return None
216
217 return RevParseResult(
218 commit_id=ancestor.commit_id,
219 branch=ancestor.branch,
220 revision_expr=revision_expr,
221 )
222
223
224 async def _rev_parse_async(
225 *,
226 root: pathlib.Path,
227 session: AsyncSession,
228 revision: str,
229 short: bool,
230 verify: bool,
231 abbrev_ref: bool,
232 ) -> None:
233 """Core rev-parse logic — fully injectable for tests.
234
235 Reads repo state from ``.muse/``, resolves *revision*, and writes output
236 via ``typer.echo``. Raises ``typer.Exit`` on resolution failure when
237 ``--verify`` is set.
238 """
239 muse_dir = root / ".muse"
240 repo_data: dict[str, str] = json.loads((muse_dir / "repo.json").read_text())
241 repo_id = repo_data["repo_id"]
242
243 head_ref = (muse_dir / "HEAD").read_text().strip() # "refs/heads/main"
244 current_branch = head_ref.rsplit("/", 1)[-1] # "main"
245
246 result = await resolve_revision(
247 session=session,
248 repo_id=repo_id,
249 current_branch=current_branch,
250 muse_dir=muse_dir,
251 revision_expr=revision,
252 )
253
254 if result is None:
255 if verify:
256 typer.echo(f"fatal: Not a valid revision: {revision!r}", err=True)
257 raise typer.Exit(code=ExitCode.USER_ERROR)
258 # --verify not set: print nothing, exit 0
259 return
260
261 if abbrev_ref:
262 typer.echo(result.branch)
263 elif short:
264 typer.echo(result.commit_id[:8])
265 else:
266 typer.echo(result.commit_id)
267
268
269 # ---------------------------------------------------------------------------
270 # Typer command
271 # ---------------------------------------------------------------------------
272
273
274 @app.callback(invoke_without_command=True)
275 def rev_parse(
276 ctx: typer.Context,
277 revision: str = typer.Argument(..., help="Revision expression to resolve."),
278 short: bool = typer.Option(
279 False,
280 "--short",
281 help="Print only the first 8 characters of the commit ID.",
282 ),
283 verify: bool = typer.Option(
284 False,
285 "--verify",
286 help="Exit 1 if the revision does not resolve (default: print nothing).",
287 ),
288 abbrev_ref: bool = typer.Option(
289 False,
290 "--abbrev-ref",
291 help="Print the branch name instead of the commit ID.",
292 ),
293 ) -> None:
294 """Resolve a revision expression to a commit ID.
295
296 Examples::
297
298 muse rev-parse HEAD
299 muse rev-parse HEAD~2
300 muse rev-parse --short HEAD
301 muse rev-parse --abbrev-ref HEAD
302 muse rev-parse --verify nonexistent
303 """
304 root = require_repo()
305
306 async def _run() -> None:
307 async with open_session() as session:
308 await _rev_parse_async(
309 root=root,
310 session=session,
311 revision=revision,
312 short=short,
313 verify=verify,
314 abbrev_ref=abbrev_ref,
315 )
316
317 try:
318 asyncio.run(_run())
319 except typer.Exit:
320 raise
321 except Exception as exc:
322 typer.echo(f"❌ muse rev-parse failed: {exc}")
323 logger.error("❌ muse rev-parse error: %s", exc, exc_info=True)
324 raise typer.Exit(code=ExitCode.INTERNAL_ERROR)