test_core_merge_engine.py
python
| 1 | """Tests for muse.core.merge_engine — three-way merge logic.""" |
| 2 | from __future__ import annotations |
| 3 | |
| 4 | import json |
| 5 | import pathlib |
| 6 | |
| 7 | import pytest |
| 8 | |
| 9 | from muse.core.merge_engine import ( |
| 10 | MergeState, |
| 11 | apply_merge, |
| 12 | clear_merge_state, |
| 13 | detect_conflicts, |
| 14 | diff_snapshots, |
| 15 | find_merge_base, |
| 16 | read_merge_state, |
| 17 | write_merge_state, |
| 18 | ) |
| 19 | from muse.core.store import CommitRecord, write_commit |
| 20 | import datetime |
| 21 | |
| 22 | |
@pytest.fixture
def repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Create a ``.muse`` directory skeleton under *tmp_path* and return *tmp_path*.

    Two directories are created (with parents): ``.muse/commits`` and
    ``.muse/refs/heads``.
    """
    dot_muse = tmp_path / ".muse"
    # Order matters only in that the first mkdir also creates ``.muse`` itself.
    for parts in (("commits",), ("refs", "heads")):
        dot_muse.joinpath(*parts).mkdir(parents=True)
    return tmp_path
| 29 | |
| 30 | |
def _commit(root: pathlib.Path, cid: str, parent: str | None = None, parent2: str | None = None) -> None:
    """Build a ``CommitRecord`` for *cid* and hand it to ``write_commit``.

    The record uses fixed placeholder values (repo ``"r"``, branch ``"main"``),
    a snapshot id derived from *cid*, *cid* itself as the message, and the
    current UTC time as the commit timestamp.

    Args:
        root: Passed through as the first argument to ``write_commit``.
        cid: Commit id; also used for the message and the snapshot id suffix.
        parent: Value for ``parent_commit_id`` (``None`` by default).
        parent2: Value for ``parent2_commit_id`` (``None`` by default).
    """
    timestamp = datetime.datetime.now(datetime.timezone.utc)
    record = CommitRecord(
        commit_id=cid,
        repo_id="r",
        branch="main",
        snapshot_id=f"snap-{cid}",
        message=cid,
        committed_at=timestamp,
        parent_commit_id=parent,
        parent2_commit_id=parent2,
    )
    write_commit(root, record)
| 42 | |
| 43 | |
class TestDiffSnapshots:
    """Exercise ``diff_snapshots`` with small string-keyed manifests."""

    def test_no_change(self) -> None:
        """A manifest diffed against itself yields an empty set."""
        manifest = {"a.mid": "h1", "b.mid": "h2"}
        changed = diff_snapshots(manifest, manifest)
        assert changed == set()

    def test_added(self) -> None:
        """A key present only in the second manifest is reported."""
        changed = diff_snapshots({}, {"a.mid": "h1"})
        assert changed == {"a.mid"}

    def test_removed(self) -> None:
        """A key present only in the first manifest is reported."""
        changed = diff_snapshots({"a.mid": "h1"}, {})
        assert changed == {"a.mid"}

    def test_modified(self) -> None:
        """A key whose value differs between manifests is reported."""
        before = {"a.mid": "old"}
        after = {"a.mid": "new"}
        assert diff_snapshots(before, after) == {"a.mid"}
| 57 | |
| 58 | |
class TestDetectConflicts:
    """Exercise ``detect_conflicts`` with pairs of changed-path sets."""

    def test_no_conflict(self) -> None:
        """Disjoint change sets produce an empty result."""
        conflicts = detect_conflicts({"a.mid"}, {"b.mid"})
        assert conflicts == set()

    def test_conflict(self) -> None:
        """Only the path present in both change sets is returned."""
        ours_changed = {"a.mid", "b.mid"}
        theirs_changed = {"b.mid", "c.mid"}
        assert detect_conflicts(ours_changed, theirs_changed) == {"b.mid"}

    def test_both_empty(self) -> None:
        """Two empty change sets produce an empty result."""
        conflicts = detect_conflicts(set(), set())
        assert conflicts == set()
| 68 | |
| 69 | |
class TestApplyMerge:
    """Exercise ``apply_merge`` with hand-built base/ours/theirs manifests."""

    def test_clean_merge(self) -> None:
        """Each side changes a different path; the result carries both changes."""
        base = {"a.mid": "h0", "b.mid": "h0"}
        ours = {**base, "a.mid": "h_ours"}
        theirs = {**base, "b.mid": "h_theirs"}
        merged = apply_merge(base, ours, theirs, {"a.mid"}, {"b.mid"}, set())
        assert merged == {"a.mid": "h_ours", "b.mid": "h_theirs"}

    def test_conflict_paths_excluded(self) -> None:
        """A path listed as conflicting keeps its base value in the result."""
        base = {"a.mid": "h0"}
        ours = {"a.mid": "h_ours"}
        theirs = {"a.mid": "h_theirs"}
        # The same set object is deliberately passed for both change sets.
        both_changed = {"a.mid"}
        merged = apply_merge(base, ours, theirs, both_changed, both_changed, {"a.mid"})
        assert merged == {"a.mid": "h0"}  # Falls back to base

    def test_ours_deletion_applied(self) -> None:
        """A path removed on ours (and untouched on theirs) is absent from the result."""
        base = {"a.mid": "h0", "b.mid": "h0"}
        theirs = dict(base)
        ours = {"b.mid": "h0"}  # a.mid deleted on ours
        merged = apply_merge(base, ours, theirs, {"a.mid"}, set(), set())
        assert "a.mid" not in merged
| 94 | |
| 95 | |
class TestMergeStateIO:
    """Round-trip checks for ``write_merge_state`` / ``read_merge_state`` / ``clear_merge_state``."""

    def test_write_and_read(self, repo: pathlib.Path) -> None:
        """Values written are visible on the state object read back."""
        fields = {
            "base_commit": "base",
            "ours_commit": "ours",
            "theirs_commit": "theirs",
            "conflict_paths": ["a.mid", "b.mid"],
            "other_branch": "feature/x",
        }
        write_merge_state(repo, **fields)

        loaded = read_merge_state(repo)
        assert loaded is not None
        assert loaded.base_commit == "base"
        assert loaded.conflict_paths == ["a.mid", "b.mid"]
        assert loaded.other_branch == "feature/x"

    def test_read_no_state(self, repo: pathlib.Path) -> None:
        """Reading before anything was written returns ``None``."""
        loaded = read_merge_state(repo)
        assert loaded is None

    def test_clear(self, repo: pathlib.Path) -> None:
        """After clearing, reading returns ``None`` again."""
        write_merge_state(
            repo,
            base_commit="b",
            ours_commit="o",
            theirs_commit="t",
            conflict_paths=[],
        )
        clear_merge_state(repo)
        assert read_merge_state(repo) is None
| 119 | |
| 120 | |
class TestFindMergeBase:
    """Exercise ``find_merge_base`` over small commit graphs built with ``_commit``."""

    def test_direct_parent(self, repo: pathlib.Path) -> None:
        """Two commits sharing one parent resolve to that parent."""
        _commit(repo, "root")
        for child in ("a", "b"):
            _commit(repo, child, parent="root")
        assert find_merge_base(repo, "a", "b") == "root"

    def test_same_commit(self, repo: pathlib.Path) -> None:
        """A commit compared with itself resolves to itself."""
        _commit(repo, "root")
        assert find_merge_base(repo, "root", "root") == "root"

    def test_linear_history(self, repo: pathlib.Path) -> None:
        """On the chain a <- b <- c, the base of c and b is b."""
        chain = (("a", None), ("b", "a"), ("c", "b"))
        for cid, parent_id in chain:
            _commit(repo, cid, parent=parent_id)
        assert find_merge_base(repo, "c", "b") == "b"

    def test_no_common_ancestor(self, repo: pathlib.Path) -> None:
        """Two parentless commits have no base; ``None`` is returned."""
        for cid in ("x", "y"):
            _commit(repo, cid)
        result = find_merge_base(repo, "x", "y")
        assert result is None