cgcardona / muse public
test_core_coverage_gaps.py python
276 lines 10.4 KB
c8984819 test: bring core VCS coverage from 60% to 91% Gabriel Cardona <gabriel@tellurstori.com> 3d ago
1 """Tests targeting coverage gaps in core modules: object_store, repo, store, merge_engine."""
2 from __future__ import annotations
3
4 import json
5 import os
6 import pathlib
7
8 import pytest
9
10 from muse.core.object_store import (
11 has_object,
12 object_path,
13 objects_dir,
14 read_object,
15 restore_object,
16 write_object,
17 write_object_from_path,
18 )
19 from muse.core.repo import find_repo_root, require_repo
20 from muse.core.store import (
21 CommitRecord,
22 SnapshotRecord,
23 get_commits_for_branch,
24 get_head_commit_id,
25 get_head_snapshot_id,
26 get_head_snapshot_manifest,
27 get_tags_for_commit,
28 read_commit,
29 read_snapshot,
30 resolve_commit_ref,
31 update_commit_metadata,
32 write_commit,
33 write_snapshot,
34 )
35 from muse.core.merge_engine import apply_resolution, clear_merge_state, read_merge_state, write_merge_state
36
37 import datetime
38
39
40 # ---------------------------------------------------------------------------
41 # object_store
42 # ---------------------------------------------------------------------------
43
44
class TestObjectStore:
    """Exercise the object-store helpers against a throwaway repo root.

    Every test uses pytest's ``tmp_path`` as the repository root, so no
    state is shared between tests.
    """

    def test_objects_dir_path(self, tmp_path: pathlib.Path) -> None:
        """The objects directory is expected at ``<root>/.muse/objects``."""
        expected = tmp_path / ".muse" / "objects"
        assert objects_dir(tmp_path) == expected

    def test_object_path_sharding(self, tmp_path: pathlib.Path) -> None:
        """The first two id characters name the parent dir, the rest the file."""
        prefix, remainder = "ab", "c" * 62
        sharded = object_path(tmp_path, prefix + remainder)
        assert sharded.parent.name == prefix
        assert sharded.name == remainder

    def test_has_object_false_when_absent(self, tmp_path: pathlib.Path) -> None:
        """An id that was never written is reported as missing."""
        present = has_object(tmp_path, "a" * 64)
        assert not present

    def test_has_object_true_after_write(self, tmp_path: pathlib.Path) -> None:
        """An id becomes visible once it has been written."""
        object_id = "a" * 64
        write_object(tmp_path, object_id, b"hello")
        present = has_object(tmp_path, object_id)
        assert present

    def test_write_object_idempotent_returns_false(self, tmp_path: pathlib.Path) -> None:
        """A second write of the same id returns False and keeps the first payload."""
        object_id = "b" * 64
        first_write = write_object(tmp_path, object_id, b"first")
        assert first_write is True
        second_write = write_object(tmp_path, object_id, b"second")
        assert second_write is False
        # The payload from the first write must survive the rejected second write.
        stored = read_object(tmp_path, object_id)
        assert stored == b"first"

    def test_write_object_from_path_idempotent(self, tmp_path: pathlib.Path) -> None:
        """Importing the same file twice under one id succeeds once, then returns False."""
        source_file = tmp_path / "src.bin"
        source_file.write_bytes(b"content")
        object_id = "c" * 64
        first_import = write_object_from_path(tmp_path, object_id, source_file)
        assert first_import is True
        second_import = write_object_from_path(tmp_path, object_id, source_file)
        assert second_import is False

    def test_write_object_from_path_stores_content(self, tmp_path: pathlib.Path) -> None:
        """Bytes imported from a file can be read back from the store unchanged."""
        source_file = tmp_path / "file.bin"
        source_file.write_bytes(b"my bytes")
        object_id = "d" * 64
        write_object_from_path(tmp_path, object_id, source_file)
        stored = read_object(tmp_path, object_id)
        assert stored == b"my bytes"

    def test_read_object_returns_none_when_absent(self, tmp_path: pathlib.Path) -> None:
        """Reading an unknown id yields ``None``."""
        stored = read_object(tmp_path, "e" * 64)
        assert stored is None

    def test_read_object_returns_bytes(self, tmp_path: pathlib.Path) -> None:
        """Reading a written id yields exactly the bytes that were written."""
        object_id = "f" * 64
        write_object(tmp_path, object_id, b"data")
        stored = read_object(tmp_path, object_id)
        assert stored == b"data"

    def test_restore_object_returns_false_when_absent(self, tmp_path: pathlib.Path) -> None:
        """Restoring an unknown id returns False and leaves no destination file."""
        target = tmp_path / "out.bin"
        restored = restore_object(tmp_path, "0" * 64, target)
        assert restored is False
        assert not target.exists()

    def test_restore_object_creates_dest(self, tmp_path: pathlib.Path) -> None:
        """Restoring a known id returns True and writes the payload to the destination."""
        object_id = "1" * 64
        write_object(tmp_path, object_id, b"restored")
        target = tmp_path / "sub" / "out.bin"
        restored = restore_object(tmp_path, object_id, target)
        assert restored is True
        assert target.read_bytes() == b"restored"

    def test_restore_object_creates_parent_dirs(self, tmp_path: pathlib.Path) -> None:
        """Restoring into a deeply nested, not-yet-existing path creates the file."""
        object_id = "2" * 64
        write_object(tmp_path, object_id, b"nested")
        target = tmp_path / "a" / "b" / "c" / "file.bin"
        restore_object(tmp_path, object_id, target)
        assert target.exists()
113
114
115 # ---------------------------------------------------------------------------
116 # repo
117 # ---------------------------------------------------------------------------
118
119
class TestFindRepoRoot:
    """Tests for repository discovery via ``find_repo_root`` / ``require_repo``.

    The env-override tests below show that ``find_repo_root`` consults the
    ``MUSE_REPO_ROOT`` environment variable.  Every test therefore either sets
    that variable explicitly or removes it first, so the outcome never depends
    on the environment the test run was launched from.
    """

    def test_finds_muse_dir_in_cwd(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None:
        """A directory that directly contains ``.muse/`` is its own repo root."""
        # NOTE(review): it is not visible here whether MUSE_REPO_ROOT also
        # overrides an explicit start path; clearing it is harmless either way.
        monkeypatch.delenv("MUSE_REPO_ROOT", raising=False)
        (tmp_path / ".muse").mkdir()
        result = find_repo_root(tmp_path)
        assert result == tmp_path

    def test_finds_muse_dir_in_parent(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None:
        """Starting from a nested subdirectory finds the ancestor holding ``.muse/``."""
        # Clear any ambient override so only the directory layout decides the result.
        monkeypatch.delenv("MUSE_REPO_ROOT", raising=False)
        (tmp_path / ".muse").mkdir()
        subdir = tmp_path / "a" / "b"
        subdir.mkdir(parents=True)
        result = find_repo_root(subdir)
        assert result == tmp_path

    def test_returns_none_when_no_repo(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None:
        """Without any ``.muse/`` directory the lookup is expected to yield ``None``."""
        # An ambient MUSE_REPO_ROOT pointing at a real repo could otherwise
        # make this lookup succeed unexpectedly.
        monkeypatch.delenv("MUSE_REPO_ROOT", raising=False)
        result = find_repo_root(tmp_path)
        assert result is None

    def test_env_override_returns_path(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None:
        """``MUSE_REPO_ROOT`` pointing at a real repo is returned by the no-arg lookup."""
        (tmp_path / ".muse").mkdir()
        monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))
        result = find_repo_root()
        assert result == tmp_path

    def test_env_override_returns_none_when_not_repo(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None:
        """``MUSE_REPO_ROOT`` pointing at a directory without ``.muse/`` yields ``None``."""
        # tmp_path exists but has no .muse/
        monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))
        result = find_repo_root()
        assert result is None

    def test_require_repo_exits_when_no_repo(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None:
        """``require_repo`` raises ``click.exceptions.Exit`` when no repo is found."""
        # Imported locally, as in the original test, so the module does not
        # need click at import time.
        import click

        monkeypatch.delenv("MUSE_REPO_ROOT", raising=False)
        # Run from an empty temp directory so the lookup has nothing to find.
        monkeypatch.chdir(tmp_path)
        with pytest.raises(click.exceptions.Exit):
            require_repo()
155
156
157 # ---------------------------------------------------------------------------
158 # store coverage gaps
159 # ---------------------------------------------------------------------------
160
161
class TestStoreGaps:
    """Cover store-module branches for empty branches and unknown ids."""

    def _make_repo(self, tmp_path: pathlib.Path) -> pathlib.Path:
        """Lay out a minimal ``.muse`` tree under *tmp_path* and return *tmp_path*.

        Creates the ``commits``, ``snapshots``, ``objects`` and ``refs/heads``
        directories, a ``HEAD`` file pointing at ``refs/heads/main``, a
        ``repo.json`` with repo id ``test-repo``, and an empty ``main`` ref.
        """
        meta_dir = tmp_path / ".muse"
        subdirs = ("commits", "snapshots", "objects", "refs/heads")
        for name in subdirs:
            meta_dir.joinpath(name).mkdir(parents=True)

        meta_dir.joinpath("HEAD").write_text("refs/heads/main\n")
        repo_meta = {"repo_id": "test-repo"}
        meta_dir.joinpath("repo.json").write_text(json.dumps(repo_meta))
        # An empty ref file stands for a branch with no commits yet.
        meta_dir.joinpath("refs", "heads", "main").write_text("")
        return tmp_path

    def test_get_head_commit_id_empty_branch(self, tmp_path: pathlib.Path) -> None:
        """An empty ``main`` ref is expected to give no head commit id."""
        repo_root = self._make_repo(tmp_path)
        head_id = get_head_commit_id(repo_root, "main")
        assert head_id is None

    def test_get_head_snapshot_id_no_commits(self, tmp_path: pathlib.Path) -> None:
        """With no commits there is no head snapshot id."""
        repo_root = self._make_repo(tmp_path)
        snapshot_id = get_head_snapshot_id(repo_root, "test-repo", "main")
        assert snapshot_id is None

    def test_get_head_snapshot_manifest_no_commits(self, tmp_path: pathlib.Path) -> None:
        """With no commits there is no head snapshot manifest."""
        repo_root = self._make_repo(tmp_path)
        manifest = get_head_snapshot_manifest(repo_root, "test-repo", "main")
        assert manifest is None

    def test_get_commits_for_branch_empty(self, tmp_path: pathlib.Path) -> None:
        """A branch without commits lists as an empty list."""
        repo_root = self._make_repo(tmp_path)
        history = get_commits_for_branch(repo_root, "test-repo", "main")
        assert history == []

    def test_resolve_commit_ref_with_none_returns_head(self, tmp_path: pathlib.Path) -> None:
        """Resolving a ``None`` ref is expected to return the branch's head commit."""
        repo_root = self._make_repo(tmp_path)
        snapshot_id = "s" * 64
        commit_id = "c" * 64

        write_snapshot(
            repo_root,
            SnapshotRecord(snapshot_id=snapshot_id, manifest={"a.mid": "h" * 64}),
        )

        now_utc = datetime.datetime.now(datetime.timezone.utc)
        record = CommitRecord(
            commit_id=commit_id,
            repo_id="test-repo",
            branch="main",
            snapshot_id=snapshot_id,
            message="test",
            committed_at=now_utc,
        )
        write_commit(repo_root, record)

        # Point the branch ref at the freshly written commit.
        main_ref = repo_root / ".muse" / "refs" / "heads" / "main"
        main_ref.write_text(commit_id)

        resolved = resolve_commit_ref(repo_root, "test-repo", "main", None)
        assert resolved is not None
        assert resolved.commit_id == commit_id

    def test_read_commit_returns_none_for_unknown(self, tmp_path: pathlib.Path) -> None:
        """Reading a commit id that was never written yields ``None``."""
        repo_root = self._make_repo(tmp_path)
        record = read_commit(repo_root, "unknown")
        assert record is None

    def test_read_snapshot_returns_none_for_unknown(self, tmp_path: pathlib.Path) -> None:
        """Reading a snapshot id that was never written yields ``None``."""
        repo_root = self._make_repo(tmp_path)
        record = read_snapshot(repo_root, "unknown")
        assert record is None

    def test_update_commit_metadata_false_for_unknown(self, tmp_path: pathlib.Path) -> None:
        """Updating metadata on an unknown commit reports failure with False."""
        repo_root = self._make_repo(tmp_path)
        updated = update_commit_metadata(repo_root, "unknown", "key", "val")
        assert updated is False

    def test_get_tags_for_commit_empty(self, tmp_path: pathlib.Path) -> None:
        """A commit with no tags recorded lists as an empty list."""
        repo_root = self._make_repo(tmp_path)
        found_tags = get_tags_for_commit(repo_root, "test-repo", "c" * 64)
        assert found_tags == []
225
226
227 # ---------------------------------------------------------------------------
228 # merge_engine coverage gaps
229 # ---------------------------------------------------------------------------
230
231
class TestMergeEngineCoverageGaps:
    """Cover merge-engine branches around merge state and conflict resolution."""

    def _make_repo(self, tmp_path: pathlib.Path) -> pathlib.Path:
        """Create a bare ``.muse`` directory under *tmp_path* and return *tmp_path*."""
        (tmp_path / ".muse").mkdir(parents=True)
        return tmp_path

    def test_clear_merge_state_no_file(self, tmp_path: pathlib.Path) -> None:
        """Clearing merge state must not raise when no state file exists."""
        repo_root = self._make_repo(tmp_path)
        # Passing means no exception: MERGE_STATE.json was never created.
        clear_merge_state(repo_root)

    def test_apply_resolution_copies_object(self, tmp_path: pathlib.Path) -> None:
        """Applying a resolution materialises the object at ``muse-work/<path>``."""
        repo_root = self._make_repo(tmp_path)
        # Seed the store with a real object to resolve to.
        object_id = "a" * 64
        write_object(repo_root, object_id, b"resolved content")

        apply_resolution(repo_root, "track.mid", object_id)

        resolved_file = repo_root / "muse-work" / "track.mid"
        assert resolved_file.exists()
        assert resolved_file.read_bytes() == b"resolved content"

    def test_apply_resolution_raises_when_object_absent(self, tmp_path: pathlib.Path) -> None:
        """Resolving to an id missing from the store raises ``FileNotFoundError``."""
        repo_root = self._make_repo(tmp_path)
        missing_id = "0" * 64
        with pytest.raises(FileNotFoundError):
            apply_resolution(repo_root, "track.mid", missing_id)

    def test_read_merge_state_invalid_json_returns_none(self, tmp_path: pathlib.Path) -> None:
        """A state file with malformed JSON is expected to read back as ``None``."""
        repo_root = self._make_repo(tmp_path)
        state_file = repo_root / ".muse" / "MERGE_STATE.json"
        state_file.write_text("not json {{")
        state = read_merge_state(repo_root)
        assert state is None

    def test_write_then_clear_merge_state(self, tmp_path: pathlib.Path) -> None:
        """Writing merge state creates the state file; clearing removes it again."""
        repo_root = self._make_repo(tmp_path)
        write_merge_state(
            repo_root,
            base_commit="b" * 64,
            ours_commit="o" * 64,
            theirs_commit="t" * 64,
            conflict_paths=["a.mid"],
        )
        state_file = repo_root / ".muse" / "MERGE_STATE.json"
        assert state_file.exists()
        clear_merge_state(repo_root)
        assert not state_file.exists()