test_core_pack.py
python
| 1 | """Tests for muse.core.pack — PackBundle build and apply operations.""" |
| 2 | |
| 3 | from __future__ import annotations |
| 4 | |
| 5 | import base64 |
| 6 | import datetime |
| 7 | import json |
| 8 | import pathlib |
| 9 | |
| 10 | import pytest |
| 11 | |
| 12 | from muse.core.object_store import has_object, read_object, write_object |
| 13 | from muse.core.pack import ( |
| 14 | ObjectPayload, |
| 15 | PackBundle, |
| 16 | apply_pack, |
| 17 | build_pack, |
| 18 | ) |
| 19 | from muse.core.store import ( |
| 20 | CommitRecord, |
| 21 | SnapshotRecord, |
| 22 | read_commit, |
| 23 | read_snapshot, |
| 24 | write_commit, |
| 25 | write_snapshot, |
| 26 | ) |
| 27 | |
| 28 | |
| 29 | # --------------------------------------------------------------------------- |
| 30 | # Fixtures |
| 31 | # --------------------------------------------------------------------------- |
| 32 | |
| 33 | |
@pytest.fixture
def repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Lay out a minimal ``.muse/`` tree under *tmp_path* and return the root.

    Creates the ``commits``, ``snapshots``, ``objects`` and ``refs/heads``
    directories, then seeds ``repo.json``, ``HEAD`` and an empty ``main``
    branch ref.
    """
    dot_muse = tmp_path / ".muse"
    heads_dir = dot_muse / "refs" / "heads"

    # Same creation order as the files are listed; no exist_ok, so a
    # pre-populated tmp_path would still fail loudly.
    for directory in (
        dot_muse / "commits",
        dot_muse / "snapshots",
        dot_muse / "objects",
        heads_dir,
    ):
        directory.mkdir(parents=True)

    seed_files = {
        dot_muse / "repo.json": json.dumps({"repo_id": "test-repo"}),
        dot_muse / "HEAD": "refs/heads/main\n",
        heads_dir / "main": "",
    }
    for target, text in seed_files.items():
        target.write_text(text)

    return tmp_path
| 46 | |
| 47 | |
def _make_object(root: pathlib.Path, content: bytes) -> str:
    """Store *content* under its SHA-256 hex digest and return that digest."""
    import hashlib

    hasher = hashlib.sha256()
    hasher.update(content)
    object_id = hasher.hexdigest()
    write_object(root, object_id, content)
    return object_id
| 54 | |
| 55 | |
def _make_snapshot(
    root: pathlib.Path, snapshot_id: str, manifest: dict[str, str]
) -> SnapshotRecord:
    """Write a snapshot record built from the given id and manifest; return it.

    In this file the manifest maps a file name to an object id produced by
    ``_make_object``.
    """
    record = SnapshotRecord(manifest=manifest, snapshot_id=snapshot_id)
    write_snapshot(root, record)
    return record
| 62 | |
| 63 | |
def _make_commit(
    root: pathlib.Path,
    commit_id: str,
    snapshot_id: str,
    message: str = "test",
    parent: str | None = None,
) -> CommitRecord:
    """Write a commit record on branch ``main`` of ``test-repo``; return it.

    The commit is stamped with the current UTC time and points at
    *snapshot_id*; *parent* (if given) becomes its ``parent_commit_id``.
    """
    timestamp = datetime.datetime.now(datetime.timezone.utc)
    record = CommitRecord(
        commit_id=commit_id,
        repo_id="test-repo",
        branch="main",
        snapshot_id=snapshot_id,
        message=message,
        committed_at=timestamp,
        parent_commit_id=parent,
    )
    write_commit(root, record)
    return record
| 82 | |
| 83 | |
| 84 | # --------------------------------------------------------------------------- |
| 85 | # build_pack tests |
| 86 | # --------------------------------------------------------------------------- |
| 87 | |
| 88 | |
class TestBuildPack:
    """Checks on the bundle that ``build_pack`` produces."""

    def test_single_commit_no_history(self, repo: pathlib.Path) -> None:
        """A lone commit yields one commit, one snapshot and its single object."""
        payload = b"hello world"
        object_id = _make_object(repo, payload)
        _make_snapshot(repo, "snap1", {"file.txt": object_id})
        _make_commit(repo, "commit1", "snap1")

        bundle = build_pack(repo, ["commit1"])

        commits = bundle.get("commits") or []
        snapshots = bundle.get("snapshots") or []
        objects = bundle.get("objects") or []
        assert len(commits) == 1
        assert len(snapshots) == 1
        assert len(objects) == 1
        assert objects[0]["object_id"] == object_id

    def test_object_is_base64_encoded(self, repo: pathlib.Path) -> None:
        """The ``content_b64`` field decodes back to the original bytes."""
        raw = b"\x00\x01\x02\x03"
        object_id = _make_object(repo, raw)
        _make_snapshot(repo, "snap1", {"bin.dat": object_id})
        _make_commit(repo, "c1", "snap1")

        bundle = build_pack(repo, ["c1"])

        objects = bundle.get("objects") or []
        assert len(objects) == 1
        assert base64.b64decode(objects[0]["content_b64"]) == raw

    def test_multi_commit_chain(self, repo: pathlib.Path) -> None:
        """Packing the tip of a two-commit chain pulls in both commits."""
        first_oid = _make_object(repo, b"v1")
        second_oid = _make_object(repo, b"v2")
        _make_snapshot(repo, "snap1", {"f.txt": first_oid})
        _make_snapshot(repo, "snap2", {"f.txt": second_oid})
        _make_commit(repo, "c1", "snap1")
        _make_commit(repo, "c2", "snap2", parent="c1")

        bundle = build_pack(repo, ["c2"])

        for section in ("commits", "snapshots", "objects"):
            assert len(bundle.get(section) or []) == 2

    def test_have_excludes_ancestor_commits(self, repo: pathlib.Path) -> None:
        """Commits listed in ``have`` are left out of the bundle."""
        first_oid = _make_object(repo, b"v1")
        second_oid = _make_object(repo, b"v2")
        _make_snapshot(repo, "snap1", {"f.txt": first_oid})
        _make_snapshot(repo, "snap2", {"f.txt": second_oid})
        _make_commit(repo, "c1", "snap1")
        _make_commit(repo, "c2", "snap2", parent="c1")

        bundle = build_pack(repo, ["c2"], have=["c1"])

        # c1 is declared as already present, so only c2 should be packed.
        packed_ids = {entry["commit_id"] for entry in (bundle.get("commits") or [])}
        assert "c2" in packed_ids
        assert "c1" not in packed_ids

    def test_deduplicates_shared_objects(self, repo: pathlib.Path) -> None:
        """An object referenced by two snapshots is packed exactly once."""
        shared_oid = _make_object(repo, b"shared")
        _make_snapshot(repo, "snap1", {"a.txt": shared_oid})
        _make_snapshot(repo, "snap2", {"b.txt": shared_oid})
        _make_commit(repo, "c1", "snap1")
        _make_commit(repo, "c2", "snap2", parent="c1")

        bundle = build_pack(repo, ["c2"])

        occurrences = sum(
            1
            for entry in (bundle.get("objects") or [])
            if entry["object_id"] == shared_oid
        )
        assert occurrences == 1

    def test_empty_commit_ids_returns_empty_bundle(self, repo: pathlib.Path) -> None:
        """No requested commits means no commits and no objects."""
        bundle = build_pack(repo, [])

        assert not (bundle.get("commits") or [])
        assert not (bundle.get("objects") or [])

    def test_missing_commit_skipped_gracefully(self, repo: pathlib.Path) -> None:
        """An unknown commit id must not raise; it simply contributes nothing."""
        bundle = build_pack(repo, ["nonexistent"])

        assert not (bundle.get("commits") or [])

    def test_merge_commit_includes_both_parents(self, repo: pathlib.Path) -> None:
        """Packing a merge commit brings along both parent commits."""
        oid_a = _make_object(repo, b"branch-a")
        oid_b = _make_object(repo, b"branch-b")
        _make_snapshot(repo, "snap_a", {"a.txt": oid_a})
        _make_snapshot(repo, "snap_b", {"b.txt": oid_b})
        _make_snapshot(repo, "snap_m", {"a.txt": oid_a, "b.txt": oid_b})
        _make_commit(repo, "c_a", "snap_a")
        _make_commit(repo, "c_b", "snap_b")

        # _make_commit only takes one parent, so the two-parent record is
        # built by hand.
        merge_record = CommitRecord(
            commit_id="c_merge",
            repo_id="test-repo",
            branch="main",
            snapshot_id="snap_m",
            message="merge",
            committed_at=datetime.datetime.now(datetime.timezone.utc),
            parent_commit_id="c_a",
            parent2_commit_id="c_b",
        )
        write_commit(repo, merge_record)

        bundle = build_pack(repo, ["c_merge"])

        packed_ids = {entry["commit_id"] for entry in (bundle.get("commits") or [])}
        assert {"c_merge", "c_a", "c_b"} <= packed_ids
| 192 | |
| 193 | |
| 194 | # --------------------------------------------------------------------------- |
| 195 | # apply_pack tests |
| 196 | # --------------------------------------------------------------------------- |
| 197 | |
| 198 | |
class TestApplyPack:
    """Checks on ``apply_pack`` when given built or hand-written bundles."""

    @staticmethod
    def _empty_store(root: pathlib.Path) -> pathlib.Path:
        """Create the commits/snapshots/objects dirs under ``root/.muse``."""
        for subdir in ("commits", "snapshots", "objects"):
            (root / ".muse" / subdir).mkdir(parents=True)
        return root

    def test_round_trip(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None:
        """build_pack → apply_pack in a fresh repo produces identical data."""
        payload = b"round trip"
        object_id = _make_object(repo, payload)
        _make_snapshot(repo, "snap1", {"f.txt": object_id})
        _make_commit(repo, "c1", "snap1", message="initial")

        bundle = build_pack(repo, ["c1"])

        # The destination starts with empty stores only.
        target = self._empty_store(tmp_path / "dest")

        written = apply_pack(target, bundle)

        assert written == 1
        assert has_object(target, object_id)
        assert read_object(target, object_id) == payload
        assert read_snapshot(target, "snap1") is not None
        assert read_commit(target, "c1") is not None

    def test_idempotent_apply(self, repo: pathlib.Path) -> None:
        """Applying the same bundle twice does not raise and new_count = 0."""
        payload = b"idempotent"
        object_id = _make_object(repo, payload)
        _make_snapshot(repo, "snap1", {"f.txt": object_id})
        _make_commit(repo, "c1", "snap1")

        bundle = build_pack(repo, ["c1"])
        apply_pack(repo, bundle)
        second_pass = apply_pack(repo, bundle)

        # Everything was already present before the second application.
        assert second_pass == 0

    def test_malformed_object_skipped(self, repo: pathlib.Path) -> None:
        """An object whose payload is not valid base64 is not counted as new."""
        broken = ObjectPayload(object_id="abc123", content_b64="NOT_VALID_BASE64!!!")
        bundle: PackBundle = {
            "commits": [],
            "snapshots": [],
            "objects": [broken],
        }

        assert apply_pack(repo, bundle) == 0

    def test_empty_bundle_is_noop(self, repo: pathlib.Path) -> None:
        """A bundle with no keys at all applies cleanly and reports zero."""
        bundle: PackBundle = {}

        assert apply_pack(repo, bundle) == 0

    def test_apply_preserves_commit_metadata(
        self, repo: pathlib.Path, tmp_path: pathlib.Path
    ) -> None:
        """Message and snapshot id survive the build/apply round trip."""
        object_id = _make_object(repo, b"data")
        _make_snapshot(repo, "s1", {"data.bin": object_id})
        _make_commit(repo, "c1", "s1", message="preserve me")

        bundle = build_pack(repo, ["c1"])

        target = self._empty_store(tmp_path / "d")
        apply_pack(target, bundle)

        restored = read_commit(target, "c1")
        assert restored is not None
        assert restored.message == "preserve me"
        assert restored.snapshot_id == "s1"

    def test_apply_returns_new_object_count(
        self, repo: pathlib.Path, tmp_path: pathlib.Path
    ) -> None:
        """The return value equals the number of objects written to a fresh store."""
        first_oid = _make_object(repo, b"obj1")
        second_oid = _make_object(repo, b"obj2")
        _make_snapshot(repo, "s1", {"a": first_oid, "b": second_oid})
        _make_commit(repo, "c1", "s1")

        bundle = build_pack(repo, ["c1"])
        target = self._empty_store(tmp_path / "d")

        assert apply_pack(target, bundle) == 2