cgcardona / muse public
test_muse_rebase.py python
700 lines 23.7 KB
12901c5a Initial extraction from tellurstori/maestro cgcardona <gabriel@tellurstori.com> 4d ago
1 """Tests for ``muse rebase`` — commit replay onto a new base.
2
Verifies:
- test_rebase_linear_replays_commits — regression: linear rebase replays commits onto upstream tip
- test_rebase_noop_already_up_to_date — noop when branch is already at upstream
- test_rebase_fast_forward_advances_pointer — current branch behind upstream → fast-forward
- test_rebase_already_ahead_noop — upstream is base (current ahead) → noop
- test_rebase_interactive_plan_from_commits — InteractivePlan.from_commits produces pick entries
- test_rebase_interactive_plan_parse_drop — drop entries are excluded from resolved list
- test_rebase_interactive_plan_invalid_action — unrecognised action raises ValueError
- test_rebase_interactive_plan_ambiguous_sha — ambiguous SHA prefix raises ValueError
- test_rebase_autosquash_moves_fixup_commits — fixup! commits reordered after their targets
- test_rebase_autosquash_no_fixups — no fixup! commits → list unchanged
- test_rebase_compute_delta_additions — compute_delta detects added paths
- test_rebase_compute_delta_deletions — compute_delta detects removed paths
- test_rebase_compute_delta_modifications — compute_delta detects modified paths
- test_rebase_apply_delta_applies_changes — apply_delta patches the onto manifest
- test_rebase_collect_commits_since_base — collects only commits beyond the base
- test_rebase_abort_restores_branch — --abort rewrites branch pointer and clears state
- test_rebase_continue_replays_remaining — --continue replays remaining commits
- test_rebase_continue_no_state_errors — --continue with no state file exits 1
- test_rebase_abort_no_state_errors — --abort with no state file exits 1
- test_rebase_state_roundtrip — write/read roundtrip for REBASE_STATE.json
- test_rebase_state_missing_file — read_rebase_state returns None when REBASE_STATE.json is absent
- test_boundary_no_forbidden_imports — AST boundary seal
25 """
26 from __future__ import annotations
27
28 import ast
29 import datetime
30 import json
31 import pathlib
32 import uuid
33 from collections.abc import AsyncGenerator
34
35 import pytest
36 import typer
37 from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
38
39 from maestro.db.database import Base
40 from maestro.muse_cli import models as _cli_models # noqa: F401 — register tables
41 from maestro.muse_cli.db import insert_commit, upsert_snapshot
42 from maestro.muse_cli.models import MuseCliCommit, MuseCliSnapshot
43 from maestro.muse_cli.snapshot import compute_commit_id, compute_snapshot_id
44 from maestro.services.muse_rebase import (
45 InteractivePlan,
46 RebaseResult,
47 RebaseState,
48 apply_autosquash,
49 apply_delta,
50 clear_rebase_state,
51 compute_delta,
52 read_rebase_state,
53 write_rebase_state,
54 _collect_branch_commits_since_base,
55 _rebase_abort_async,
56 _rebase_async,
57 _rebase_continue_async,
58 )
59
60
61 # ---------------------------------------------------------------------------
62 # Fixtures
63 # ---------------------------------------------------------------------------
64
65
@pytest.fixture
async def async_session() -> AsyncGenerator[AsyncSession, None]:
    """Yield an ``AsyncSession`` bound to a fresh in-memory SQLite database.

    Every table registered on ``Base.metadata`` is created before the session
    is handed to the test.  The engine is disposed in a ``finally`` clause so
    it is released even when schema creation or session teardown raises.

    Yields:
        An open ``AsyncSession`` created with ``expire_on_commit=False``.
    """
    engine = create_async_engine("sqlite+aiosqlite:///:memory:")
    try:
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)
        Session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
        async with Session() as session:
            yield session
    finally:
        # Runs on normal teardown and on any failure above, so the engine's
        # connections are never left dangling.
        await engine.dispose()
76
77
@pytest.fixture
def repo_id() -> str:
    """Return a freshly generated UUID4, rendered as a string, to act as the repo id."""
    fresh_uuid = uuid.uuid4()
    return str(fresh_uuid)
81
82
@pytest.fixture
def repo_root(tmp_path: pathlib.Path, repo_id: str) -> pathlib.Path:
    """Lay out a bare-bones ``.muse`` directory under ``tmp_path`` and return that root.

    Creates ``HEAD`` (pointing at ``refs/heads/main``), an empty
    ``refs/heads/main`` ref file and a ``repo.json`` carrying ``repo_id``.
    """
    dot_muse = tmp_path / ".muse"
    heads_dir = dot_muse / "refs" / "heads"

    dot_muse.mkdir()
    heads_dir.mkdir(parents=True)

    (dot_muse / "HEAD").write_text("refs/heads/main")
    # An empty ref file: the branch exists but has no commits yet.
    (heads_dir / "main").write_text("")

    repo_config = json.dumps({"repo_id": repo_id})
    (dot_muse / "repo.json").write_text(repo_config)
    return tmp_path
93
94
95 # ---------------------------------------------------------------------------
96 # Helpers
97 # ---------------------------------------------------------------------------
98
99
def _make_commit(
    repo_id: str,
    branch: str,
    message: str,
    snapshot_id: str,
    parent_id: str | None = None,
) -> MuseCliCommit:
    """Construct an unsaved ``MuseCliCommit`` whose id is derived from its content.

    The id is produced by ``compute_commit_id`` from the parent list, the
    snapshot id, the message and the ISO-formatted commit time, which is taken
    as the current UTC time.  The author is always ``"test"``.
    """
    now_utc = datetime.datetime.now(datetime.timezone.utc)

    # A falsy parent_id (None or "") yields an empty parent list.
    parents: list[str]
    if parent_id:
        parents = [parent_id]
    else:
        parents = []

    derived_id = compute_commit_id(
        parent_ids=parents,
        snapshot_id=snapshot_id,
        message=message,
        committed_at_iso=now_utc.isoformat(),
    )
    return MuseCliCommit(
        commit_id=derived_id,
        repo_id=repo_id,
        branch=branch,
        parent_commit_id=parent_id,
        snapshot_id=snapshot_id,
        message=message,
        author="test",
        committed_at=now_utc,
    )
125
126
async def _seed_commit(
    session: AsyncSession,
    repo_id: str,
    branch: str,
    message: str,
    manifest: dict[str, str],
    parent_id: str | None = None,
) -> MuseCliCommit:
    """Store ``manifest`` as a snapshot, insert a commit pointing at it, and return the commit.

    The session is flushed before returning; no commit of the transaction is issued here.
    """
    snapshot_id = compute_snapshot_id(manifest)
    await upsert_snapshot(session, manifest=manifest, snapshot_id=snapshot_id)

    new_commit = _make_commit(repo_id, branch, message, snapshot_id, parent_id)
    await insert_commit(session, new_commit)

    await session.flush()
    return new_commit
142
143
144 # ---------------------------------------------------------------------------
145 # Pure function tests
146 # ---------------------------------------------------------------------------
147
148
def test_rebase_compute_delta_additions() -> None:
    """A path present only in the commit manifest is reported as an addition."""
    before: dict[str, str] = {"a.mid": "aaa"}
    after: dict[str, str] = {"a.mid": "aaa", "b.mid": "bbb"}

    added, removed = compute_delta(before, after)

    assert added == {"b.mid": "bbb"}
    assert removed == set()
156
157
def test_rebase_compute_delta_deletions() -> None:
    """A path present only in the parent manifest is reported as a deletion."""
    before: dict[str, str] = {"a.mid": "aaa", "b.mid": "bbb"}
    after: dict[str, str] = {"a.mid": "aaa"}

    added, removed = compute_delta(before, after)

    assert added == {}
    assert removed == {"b.mid"}
165
166
def test_rebase_compute_delta_modifications() -> None:
    """A path whose value changed shows up among the additions, not the deletions."""
    before: dict[str, str] = {"a.mid": "old"}
    after: dict[str, str] = {"a.mid": "new"}

    added, removed = compute_delta(before, after)

    assert added == {"a.mid": "new"}
    assert removed == set()
174
175
def test_rebase_apply_delta_applies_changes() -> None:
    """apply_delta overlays additions on the onto manifest and drops deleted paths."""
    onto_manifest: dict[str, str] = {"base.mid": "base", "old.mid": "old"}
    additions: dict[str, str] = {"new.mid": "new", "old.mid": "updated"}
    deletions: set[str] = {"base.mid"}

    patched = apply_delta(onto_manifest, additions, deletions)

    expected = {"old.mid": "updated", "new.mid": "new"}
    assert patched == expected
183
184
def test_rebase_autosquash_moves_fixup_commits() -> None:
    """A ``fixup!`` commit is repositioned directly behind the commit it targets."""
    stamp = datetime.datetime.now(datetime.timezone.utc)

    def _commit_with(message: str) -> MuseCliCommit:
        # Only the message matters here; every other field is a placeholder.
        return MuseCliCommit(
            commit_id=str(uuid.uuid4()),
            repo_id="repo",
            branch="main",
            parent_commit_id=None,
            snapshot_id="snap",
            message=message,
            author="",
            committed_at=stamp,
        )

    target = _commit_with("Add drums")
    fixup = _commit_with("fixup! Add drums")
    other = _commit_with("Add bass")

    # Input order puts an unrelated commit between the target and its fixup.
    reordered, was_reordered = apply_autosquash([target, other, fixup])

    assert was_reordered is True
    ordered_ids = [entry.commit_id for entry in reordered]
    target_pos = ordered_ids.index(target.commit_id)
    fixup_pos = ordered_ids.index(fixup.commit_id)
    assert fixup_pos == target_pos + 1
212
213
def test_rebase_autosquash_no_fixups() -> None:
    """Without any ``fixup!`` commits the order is preserved and no reorder is reported."""
    stamp = datetime.datetime.now(datetime.timezone.utc)

    def _commit_with(message: str) -> MuseCliCommit:
        # Only the message matters here; every other field is a placeholder.
        return MuseCliCommit(
            commit_id=str(uuid.uuid4()),
            repo_id="repo",
            branch="main",
            parent_commit_id=None,
            snapshot_id="snap",
            message=message,
            author="",
            committed_at=stamp,
        )

    original = [_commit_with("Add drums"), _commit_with("Add bass")]
    expected_ids = [entry.commit_id for entry in original]

    outcome, was_reordered = apply_autosquash(original)

    assert was_reordered is False
    assert [entry.commit_id for entry in outcome] == expected_ids
234
235
236 # ---------------------------------------------------------------------------
237 # InteractivePlan tests
238 # ---------------------------------------------------------------------------
239
240
def test_rebase_interactive_plan_from_commits() -> None:
    """from_commits yields one plan entry per commit, each with the ``pick`` action."""
    stamp = datetime.datetime.now(datetime.timezone.utc)

    # (id prefix, snapshot id, message) for each commit fed into the plan.
    specs = [
        ("abc", "snap", "First commit"),
        ("def", "snap2", "Second commit"),
    ]
    commits = [
        MuseCliCommit(
            commit_id=prefix + "0" * 61,
            repo_id="r",
            branch="main",
            parent_commit_id=None,
            snapshot_id=snapshot,
            message=message,
            author="",
            committed_at=stamp,
        )
        for prefix, snapshot, message in specs
    ]

    plan = InteractivePlan.from_commits(commits)

    assert len(plan.entries) == 2
    first_action = plan.entries[0][0]
    second_action = plan.entries[1][0]
    assert first_action == "pick"
    assert second_action == "pick"
270
271
def test_rebase_interactive_plan_parse_drop() -> None:
    """A commit marked ``drop`` in the plan text is absent from resolve_against's result."""
    only_commit = MuseCliCommit(
        commit_id="abc" + "0" * 61,
        repo_id="r",
        branch="main",
        parent_commit_id=None,
        snapshot_id="snap",
        message="A commit",
        author="",
        committed_at=datetime.datetime.now(datetime.timezone.utc),
    )

    plan = InteractivePlan.from_text("drop abc A commit\n")

    assert plan.resolve_against([only_commit]) == []
289
290
def test_rebase_interactive_plan_invalid_action() -> None:
    """Parsing a plan line with an unknown verb fails with ``ValueError``."""
    bad_plan_text = "yolo abc Some commit\n"
    with pytest.raises(ValueError, match="Unknown action"):
        InteractivePlan.from_text(bad_plan_text)
295
296
def test_rebase_interactive_plan_ambiguous_sha() -> None:
    """Resolving a prefix shared by two commit ids fails with ``ValueError``."""
    stamp = datetime.datetime.now(datetime.timezone.utc)

    # Two ids that differ only in the fourth character, so "abc" matches both.
    candidates: list[MuseCliCommit] = []
    for i in range(2):
        candidates.append(
            MuseCliCommit(
                commit_id="abc" + str(i) + "0" * 60,
                repo_id="r",
                branch="main",
                parent_commit_id=None,
                snapshot_id="snap",
                message="msg",
                author="",
                committed_at=stamp,
            )
        )

    plan = InteractivePlan.from_text("pick abc msg\n")

    with pytest.raises(ValueError, match="ambiguous"):
        plan.resolve_against(candidates)
316
317
318 # ---------------------------------------------------------------------------
319 # RebaseState roundtrip
320 # ---------------------------------------------------------------------------
321
322
def test_rebase_state_roundtrip(tmp_path: pathlib.Path) -> None:
    """A RebaseState written to disk reads back field-for-field, and clearing removes it."""
    (tmp_path / ".muse").mkdir()

    written = RebaseState(
        upstream_commit="upstream123",
        base_commit="base456",
        original_branch="feature",
        original_head="head789",
        commits_to_replay=["cid1", "cid2"],
        current_onto="onto000",
        completed_pairs=[["orig1", "new1"]],
        current_commit="cid1",
        conflict_paths=["beat.mid"],
    )
    write_rebase_state(tmp_path, written)

    loaded = read_rebase_state(tmp_path)
    assert loaded is not None

    # Compare each persisted field individually, in declaration order.
    field_names = (
        "upstream_commit",
        "base_commit",
        "original_branch",
        "original_head",
        "commits_to_replay",
        "current_onto",
        "completed_pairs",
        "current_commit",
        "conflict_paths",
    )
    for field_name in field_names:
        assert getattr(loaded, field_name) == getattr(written, field_name)

    clear_rebase_state(tmp_path)
    assert read_rebase_state(tmp_path) is None
355
356
def test_rebase_state_missing_file(tmp_path: pathlib.Path) -> None:
    """With a ``.muse`` directory but no state file, read_rebase_state yields ``None``."""
    (tmp_path / ".muse").mkdir()
    loaded = read_rebase_state(tmp_path)
    assert loaded is None
362
363
364 # ---------------------------------------------------------------------------
365 # Async: collect commits since base
366 # ---------------------------------------------------------------------------
367
368
@pytest.mark.anyio
async def test_rebase_collect_commits_since_base(
    async_session: AsyncSession,
    repo_id: str,
) -> None:
    """Only commits after the base are collected, and they come back oldest first."""
    # Linear chain on 'main': Base → C1 → C2.
    root = await _seed_commit(async_session, repo_id, "main", "Base", {"a.mid": "a1"})
    first = await _seed_commit(
        async_session, repo_id, "main", "C1", {"a.mid": "a2"}, root.commit_id
    )
    second = await _seed_commit(
        async_session, repo_id, "main", "C2", {"a.mid": "a3"}, first.commit_id
    )

    collected = await _collect_branch_commits_since_base(
        async_session, second.commit_id, root.commit_id
    )
    collected_ids = [entry.commit_id for entry in collected]

    assert root.commit_id not in collected_ids
    assert first.commit_id in collected_ids
    assert second.commit_id in collected_ids
    # Ordering check: the older commit precedes the newer one.
    first_pos = collected_ids.index(first.commit_id)
    second_pos = collected_ids.index(second.commit_id)
    assert first_pos < second_pos
394
395
396 # ---------------------------------------------------------------------------
397 # Async: full rebase pipeline
398 # ---------------------------------------------------------------------------
399
400
@pytest.mark.anyio
async def test_rebase_linear_replays_commits(
    async_session: AsyncSession,
    repo_id: str,
    repo_root: pathlib.Path,
) -> None:
    """Regression: a diverged branch is replayed commit-by-commit onto the upstream tip.

    History before the rebase::

        base → upstream       (on 'dev')
        base → c1 → c2        (on 'main')

    Expected history after rebasing main onto dev::

        dev → c1' → c2'       (main)
    """
    muse_dir = repo_root / ".muse"
    heads_dir = muse_dir / "refs" / "heads"

    # Common ancestor shared by both branches.
    base = await _seed_commit(
        async_session, repo_id, "main", "Base", {"common.mid": "v0"}
    )

    # 'dev' moves one commit past the ancestor.
    upstream = await _seed_commit(
        async_session,
        repo_id,
        "dev",
        "Dev work",
        {"common.mid": "v0", "dev.mid": "d1"},
        base.commit_id,
    )

    # 'main' carries two commits of its own on top of the ancestor.
    c1 = await _seed_commit(
        async_session,
        repo_id,
        "main",
        "Add piano",
        {"common.mid": "v0", "piano.mid": "p1"},
        base.commit_id,
    )
    c2 = await _seed_commit(
        async_session,
        repo_id,
        "main",
        "Add strings",
        {"common.mid": "v0", "piano.mid": "p1", "strings.mid": "s1"},
        c1.commit_id,
    )

    # Point HEAD at main and record both branch tips on disk.
    (muse_dir / "HEAD").write_text("refs/heads/main")
    (heads_dir / "main").write_text(c2.commit_id)
    (heads_dir / "dev").write_text(upstream.commit_id)

    result = await _rebase_async(
        upstream="dev",
        root=repo_root,
        session=async_session,
    )

    assert isinstance(result, RebaseResult)
    assert result.noop is False
    assert result.aborted is False
    assert len(result.replayed) == 2

    # Both original commits must appear as sources of a replayed pair.
    replayed_sources = {pair.original_commit_id for pair in result.replayed}
    assert c1.commit_id in replayed_sources
    assert c2.commit_id in replayed_sources

    # The branch ref now names one of the freshly created commits...
    rebased_head = (heads_dir / "main").read_text().strip()
    replayed_targets = {pair.new_commit_id for pair in result.replayed}
    assert rebased_head in replayed_targets

    # ...and therefore no longer the pre-rebase tip.
    assert rebased_head != c2.commit_id
469
470
@pytest.mark.anyio
async def test_rebase_noop_already_up_to_date(
    async_session: AsyncSession,
    repo_id: str,
    repo_root: pathlib.Path,
) -> None:
    """Nothing is replayed when the current branch and upstream point at the same commit."""
    muse_dir = repo_root / ".muse"
    heads_dir = muse_dir / "refs" / "heads"

    only_commit = await _seed_commit(
        async_session, repo_id, "main", "Initial", {"a.mid": "v1"}
    )

    # Both branch refs name the very same commit.
    (muse_dir / "HEAD").write_text("refs/heads/main")
    for branch_name in ("main", "dev"):
        (heads_dir / branch_name).write_text(only_commit.commit_id)

    result = await _rebase_async(
        upstream="dev",
        root=repo_root,
        session=async_session,
    )

    assert result.noop is True
    assert result.replayed == ()
495
496
@pytest.mark.anyio
async def test_rebase_fast_forward_advances_pointer(
    async_session: AsyncSession,
    repo_id: str,
    repo_root: pathlib.Path,
) -> None:
    """A branch that is strictly behind upstream has its ref moved to the upstream tip."""
    muse_dir = repo_root / ".muse"
    heads_dir = muse_dir / "refs" / "heads"

    base = await _seed_commit(
        async_session, repo_id, "main", "Base", {"a.mid": "v0"}
    )
    upstream = await _seed_commit(
        async_session, repo_id, "dev", "Dev ahead", {"a.mid": "v1"}, base.commit_id
    )

    # 'main' still sits on the base commit while 'dev' is one commit ahead.
    (muse_dir / "HEAD").write_text("refs/heads/main")
    (heads_dir / "main").write_text(base.commit_id)
    (heads_dir / "dev").write_text(upstream.commit_id)

    result = await _rebase_async(
        upstream="dev",
        root=repo_root,
        session=async_session,
    )

    assert result.noop is True
    assert result.replayed == ()

    main_ref_after = (heads_dir / "main").read_text().strip()
    assert main_ref_after == upstream.commit_id
528
529
@pytest.mark.anyio
async def test_rebase_already_ahead_noop(
    async_session: AsyncSession,
    repo_id: str,
    repo_root: pathlib.Path,
) -> None:
    """Rebasing onto an upstream that is already an ancestor of HEAD reports a noop."""
    muse_dir = repo_root / ".muse"
    heads_dir = muse_dir / "refs" / "heads"

    upstream = await _seed_commit(
        async_session, repo_id, "dev", "Dev base", {"a.mid": "v0"}
    )
    ahead = await _seed_commit(
        async_session, repo_id, "main", "Ahead", {"a.mid": "v1"}, upstream.commit_id
    )

    # 'main' is one commit past 'dev'.
    (muse_dir / "HEAD").write_text("refs/heads/main")
    (heads_dir / "main").write_text(ahead.commit_id)
    (heads_dir / "dev").write_text(upstream.commit_id)

    result = await _rebase_async(
        upstream="dev",
        root=repo_root,
        session=async_session,
    )

    assert result.noop is True
557
558
559 # ---------------------------------------------------------------------------
560 # Async: abort
561 # ---------------------------------------------------------------------------
562
563
@pytest.mark.anyio
async def test_rebase_abort_restores_branch(repo_root: pathlib.Path) -> None:
    """Aborting puts the branch ref back at ``original_head`` and removes the saved state."""
    heads_dir = repo_root / ".muse" / "refs" / "heads"
    heads_dir.mkdir(parents=True, exist_ok=True)
    main_ref = heads_dir / "main"

    pre_rebase_head = "deadbeef" + "0" * 56
    # Simulate a rebase in flight: the ref currently names some other commit.
    main_ref.write_text("newhead" + "0" * 57)

    in_progress = RebaseState(
        upstream_commit="upstream" + "0" * 56,
        base_commit="base" + "0" * 60,
        original_branch="main",
        original_head=pre_rebase_head,
        commits_to_replay=["rem1"],
        current_onto="onto" + "0" * 60,
        completed_pairs=[],
        current_commit="cur" + "0" * 61,
        conflict_paths=["beat.mid"],
    )
    write_rebase_state(repo_root, in_progress)

    result = await _rebase_abort_async(root=repo_root)

    assert result.aborted is True
    assert main_ref.read_text().strip() == pre_rebase_head
    assert read_rebase_state(repo_root) is None
591
592
@pytest.mark.anyio
async def test_rebase_abort_no_state_errors(repo_root: pathlib.Path) -> None:
    """Aborting without a REBASE_STATE.json raises ``typer.Exit`` with exit code 1."""
    with pytest.raises(typer.Exit) as raised:
        await _rebase_abort_async(root=repo_root)

    # Checked after the context manager so the assertion actually runs.
    exit_code = raised.value.exit_code
    assert exit_code == 1
599
600
601 # ---------------------------------------------------------------------------
602 # Async: continue
603 # ---------------------------------------------------------------------------
604
605
@pytest.mark.anyio
async def test_rebase_continue_no_state_errors(
    async_session: AsyncSession,
    repo_root: pathlib.Path,
) -> None:
    """Continuing without a REBASE_STATE.json raises ``typer.Exit`` with exit code 1."""
    with pytest.raises(typer.Exit) as raised:
        await _rebase_continue_async(root=repo_root, session=async_session)

    # Checked after the context manager so the assertion actually runs.
    exit_code = raised.value.exit_code
    assert exit_code == 1
615
616
@pytest.mark.anyio
async def test_rebase_continue_replays_remaining(
    async_session: AsyncSession,
    repo_id: str,
    repo_root: pathlib.Path,
) -> None:
    """Continuing replays the queued commit, moves the branch ref, and clears the state."""
    heads_dir = repo_root / ".muse" / "refs" / "heads"
    main_ref = heads_dir / "main"

    # The commit the rebase has reached so far (the current "onto" tip).
    onto = await _seed_commit(
        async_session, repo_id, "main", "Onto tip", {"a.mid": "v1"}
    )

    # The single commit still waiting to be replayed.
    pending = await _seed_commit(
        async_session,
        repo_id,
        "main",
        "Remaining",
        {"a.mid": "v1", "b.mid": "b1"},
        onto.commit_id,
    )

    heads_dir.mkdir(parents=True, exist_ok=True)
    main_ref.write_text(onto.commit_id)

    saved_state = RebaseState(
        upstream_commit=onto.commit_id,
        base_commit="base" + "0" * 60,
        original_branch="main",
        original_head="original" + "0" * 56,
        commits_to_replay=[pending.commit_id],
        current_onto=onto.commit_id,
        completed_pairs=[],
        current_commit="",
        conflict_paths=[],
    )
    write_rebase_state(repo_root, saved_state)

    result = await _rebase_continue_async(root=repo_root, session=async_session)

    assert result.aborted is False
    assert len(result.replayed) == 1

    head_after = main_ref.read_text().strip()
    assert head_after == result.replayed[0].new_commit_id
    assert head_after != pending.commit_id
    assert read_rebase_state(repo_root) is None
661
662
663 # ---------------------------------------------------------------------------
664 # Boundary seal — AST import check
665 # ---------------------------------------------------------------------------
666
667
def test_boundary_no_forbidden_imports() -> None:
    """muse_rebase must not import StateStore, EntityRegistry, or executor modules.

    Parses ``maestro/services/muse_rebase.py`` (located relative to this test
    file) with ``ast`` — the module is never executed — and fails when an
    import statement either:

    - names ``StateStore``, ``EntityRegistry`` or ``get_or_create_store``, or
    - references a module whose dotted path contains ``executor`` or
      ``maestro_handlers``.

    Both ``import a.b`` and ``from a.b import c`` forms are held to the same
    rules, so a plain ``import pkg.executor`` cannot slip past the seal.
    """
    source_path = (
        pathlib.Path(__file__).parent.parent / "maestro" / "services" / "muse_rebase.py"
    )
    tree = ast.parse(source_path.read_text(encoding="utf-8"))

    forbidden = {"StateStore", "EntityRegistry", "get_or_create_store"}
    forbidden_module_parts = ("executor", "maestro_handlers")

    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            for alias in node.names:
                # For ``import a.b.c`` alias.name is the whole dotted path, so
                # compare each component against the forbidden names.
                assert forbidden.isdisjoint(alias.name.split(".")), (
                    f"Forbidden import {alias.name!r} in muse_rebase.py"
                )
                assert not any(part in alias.name for part in forbidden_module_parts), (
                    f"Forbidden module import {alias.name!r} in muse_rebase.py"
                )
        elif isinstance(node, ast.ImportFrom):
            # node.module is None for ``from . import x``; treat it as empty.
            module = node.module or ""
            assert not any(part in module for part in forbidden_module_parts), (
                f"Forbidden module import {module!r} in muse_rebase.py"
            )
            for alias in node.names:
                assert alias.name not in forbidden, (
                    f"Forbidden import {alias.name!r} from {module!r} in muse_rebase.py"
                )