test_core_coverage_gaps.py
python
| 1 | """Tests targeting coverage gaps in core modules: object_store, repo, store, merge_engine.""" |
| 2 | |
| 3 | import json |
| 4 | import os |
| 5 | import pathlib |
| 6 | |
| 7 | import pytest |
| 8 | |
| 9 | from muse.core.object_store import ( |
| 10 | has_object, |
| 11 | object_path, |
| 12 | objects_dir, |
| 13 | read_object, |
| 14 | restore_object, |
| 15 | write_object, |
| 16 | write_object_from_path, |
| 17 | ) |
| 18 | from muse.core.repo import find_repo_root, require_repo |
| 19 | from muse.core.store import ( |
| 20 | CommitRecord, |
| 21 | SnapshotRecord, |
| 22 | get_commits_for_branch, |
| 23 | get_head_commit_id, |
| 24 | get_head_snapshot_id, |
| 25 | get_head_snapshot_manifest, |
| 26 | get_tags_for_commit, |
| 27 | read_commit, |
| 28 | read_snapshot, |
| 29 | resolve_commit_ref, |
| 30 | update_commit_metadata, |
| 31 | write_commit, |
| 32 | write_snapshot, |
| 33 | ) |
| 34 | from muse.core.merge_engine import apply_resolution, clear_merge_state, read_merge_state, write_merge_state |
| 35 | |
| 36 | import datetime |
| 37 | |
| 38 | |
| 39 | # --------------------------------------------------------------------------- |
| 40 | # object_store |
| 41 | # --------------------------------------------------------------------------- |
| 42 | |
| 43 | |
| 44 | class TestObjectStore: |
| 45 | def test_objects_dir_path(self, tmp_path: pathlib.Path) -> None: |
| 46 | d = objects_dir(tmp_path) |
| 47 | assert d == tmp_path / ".muse" / "objects" |
| 48 | |
| 49 | def test_object_path_sharding(self, tmp_path: pathlib.Path) -> None: |
| 50 | oid = "ab" + "c" * 62 |
| 51 | p = object_path(tmp_path, oid) |
| 52 | assert p.parent.name == "ab" |
| 53 | assert p.name == "c" * 62 |
| 54 | |
| 55 | def test_has_object_false_when_absent(self, tmp_path: pathlib.Path) -> None: |
| 56 | assert not has_object(tmp_path, "a" * 64) |
| 57 | |
| 58 | def test_has_object_true_after_write(self, tmp_path: pathlib.Path) -> None: |
| 59 | oid = "a" * 64 |
| 60 | write_object(tmp_path, oid, b"hello") |
| 61 | assert has_object(tmp_path, oid) |
| 62 | |
| 63 | def test_write_object_idempotent_returns_false(self, tmp_path: pathlib.Path) -> None: |
| 64 | oid = "b" * 64 |
| 65 | assert write_object(tmp_path, oid, b"first") is True |
| 66 | assert write_object(tmp_path, oid, b"second") is False |
| 67 | # content should not change |
| 68 | assert read_object(tmp_path, oid) == b"first" |
| 69 | |
| 70 | def test_write_object_from_path_idempotent(self, tmp_path: pathlib.Path) -> None: |
| 71 | src = tmp_path / "src.bin" |
| 72 | src.write_bytes(b"content") |
| 73 | oid = "c" * 64 |
| 74 | assert write_object_from_path(tmp_path, oid, src) is True |
| 75 | assert write_object_from_path(tmp_path, oid, src) is False |
| 76 | |
| 77 | def test_write_object_from_path_stores_content(self, tmp_path: pathlib.Path) -> None: |
| 78 | src = tmp_path / "file.bin" |
| 79 | src.write_bytes(b"my bytes") |
| 80 | oid = "d" * 64 |
| 81 | write_object_from_path(tmp_path, oid, src) |
| 82 | assert read_object(tmp_path, oid) == b"my bytes" |
| 83 | |
| 84 | def test_read_object_returns_none_when_absent(self, tmp_path: pathlib.Path) -> None: |
| 85 | assert read_object(tmp_path, "e" * 64) is None |
| 86 | |
| 87 | def test_read_object_returns_bytes(self, tmp_path: pathlib.Path) -> None: |
| 88 | oid = "f" * 64 |
| 89 | write_object(tmp_path, oid, b"data") |
| 90 | assert read_object(tmp_path, oid) == b"data" |
| 91 | |
| 92 | def test_restore_object_returns_false_when_absent(self, tmp_path: pathlib.Path) -> None: |
| 93 | dest = tmp_path / "out.bin" |
| 94 | result = restore_object(tmp_path, "0" * 64, dest) |
| 95 | assert result is False |
| 96 | assert not dest.exists() |
| 97 | |
| 98 | def test_restore_object_creates_dest(self, tmp_path: pathlib.Path) -> None: |
| 99 | oid = "1" * 64 |
| 100 | write_object(tmp_path, oid, b"restored") |
| 101 | dest = tmp_path / "sub" / "out.bin" |
| 102 | result = restore_object(tmp_path, oid, dest) |
| 103 | assert result is True |
| 104 | assert dest.read_bytes() == b"restored" |
| 105 | |
| 106 | def test_restore_object_creates_parent_dirs(self, tmp_path: pathlib.Path) -> None: |
| 107 | oid = "2" * 64 |
| 108 | write_object(tmp_path, oid, b"nested") |
| 109 | dest = tmp_path / "a" / "b" / "c" / "file.bin" |
| 110 | restore_object(tmp_path, oid, dest) |
| 111 | assert dest.exists() |
| 112 | |
| 113 | |
| 114 | # --------------------------------------------------------------------------- |
| 115 | # repo |
| 116 | # --------------------------------------------------------------------------- |
| 117 | |
| 118 | |
| 119 | class TestFindRepoRoot: |
| 120 | def test_finds_muse_dir_in_cwd(self, tmp_path: pathlib.Path) -> None: |
| 121 | (tmp_path / ".muse").mkdir() |
| 122 | result = find_repo_root(tmp_path) |
| 123 | assert result == tmp_path |
| 124 | |
| 125 | def test_finds_muse_dir_in_parent(self, tmp_path: pathlib.Path) -> None: |
| 126 | (tmp_path / ".muse").mkdir() |
| 127 | subdir = tmp_path / "a" / "b" |
| 128 | subdir.mkdir(parents=True) |
| 129 | result = find_repo_root(subdir) |
| 130 | assert result == tmp_path |
| 131 | |
| 132 | def test_returns_none_when_no_repo(self, tmp_path: pathlib.Path) -> None: |
| 133 | result = find_repo_root(tmp_path) |
| 134 | assert result is None |
| 135 | |
| 136 | def test_env_override_returns_path(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: |
| 137 | (tmp_path / ".muse").mkdir() |
| 138 | monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path)) |
| 139 | result = find_repo_root() |
| 140 | assert result == tmp_path |
| 141 | |
| 142 | def test_env_override_returns_none_when_not_repo(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: |
| 143 | # tmp_path exists but has no .muse/ |
| 144 | monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path)) |
| 145 | result = find_repo_root() |
| 146 | assert result is None |
| 147 | |
| 148 | def test_require_repo_exits_when_no_repo(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: |
| 149 | import click |
| 150 | monkeypatch.delenv("MUSE_REPO_ROOT", raising=False) |
| 151 | monkeypatch.chdir(tmp_path) |
| 152 | with pytest.raises(click.exceptions.Exit): |
| 153 | require_repo() |
| 154 | |
| 155 | |
| 156 | # --------------------------------------------------------------------------- |
| 157 | # store coverage gaps |
| 158 | # --------------------------------------------------------------------------- |
| 159 | |
| 160 | |
| 161 | class TestStoreGaps: |
| 162 | def _make_repo(self, tmp_path: pathlib.Path) -> pathlib.Path: |
| 163 | muse = tmp_path / ".muse" |
| 164 | for d in ("commits", "snapshots", "objects", "refs/heads"): |
| 165 | (muse / d).mkdir(parents=True) |
| 166 | (muse / "HEAD").write_text("refs/heads/main\n") |
| 167 | (muse / "repo.json").write_text(json.dumps({"repo_id": "test-repo"})) |
| 168 | (muse / "refs" / "heads" / "main").write_text("") |
| 169 | return tmp_path |
| 170 | |
| 171 | def test_get_head_commit_id_empty_branch(self, tmp_path: pathlib.Path) -> None: |
| 172 | root = self._make_repo(tmp_path) |
| 173 | assert get_head_commit_id(root, "main") is None |
| 174 | |
| 175 | def test_get_head_snapshot_id_no_commits(self, tmp_path: pathlib.Path) -> None: |
| 176 | root = self._make_repo(tmp_path) |
| 177 | assert get_head_snapshot_id(root, "test-repo", "main") is None |
| 178 | |
| 179 | def test_get_head_snapshot_manifest_no_commits(self, tmp_path: pathlib.Path) -> None: |
| 180 | root = self._make_repo(tmp_path) |
| 181 | assert get_head_snapshot_manifest(root, "test-repo", "main") is None |
| 182 | |
| 183 | def test_get_commits_for_branch_empty(self, tmp_path: pathlib.Path) -> None: |
| 184 | root = self._make_repo(tmp_path) |
| 185 | commits = get_commits_for_branch(root, "test-repo", "main") |
| 186 | assert commits == [] |
| 187 | |
| 188 | def test_resolve_commit_ref_with_none_returns_head(self, tmp_path: pathlib.Path) -> None: |
| 189 | root = self._make_repo(tmp_path) |
| 190 | snap = SnapshotRecord(snapshot_id="s" * 64, manifest={"a.mid": "h" * 64}) |
| 191 | write_snapshot(root, snap) |
| 192 | committed_at = datetime.datetime.now(datetime.timezone.utc) |
| 193 | commit = CommitRecord( |
| 194 | commit_id="c" * 64, |
| 195 | repo_id="test-repo", |
| 196 | branch="main", |
| 197 | snapshot_id="s" * 64, |
| 198 | message="test", |
| 199 | committed_at=committed_at, |
| 200 | ) |
| 201 | write_commit(root, commit) |
| 202 | (root / ".muse" / "refs" / "heads" / "main").write_text("c" * 64) |
| 203 | |
| 204 | result = resolve_commit_ref(root, "test-repo", "main", None) |
| 205 | assert result is not None |
| 206 | assert result.commit_id == "c" * 64 |
| 207 | |
| 208 | def test_read_commit_returns_none_for_unknown(self, tmp_path: pathlib.Path) -> None: |
| 209 | root = self._make_repo(tmp_path) |
| 210 | assert read_commit(root, "unknown") is None |
| 211 | |
| 212 | def test_read_snapshot_returns_none_for_unknown(self, tmp_path: pathlib.Path) -> None: |
| 213 | root = self._make_repo(tmp_path) |
| 214 | assert read_snapshot(root, "unknown") is None |
| 215 | |
| 216 | def test_update_commit_metadata_false_for_unknown(self, tmp_path: pathlib.Path) -> None: |
| 217 | root = self._make_repo(tmp_path) |
| 218 | assert update_commit_metadata(root, "unknown", "key", "val") is False |
| 219 | |
| 220 | def test_get_tags_for_commit_empty(self, tmp_path: pathlib.Path) -> None: |
| 221 | root = self._make_repo(tmp_path) |
| 222 | tags = get_tags_for_commit(root, "test-repo", "c" * 64) |
| 223 | assert tags == [] |
| 224 | |
| 225 | |
| 226 | # --------------------------------------------------------------------------- |
| 227 | # merge_engine coverage gaps |
| 228 | # --------------------------------------------------------------------------- |
| 229 | |
| 230 | |
| 231 | class TestMergeEngineCoverageGaps: |
| 232 | def _make_repo(self, tmp_path: pathlib.Path) -> pathlib.Path: |
| 233 | muse = tmp_path / ".muse" |
| 234 | muse.mkdir(parents=True) |
| 235 | return tmp_path |
| 236 | |
| 237 | def test_clear_merge_state_no_file(self, tmp_path: pathlib.Path) -> None: |
| 238 | root = self._make_repo(tmp_path) |
| 239 | # Should not raise even if MERGE_STATE.json is absent |
| 240 | clear_merge_state(root) |
| 241 | |
| 242 | def test_apply_resolution_copies_object(self, tmp_path: pathlib.Path) -> None: |
| 243 | root = self._make_repo(tmp_path) |
| 244 | # Write a real object to the store |
| 245 | oid = "a" * 64 |
| 246 | write_object(root, oid, b"resolved content") |
| 247 | |
| 248 | apply_resolution(root, "track.mid", oid) |
| 249 | dest = root / "muse-work" / "track.mid" |
| 250 | assert dest.exists() |
| 251 | assert dest.read_bytes() == b"resolved content" |
| 252 | |
| 253 | def test_apply_resolution_raises_when_object_absent(self, tmp_path: pathlib.Path) -> None: |
| 254 | root = self._make_repo(tmp_path) |
| 255 | with pytest.raises(FileNotFoundError): |
| 256 | apply_resolution(root, "track.mid", "0" * 64) |
| 257 | |
| 258 | def test_read_merge_state_invalid_json_returns_none(self, tmp_path: pathlib.Path) -> None: |
| 259 | root = self._make_repo(tmp_path) |
| 260 | (root / ".muse" / "MERGE_STATE.json").write_text("not json {{") |
| 261 | result = read_merge_state(root) |
| 262 | assert result is None |
| 263 | |
| 264 | def test_write_then_clear_merge_state(self, tmp_path: pathlib.Path) -> None: |
| 265 | root = self._make_repo(tmp_path) |
| 266 | write_merge_state( |
| 267 | root, |
| 268 | base_commit="b" * 64, |
| 269 | ours_commit="o" * 64, |
| 270 | theirs_commit="t" * 64, |
| 271 | conflict_paths=["a.mid"], |
| 272 | ) |
| 273 | assert (root / ".muse" / "MERGE_STATE.json").exists() |
| 274 | clear_merge_state(root) |
| 275 | assert not (root / ".muse" / "MERGE_STATE.json").exists() |