cgcardona / muse public
pull.py python
543 lines 21.2 KB
12901c5a Initial extraction from tellurstori/maestro cgcardona <gabriel@tellurstori.com> 4d ago
1 """muse pull — download remote commits from the configured Muse Hub.
2
3 Pull algorithm
4 --------------
5 1. Resolve repo root and read ``repo_id`` from ``.muse/repo.json``.
6 2. Read current branch from ``.muse/HEAD``.
7 3. Resolve ``origin`` URL from ``.muse/config.toml``.
8 Exits 1 with an instructive message if no remote is configured.
9 4. Collect ``have_commits`` (commit IDs already in local DB) and
10 ``have_objects`` (object IDs already stored) to avoid re-downloading.
11 5. POST to ``<remote_url>/pull`` with Bearer auth.
12 6. Store returned commits and object descriptors in local Postgres.
13 7. Update ``.muse/remotes/origin/<branch>`` tracking pointer.
14 8. Apply post-fetch integration strategy based on flags:
15 - Default: print divergence warning if branches diverged.
16 - ``--ff-only``: fast-forward if possible; fail if not.
17 - ``--rebase``: fast-forward if remote is simply ahead; rebase local
18 commits onto remote HEAD if branches have diverged.
19
20 Flags
21 -----
22 - ``--rebase``: after fetching, rebase local commits on top of the fetched
23 remote HEAD rather than merging. For linear divergence this replays each
24 local commit with the same snapshot but a new parent ID. For complex
25 divergence, it falls back to an advisory error.
26 - ``--ff-only``: after fetching, only integrate if the result would be a
27 fast-forward (remote HEAD is a direct descendant of local HEAD). Fails
28 with exit code 1 if the branches have diverged.
29
30 Exit codes:
31 0 — success (including ff and rebase cases)
32 1 — user error (no remote, bad args, ff-only on diverged branch)
33 2 — not a Muse repository
34 3 — network / server error
35 """
36 from __future__ import annotations
37
38 import asyncio
39 import datetime
40 import json
41 import logging
42 import pathlib
43 from collections.abc import Mapping
44
45 import httpx
46 import typer
47
48 from maestro.muse_cli._repo import require_repo
49 from maestro.muse_cli.config import get_remote, get_remote_head, set_remote_head
50 from maestro.muse_cli.db import (
51 get_all_object_ids,
52 get_commits_for_branch,
53 insert_commit,
54 open_session,
55 store_pulled_commit,
56 store_pulled_object,
57 )
58 from maestro.muse_cli.errors import ExitCode
59 from maestro.muse_cli.hub_client import (
60 MuseHubClient,
61 PullRequest,
62 PullResponse,
63 )
64 from maestro.muse_cli.merge_engine import find_merge_base
65 from maestro.muse_cli.models import MuseCliCommit
66 from maestro.muse_cli.snapshot import compute_commit_tree_id
67
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Typer application object; the ``pull`` callback defined at the bottom of
# this module is registered on it.
app = typer.Typer()

# Message echoed when the requested remote has no configured URL.
# NOTE(review): the text hard-codes 'origin' although ``--remote`` can select
# a different remote name.
_NO_REMOTE_MSG = (
    "No remote named 'origin'. "
    "Run `muse remote add origin <url>` to configure one."
)

# Template for the default-mode divergence warning; it is filled in with
# ``str.format(remote=..., branch=...)``.
_DIVERGED_MSG = (
    "⚠️ Local branch has diverged from {remote}/{branch}.\n"
    " Run `muse merge {remote}/{branch}` to integrate remote changes."
)
81
82
83 # ---------------------------------------------------------------------------
84 # Rebase helper
85 # ---------------------------------------------------------------------------
86
87
async def _rebase_commits_onto(
    root: pathlib.Path,
    repo_id: str,
    branch: str,
    commits_to_rebase: list[MuseCliCommit],
    new_base_commit_id: str,
) -> str:
    """Re-parent *commits_to_rebase* onto *new_base_commit_id*, one after another.

    Each input commit is re-created as a new row that keeps the original
    snapshot, message, author and metadata but points at the previously
    replayed commit (or at *new_base_commit_id* for the first one). The new
    commit ID comes from
    :func:`~maestro.muse_cli.snapshot.compute_commit_tree_id`, fed with the
    new parent, snapshot, message and author; a row that already exists under
    that ID is left alone instead of being inserted again.

    No conflict detection takes place here. Once all rows are written the
    local branch ref file ``.muse/refs/heads/<branch>`` is overwritten with
    the ID of the last replayed commit.

    Args:
        root: Repository root; the branch ref is written beneath it.
        repo_id: Repository ID stored on every new commit row.
        branch: Local branch name stored on the rows and used for the ref path.
        commits_to_rebase: Commits to replay, in the order they are replayed
            (callers pass them oldest-first).
        new_base_commit_id: Commit ID that becomes the parent of the first
            replayed commit.

    Returns:
        The ID of the last replayed commit, or *new_base_commit_id* when
        *commits_to_rebase* is empty.
    """
    tip_id: str = new_base_commit_id

    async with open_session() as session:
        for source in commits_to_rebase:
            replayed_id = compute_commit_tree_id(
                parent_ids=[tip_id],
                snapshot_id=source.snapshot_id,
                message=source.message,
                author=source.author,
            )
            # Remember the parent for the row, then advance the tip so the
            # next commit chains onto this one whether or not we insert.
            parent_id, tip_id = tip_id, replayed_id

            # An earlier run already produced this exact commit: nothing to do.
            if await session.get(MuseCliCommit, replayed_id) is not None:
                continue

            await insert_commit(
                session,
                MuseCliCommit(
                    commit_id=replayed_id,
                    repo_id=repo_id,
                    branch=branch,
                    parent_commit_id=parent_id,
                    snapshot_id=source.snapshot_id,
                    message=source.message,
                    author=source.author,
                    # The replayed row gets a fresh UTC timestamp.
                    committed_at=datetime.datetime.now(datetime.timezone.utc),
                    commit_metadata=source.commit_metadata,
                ),
            )

        await session.commit()

    # Point the local branch at the tip of the replayed chain.
    branch_ref = root.joinpath(".muse", "refs", "heads", branch)
    branch_ref.parent.mkdir(parents=True, exist_ok=True)
    branch_ref.write_text(tip_id, encoding="utf-8")

    return tip_id
153
154
155 # ---------------------------------------------------------------------------
156 # Divergence detection
157 # ---------------------------------------------------------------------------
158
159
def _is_ancestor(
    commits_by_id: Mapping[str, MuseCliCommit],
    ancestor_id: str,
    descendant_id: str,
) -> bool:
    """Tell whether *ancestor_id* lies on the parent chain of *descendant_id*.

    A commit counts as its own ancestor. Otherwise the single-parent chain is
    followed from *descendant_id* through ``parent_commit_id`` links; the
    walk succeeds as soon as a parent ID equals *ancestor_id* and gives up
    when it reaches a root, a commit missing from *commits_by_id*, or an ID
    it has already visited.

    Args:
        commits_by_id: Lookup from commit ID to a commit-like object exposing
            a ``parent_commit_id`` attribute.
        ancestor_id: Candidate ancestor commit ID.
        descendant_id: Commit ID the walk starts from.

    Returns:
        ``True`` when *ancestor_id* is reached, ``False`` otherwise.
    """
    if ancestor_id == descendant_id:
        return True

    seen: set[str] = set()
    cursor: str | None = descendant_id
    while True:
        # Chain exhausted, or a cycle brought us back to a visited commit.
        if cursor is None or cursor in seen:
            return False
        seen.add(cursor)

        node = commits_by_id.get(cursor)
        if node is None:
            # The commit itself is unknown, so its parents cannot be followed.
            return False

        parent = getattr(node, "parent_commit_id", None)
        cursor = None if parent is None else str(parent)
        if cursor == ancestor_id:
            return True
188
189
190 # ---------------------------------------------------------------------------
191 # Async pull core
192 # ---------------------------------------------------------------------------
193
194
def _local_commits_above_base(
    commits_by_id: Mapping[str, MuseCliCommit],
    local_head: str,
    merge_base_id: str,
    remote_head: str,
) -> list[MuseCliCommit]:
    """Return the local-only commits between *merge_base_id* and *local_head*, oldest-first.

    The parent chain is followed from *local_head* until *merge_base_id*, a
    root, a commit missing from *commits_by_id*, or an already-visited ID is
    reached. Commits that are ancestors of *remote_head* are left out. The
    result does not depend on how the commit rows were ordered when loaded.
    """
    newest_first: list[MuseCliCommit] = []
    seen: set[str] = set()
    cursor: str | None = local_head
    while cursor is not None and cursor != merge_base_id and cursor not in seen:
        seen.add(cursor)
        commit = commits_by_id.get(cursor)
        if commit is None:
            break
        # Only replay commits the remote history does not already contain.
        if not _is_ancestor(commits_by_id, commit.commit_id, remote_head):
            newest_first.append(commit)
        parent_raw = commit.parent_commit_id
        cursor = str(parent_raw) if parent_raw is not None else None

    # The walk ran head → base; the rebase helper expects base → head.
    newest_first.reverse()
    return newest_first


async def _pull_async(
    *,
    root: pathlib.Path,
    remote_name: str,
    branch: str | None,
    rebase: bool = False,
    ff_only: bool = False,
) -> None:
    """Execute the pull pipeline.

    Steps: read the repo ID and current branch from ``.muse``, resolve the
    remote URL, collect the commit/object IDs already stored locally, POST
    them to ``/pull`` on the hub, store the returned commits and objects,
    update the remote tracking head, then apply the integration strategy.

    The post-fetch integration strategy is determined by *rebase* and
    *ff_only*:

    - Default (both False): print a divergence warning when branches have
      diverged; do not touch the local branch ref.
    - ``ff_only=True``: report "already up to date" when local and remote
      heads are equal; fast-forward the local branch ref to the remote head
      if possible; otherwise fail with ``ExitCode.USER_ERROR``.
    - ``rebase=True``: report "already up to date" when heads are equal;
      fast-forward if the remote is simply ahead; replay local commits onto
      the remote head when branches have diverged (linear rebase).

    When both *rebase* and *ff_only* are True, *ff_only* takes precedence.

    Args:
        root: Repository root containing the ``.muse`` directory.
        remote_name: Name of the configured remote to pull from.
        branch: Branch to pull; ``None`` means the branch named in
            ``.muse/HEAD``.
        rebase: Rebase local commits onto the fetched remote head.
        ff_only: Only integrate when a fast-forward is possible.

    Raises:
        typer.Exit: ``ExitCode.USER_ERROR`` when no remote URL is configured,
            when ``ff_only`` cannot fast-forward, or when a rebase finds no
            merge base; ``ExitCode.INTERNAL_ERROR`` on a non-200 hub
            response, a timeout, another HTTP error, or a response body that
            is not a JSON object.
    """
    muse_dir = root / ".muse"

    # ── Repo identity ────────────────────────────────────────────────────
    repo_data: dict[str, str] = json.loads(
        (muse_dir / "repo.json").read_text(encoding="utf-8")
    )
    repo_id = repo_data["repo_id"]

    # ── Branch resolution ────────────────────────────────────────────────
    head_ref = (muse_dir / "HEAD").read_text(encoding="utf-8").strip()
    # Keep everything after "refs/heads/" so branch names that contain "/"
    # (e.g. feature/groove-v2) survive intact; fall back to the last path
    # segment when HEAD does not contain that prefix.
    _, heads_marker, branch_tail = head_ref.partition("refs/heads/")
    current_branch = branch_tail if heads_marker else head_ref.rsplit("/", 1)[-1]
    effective_branch = branch or current_branch

    # ── Remote URL ───────────────────────────────────────────────────────
    remote_url = get_remote(remote_name, root)
    if not remote_url:
        # Same wording as _NO_REMOTE_MSG for the default remote, but names
        # the remote that was actually requested.
        typer.echo(
            f"No remote named '{remote_name}'. "
            f"Run `muse remote add {remote_name} <url>` to configure one."
        )
        raise typer.Exit(code=int(ExitCode.USER_ERROR))

    # ── Collect have-sets for delta pull ─────────────────────────────────
    async with open_session() as session:
        local_commits = await get_commits_for_branch(session, repo_id, effective_branch)
        have_commits = [c.commit_id for c in local_commits]
        have_objects = await get_all_object_ids(session, repo_id)

    # --ff-only wins over --rebase, matching the strategy dispatch below.
    mode_hint = " (--ff-only)" if ff_only else " (--rebase)" if rebase else ""
    typer.echo(f"⬇️ Pulling {remote_name}/{effective_branch}{mode_hint} …")

    pull_request = PullRequest(
        branch=effective_branch,
        have_commits=have_commits,
        have_objects=have_objects,
    )
    if rebase:
        pull_request["rebase"] = True
    if ff_only:
        pull_request["ff_only"] = True

    # ── HTTP pull ────────────────────────────────────────────────────────
    try:
        async with MuseHubClient(base_url=remote_url, repo_root=root) as hub:
            response = await hub.post("/pull", json=pull_request)

        if response.status_code != 200:
            typer.echo(
                f"❌ Hub rejected pull (HTTP {response.status_code}): {response.text}"
            )
            logger.error(
                "❌ muse pull failed: HTTP %d — %s",
                response.status_code,
                response.text,
            )
            raise typer.Exit(code=int(ExitCode.INTERNAL_ERROR))

    except typer.Exit:
        raise
    except httpx.TimeoutException:
        typer.echo(f"❌ Pull timed out connecting to {remote_url}")
        raise typer.Exit(code=int(ExitCode.INTERNAL_ERROR))
    except httpx.HTTPError as exc:
        typer.echo(f"❌ Network error during pull: {exc}")
        logger.error("❌ muse pull network error: %s", exc, exc_info=True)
        raise typer.Exit(code=int(ExitCode.INTERNAL_ERROR))

    # ── Parse response ───────────────────────────────────────────────────
    raw_body: object = response.json()
    if not isinstance(raw_body, dict):
        typer.echo("❌ Hub returned unexpected pull response shape.")
        raise typer.Exit(code=int(ExitCode.INTERNAL_ERROR))

    raw_remote_head = raw_body.get("remote_head")
    pull_response = PullResponse(
        commits=list(raw_body.get("commits", [])),
        objects=list(raw_body.get("objects", [])),
        remote_head=str(raw_remote_head) if isinstance(raw_remote_head, str) else None,
        diverged=bool(raw_body.get("diverged", False)),
    )

    new_commits_count = 0
    new_objects_count = 0

    # ── Store pulled data in DB ───────────────────────────────────────────
    async with open_session() as session:
        for commit_data in pull_response["commits"]:
            if isinstance(commit_data, dict):
                # Inject repo_id since Hub response may omit it
                commit_data_with_repo = dict(commit_data)
                commit_data_with_repo.setdefault("repo_id", repo_id)
                inserted = await store_pulled_commit(session, commit_data_with_repo)
                if inserted:
                    new_commits_count += 1

        for obj_data in pull_response["objects"]:
            if isinstance(obj_data, dict):
                inserted = await store_pulled_object(session, dict(obj_data))
                if inserted:
                    new_objects_count += 1

    # ── Update remote tracking head ───────────────────────────────────────
    remote_head_from_hub = pull_response["remote_head"]
    if remote_head_from_hub:
        set_remote_head(remote_name, effective_branch, remote_head_from_hub, root)

    # ── Determine local HEAD and divergence ───────────────────────────────
    ref_path = muse_dir / "refs" / "heads" / effective_branch
    local_head: str | None = None
    if ref_path.exists():
        raw = ref_path.read_text(encoding="utf-8").strip()
        local_head = raw if raw else None

    diverged = pull_response["diverged"]

    # Re-check divergence locally using the updated commit graph
    async with open_session() as session:
        commits_after = await get_commits_for_branch(session, repo_id, effective_branch)

    commits_by_id: dict[str, MuseCliCommit] = {c.commit_id: c for c in commits_after}

    # NOTE(review): this also marks the branch as diverged when local is
    # merely behind the remote (the remote head is then not an ancestor of
    # the local head) — confirm that is the intended meaning of "diverged".
    if (
        not diverged
        and remote_head_from_hub
        and local_head
        and remote_head_from_hub != local_head
    ):
        if not _is_ancestor(commits_by_id, remote_head_from_hub, local_head):
            diverged = True

    # ── Fast-forward check (common to --ff-only and --rebase) ────────────
    can_fast_forward = (
        remote_head_from_hub is not None
        and (
            local_head is None
            or _is_ancestor(commits_by_id, local_head, remote_head_from_hub)
        )
    )

    # ── Apply post-fetch integration strategy ─────────────────────────────
    if ff_only:
        # Equal heads must be tested before can_fast_forward: _is_ancestor
        # treats a commit as its own ancestor, so can_fast_forward is also
        # True when there is nothing to advance.
        if remote_head_from_hub and local_head == remote_head_from_hub:
            typer.echo(f"✅ Already up to date — {effective_branch} is current.")
        elif can_fast_forward and remote_head_from_hub:
            # Advance local branch ref to remote HEAD
            ref_path.parent.mkdir(parents=True, exist_ok=True)
            ref_path.write_text(remote_head_from_hub, encoding="utf-8")
            typer.echo(
                f"✅ Fast-forwarded {effective_branch} → {remote_head_from_hub[:8]}"
            )
            logger.info(
                "✅ muse pull --ff-only: fast-forwarded %s → %s",
                effective_branch,
                remote_head_from_hub[:8],
            )
        else:
            typer.echo(
                f"❌ Cannot fast-forward: {effective_branch} has diverged from "
                f"{remote_name}/{effective_branch}. "
                f"Run `muse merge {remote_name}/{effective_branch}` or use "
                f"`muse pull --rebase` to integrate."
            )
            logger.warning(
                "⚠️ muse pull --ff-only: branches have diverged, refusing to merge",
            )
            raise typer.Exit(code=int(ExitCode.USER_ERROR))

    elif rebase:
        # Same ordering concern as above: report "up to date" before the
        # fast-forward test, which is also satisfied by equal heads.
        if local_head == remote_head_from_hub:
            typer.echo(f"✅ Already up to date — {effective_branch} is current.")
        elif can_fast_forward and remote_head_from_hub:
            # Simple fast-forward — remote is strictly ahead of us
            ref_path.parent.mkdir(parents=True, exist_ok=True)
            ref_path.write_text(remote_head_from_hub, encoding="utf-8")
            typer.echo(
                f"✅ Fast-forwarded {effective_branch} → {remote_head_from_hub[:8]}"
            )
            logger.info(
                "✅ muse pull --rebase: fast-forwarded %s → %s",
                effective_branch,
                remote_head_from_hub[:8],
            )
        elif diverged and remote_head_from_hub and local_head:
            # Diverged — attempt linear rebase
            async with open_session() as session:
                merge_base_id = await find_merge_base(
                    session, local_head, remote_head_from_hub
                )

            if merge_base_id is None:
                typer.echo(
                    "❌ Cannot rebase: no common ancestor found between "
                    f"{effective_branch} and {remote_name}/{effective_branch}. "
                    "Use `muse merge` instead."
                )
                raise typer.Exit(code=int(ExitCode.USER_ERROR))

            # Collect local commits above the merge base (oldest-first) by
            # walking parent links, so the result is independent of the
            # order in which get_commits_for_branch returned the rows.
            local_above_base = _local_commits_above_base(
                commits_by_id, local_head, merge_base_id, remote_head_from_hub
            )

            if not local_above_base:
                # Remote is already at or past local — fast-forward
                ref_path.parent.mkdir(parents=True, exist_ok=True)
                ref_path.write_text(remote_head_from_hub, encoding="utf-8")
                typer.echo(
                    f"✅ Fast-forwarded {effective_branch} → {remote_head_from_hub[:8]}"
                )
            else:
                typer.echo(
                    f"⟳ Rebasing {len(local_above_base)} local commit(s) onto "
                    f"{remote_head_from_hub[:8]} …"
                )
                new_head = await _rebase_commits_onto(
                    root=root,
                    repo_id=repo_id,
                    branch=effective_branch,
                    commits_to_rebase=local_above_base,
                    new_base_commit_id=remote_head_from_hub,
                )
                typer.echo(
                    f"✅ Rebase complete — {effective_branch} → {new_head[:8]}"
                )
                logger.info(
                    "✅ muse pull --rebase: rebased %d commit(s) onto %s, new HEAD %s",
                    len(local_above_base),
                    remote_head_from_hub[:8],
                    new_head[:8],
                )

    else:
        # Default: print divergence warning (do not touch local branch ref)
        if diverged:
            typer.echo(
                _DIVERGED_MSG.format(remote=remote_name, branch=effective_branch)
            )

    typer.echo(
        f"✅ Pulled {new_commits_count} new commit(s), "
        f"{new_objects_count} new object(s) from {remote_name}/{effective_branch}"
    )
    logger.info(
        "✅ muse pull %s/%s: +%d commits, +%d objects",
        remote_name,
        effective_branch,
        new_commits_count,
        new_objects_count,
    )
466
467
468 # ---------------------------------------------------------------------------
469 # Typer command
470 # ---------------------------------------------------------------------------
471
472
@app.callback(invoke_without_command=True)
def pull(
    ctx: typer.Context,
    branch: str | None = typer.Option(
        None,
        "--branch",
        "-b",
        help="Branch to pull. Defaults to the current branch.",
    ),
    remote: str = typer.Option(
        "origin",
        "--remote",
        help="Remote name to pull from.",
    ),
    rebase: bool = typer.Option(
        False,
        "--rebase",
        help=(
            "After fetching, rebase local commits on top of the remote HEAD "
            "instead of merging. For a simple case where remote is ahead, this "
            "fast-forwards the local branch. For diverged branches, each local "
            "commit above the merge base is replayed with the remote HEAD as "
            "the new base, preserving a linear history."
        ),
    ),
    ff_only: bool = typer.Option(
        False,
        "--ff-only",
        help=(
            "Refuse to integrate remote commits unless the result would be a "
            "fast-forward (i.e. local branch is a direct ancestor of the remote "
            "HEAD). Exits 1 with an instructive message when branches have "
            "diverged, keeping the local branch unchanged."
        ),
    ),
) -> None:
    """Download commits from the remote Muse Hub into the local repository.

    Contacts the remote Hub, receives commits and objects that are not yet in
    the local database, and stores them. Post-fetch integration depends on flags:

    - Default: warn if diverged, suggest ``muse merge``.
    - ``--ff-only``: fast-forward or fail.
    - ``--rebase``: rebase local commits onto remote HEAD.

    Example::

        muse pull
        muse pull --rebase
        muse pull --ff-only
        muse pull --branch feature/groove-v2
        muse pull --remote staging
    """
    # The docstring above is left word-for-word: Typer uses a command's
    # docstring as its help text, so it is user-facing output.

    # Synchronous entry point: locate the repository, then drive the async
    # pipeline to completion on a fresh event loop.
    repo_root = require_repo()

    try:
        pipeline = _pull_async(
            root=repo_root,
            remote_name=remote,
            branch=branch,
            rebase=rebase,
            ff_only=ff_only,
        )
        asyncio.run(pipeline)
    except Exception as exc:
        # Deliberate exits raised by the pipeline pass through untouched.
        if isinstance(exc, typer.Exit):
            raise
        # Anything else is reported, logged with its traceback, and turned
        # into an INTERNAL_ERROR exit.
        typer.echo(f"❌ muse pull failed: {exc}")
        logger.error("❌ muse pull unexpected error: %s", exc, exc_info=True)
        raise typer.Exit(code=int(ExitCode.INTERNAL_ERROR))