cgcardona / muse public
test_core_coverage_gaps.py python
289 lines 10.8 KB
dfaf1b77 refactor: rename muse-work/ → state/ Gabriel Cardona <gabriel@tellurstori.com> 8h ago
1 """Tests targeting coverage gaps in core modules: object_store, repo, store, merge_engine."""
2
3 import hashlib
4 import json
5 import os
6 import pathlib
7
8 import pytest
9
10
11 def _sha256(data: bytes) -> str:
12 return hashlib.sha256(data).hexdigest()
13
14 from muse.core.object_store import (
15 has_object,
16 object_path,
17 objects_dir,
18 read_object,
19 restore_object,
20 write_object,
21 write_object_from_path,
22 )
23 from muse.core.repo import find_repo_root, require_repo
24 from muse.core.store import (
25 CommitRecord,
26 SnapshotRecord,
27 get_commits_for_branch,
28 get_head_commit_id,
29 get_head_snapshot_id,
30 get_head_snapshot_manifest,
31 get_tags_for_commit,
32 read_commit,
33 read_snapshot,
34 resolve_commit_ref,
35 update_commit_metadata,
36 write_commit,
37 write_snapshot,
38 )
39 from muse.core.merge_engine import apply_resolution, clear_merge_state, read_merge_state, write_merge_state
40
41 import datetime
42
43
44 # ---------------------------------------------------------------------------
45 # object_store
46 # ---------------------------------------------------------------------------
47
48
49 class TestObjectStore:
50 def test_objects_dir_path(self, tmp_path: pathlib.Path) -> None:
51 d = objects_dir(tmp_path)
52 assert d == tmp_path / ".muse" / "objects"
53
54 def test_object_path_sharding(self, tmp_path: pathlib.Path) -> None:
55 oid = "ab" + "c" * 62
56 p = object_path(tmp_path, oid)
57 assert p.parent.name == "ab"
58 assert p.name == "c" * 62
59
60 def test_has_object_false_when_absent(self, tmp_path: pathlib.Path) -> None:
61 assert not has_object(tmp_path, "a" * 64)
62
63 def test_has_object_true_after_write(self, tmp_path: pathlib.Path) -> None:
64 content = b"hello"
65 oid = _sha256(content)
66 write_object(tmp_path, oid, content)
67 assert has_object(tmp_path, oid)
68
69 def test_write_object_idempotent_returns_false(self, tmp_path: pathlib.Path) -> None:
70 content = b"first"
71 oid = _sha256(content)
72 assert write_object(tmp_path, oid, content) is True
73 # Second write with correct hash but same ID — idempotent
74 assert write_object(tmp_path, oid, content) is False
75 # content should not change
76 assert read_object(tmp_path, oid) == content
77
78 def test_write_object_from_path_idempotent(self, tmp_path: pathlib.Path) -> None:
79 content = b"content"
80 src = tmp_path / "src.bin"
81 src.write_bytes(content)
82 oid = _sha256(content)
83 assert write_object_from_path(tmp_path, oid, src) is True
84 assert write_object_from_path(tmp_path, oid, src) is False
85
86 def test_write_object_from_path_stores_content(self, tmp_path: pathlib.Path) -> None:
87 content = b"my bytes"
88 src = tmp_path / "file.bin"
89 src.write_bytes(content)
90 oid = _sha256(content)
91 write_object_from_path(tmp_path, oid, src)
92 assert read_object(tmp_path, oid) == content
93
94 def test_read_object_returns_none_when_absent(self, tmp_path: pathlib.Path) -> None:
95 assert read_object(tmp_path, "e" * 64) is None
96
97 def test_read_object_returns_bytes(self, tmp_path: pathlib.Path) -> None:
98 content = b"data"
99 oid = _sha256(content)
100 write_object(tmp_path, oid, content)
101 assert read_object(tmp_path, oid) == content
102
103 def test_restore_object_returns_false_when_absent(self, tmp_path: pathlib.Path) -> None:
104 dest = tmp_path / "out.bin"
105 result = restore_object(tmp_path, "0" * 64, dest)
106 assert result is False
107 assert not dest.exists()
108
109 def test_restore_object_creates_dest(self, tmp_path: pathlib.Path) -> None:
110 content = b"restored"
111 oid = _sha256(content)
112 write_object(tmp_path, oid, content)
113 dest = tmp_path / "sub" / "out.bin"
114 result = restore_object(tmp_path, oid, dest)
115 assert result is True
116 assert dest.read_bytes() == content
117
118 def test_restore_object_creates_parent_dirs(self, tmp_path: pathlib.Path) -> None:
119 content = b"nested"
120 oid = _sha256(content)
121 write_object(tmp_path, oid, content)
122 dest = tmp_path / "a" / "b" / "c" / "file.bin"
123 restore_object(tmp_path, oid, dest)
124 assert dest.exists()
125
126
127 # ---------------------------------------------------------------------------
128 # repo
129 # ---------------------------------------------------------------------------
130
131
132 class TestFindRepoRoot:
133 def test_finds_muse_dir_in_cwd(self, tmp_path: pathlib.Path) -> None:
134 (tmp_path / ".muse").mkdir()
135 result = find_repo_root(tmp_path)
136 assert result == tmp_path
137
138 def test_finds_muse_dir_in_parent(self, tmp_path: pathlib.Path) -> None:
139 (tmp_path / ".muse").mkdir()
140 subdir = tmp_path / "a" / "b"
141 subdir.mkdir(parents=True)
142 result = find_repo_root(subdir)
143 assert result == tmp_path
144
145 def test_returns_none_when_no_repo(self, tmp_path: pathlib.Path) -> None:
146 result = find_repo_root(tmp_path)
147 assert result is None
148
149 def test_env_override_returns_path(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None:
150 (tmp_path / ".muse").mkdir()
151 monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))
152 result = find_repo_root()
153 assert result == tmp_path
154
155 def test_env_override_returns_none_when_not_repo(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None:
156 # tmp_path exists but has no .muse/
157 monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))
158 result = find_repo_root()
159 assert result is None
160
161 def test_require_repo_exits_when_no_repo(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None:
162 import click
163 monkeypatch.delenv("MUSE_REPO_ROOT", raising=False)
164 monkeypatch.chdir(tmp_path)
165 with pytest.raises(click.exceptions.Exit):
166 require_repo()
167
168
169 # ---------------------------------------------------------------------------
170 # store coverage gaps
171 # ---------------------------------------------------------------------------
172
173
174 class TestStoreGaps:
175 def _make_repo(self, tmp_path: pathlib.Path) -> pathlib.Path:
176 muse = tmp_path / ".muse"
177 for d in ("commits", "snapshots", "objects", "refs/heads"):
178 (muse / d).mkdir(parents=True)
179 (muse / "HEAD").write_text("refs/heads/main\n")
180 (muse / "repo.json").write_text(json.dumps({"repo_id": "test-repo"}))
181 (muse / "refs" / "heads" / "main").write_text("")
182 return tmp_path
183
184 def test_get_head_commit_id_empty_branch(self, tmp_path: pathlib.Path) -> None:
185 root = self._make_repo(tmp_path)
186 assert get_head_commit_id(root, "main") is None
187
188 def test_get_head_snapshot_id_no_commits(self, tmp_path: pathlib.Path) -> None:
189 root = self._make_repo(tmp_path)
190 assert get_head_snapshot_id(root, "test-repo", "main") is None
191
192 def test_get_head_snapshot_manifest_no_commits(self, tmp_path: pathlib.Path) -> None:
193 root = self._make_repo(tmp_path)
194 assert get_head_snapshot_manifest(root, "test-repo", "main") is None
195
196 def test_get_commits_for_branch_empty(self, tmp_path: pathlib.Path) -> None:
197 root = self._make_repo(tmp_path)
198 commits = get_commits_for_branch(root, "test-repo", "main")
199 assert commits == []
200
201 def test_resolve_commit_ref_with_none_returns_head(self, tmp_path: pathlib.Path) -> None:
202 root = self._make_repo(tmp_path)
203 snap = SnapshotRecord(snapshot_id="s" * 64, manifest={"a.mid": "h" * 64})
204 write_snapshot(root, snap)
205 committed_at = datetime.datetime.now(datetime.timezone.utc)
206 commit = CommitRecord(
207 commit_id="c" * 64,
208 repo_id="test-repo",
209 branch="main",
210 snapshot_id="s" * 64,
211 message="test",
212 committed_at=committed_at,
213 )
214 write_commit(root, commit)
215 (root / ".muse" / "refs" / "heads" / "main").write_text("c" * 64)
216
217 result = resolve_commit_ref(root, "test-repo", "main", None)
218 assert result is not None
219 assert result.commit_id == "c" * 64
220
221 def test_read_commit_returns_none_for_unknown(self, tmp_path: pathlib.Path) -> None:
222 root = self._make_repo(tmp_path)
223 assert read_commit(root, "unknown") is None
224
225 def test_read_snapshot_returns_none_for_unknown(self, tmp_path: pathlib.Path) -> None:
226 root = self._make_repo(tmp_path)
227 assert read_snapshot(root, "unknown") is None
228
229 def test_update_commit_metadata_false_for_unknown(self, tmp_path: pathlib.Path) -> None:
230 root = self._make_repo(tmp_path)
231 assert update_commit_metadata(root, "unknown", "key", "val") is False
232
233 def test_get_tags_for_commit_empty(self, tmp_path: pathlib.Path) -> None:
234 root = self._make_repo(tmp_path)
235 tags = get_tags_for_commit(root, "test-repo", "c" * 64)
236 assert tags == []
237
238
239 # ---------------------------------------------------------------------------
240 # merge_engine coverage gaps
241 # ---------------------------------------------------------------------------
242
243
244 class TestMergeEngineCoverageGaps:
245 def _make_repo(self, tmp_path: pathlib.Path) -> pathlib.Path:
246 muse = tmp_path / ".muse"
247 muse.mkdir(parents=True)
248 return tmp_path
249
250 def test_clear_merge_state_no_file(self, tmp_path: pathlib.Path) -> None:
251 root = self._make_repo(tmp_path)
252 # Should not raise even if MERGE_STATE.json is absent
253 clear_merge_state(root)
254
255 def test_apply_resolution_copies_object(self, tmp_path: pathlib.Path) -> None:
256 root = self._make_repo(tmp_path)
257 # Write a real object to the store — oid must be the SHA-256 of the content.
258 content = b"resolved content"
259 oid = _sha256(content)
260 write_object(root, oid, content)
261
262 apply_resolution(root, "track.mid", oid)
263 dest = root / "state" / "track.mid"
264 assert dest.exists()
265 assert dest.read_bytes() == b"resolved content"
266
267 def test_apply_resolution_raises_when_object_absent(self, tmp_path: pathlib.Path) -> None:
268 root = self._make_repo(tmp_path)
269 with pytest.raises(FileNotFoundError):
270 apply_resolution(root, "track.mid", "0" * 64)
271
272 def test_read_merge_state_invalid_json_returns_none(self, tmp_path: pathlib.Path) -> None:
273 root = self._make_repo(tmp_path)
274 (root / ".muse" / "MERGE_STATE.json").write_text("not json {{")
275 result = read_merge_state(root)
276 assert result is None
277
278 def test_write_then_clear_merge_state(self, tmp_path: pathlib.Path) -> None:
279 root = self._make_repo(tmp_path)
280 write_merge_state(
281 root,
282 base_commit="b" * 64,
283 ours_commit="o" * 64,
284 theirs_commit="t" * 64,
285 conflict_paths=["a.mid"],
286 )
287 assert (root / ".muse" / "MERGE_STATE.json").exists()
288 clear_merge_state(root)
289 assert not (root / ".muse" / "MERGE_STATE.json").exists()