test_cat_object.py
python
| 1 | """Tests for ``muse cat-object``. |
| 2 | |
| 3 | All async tests call ``_cat_object_async`` and ``_lookup_object`` directly |
| 4 | with an in-memory SQLite session — no real Postgres or running process |
| 5 | required; only the synchronous CLI tests use ``tmp_path`` as a working |
| 6 | directory. ORM rows are seeded directly so the lookup tests are independent of ``muse commit``. |
| 7 | """ |
| 8 | from __future__ import annotations |
| 9 | |
| 10 | import json |
| 11 | import pathlib |
| 12 | import uuid |
| 13 | from datetime import datetime, timezone |
| 14 | |
| 15 | import pytest |
| 16 | import typer |
| 17 | from sqlalchemy.ext.asyncio import AsyncSession |
| 18 | |
| 19 | from maestro.muse_cli.commands.cat_object import ( |
| 20 | CatObjectResult, |
| 21 | _cat_object_async, |
| 22 | _lookup_object, |
| 23 | ) |
| 24 | from maestro.muse_cli.errors import ExitCode |
| 25 | from maestro.muse_cli.models import MuseCliCommit, MuseCliObject, MuseCliSnapshot |
| 26 | |
| 27 | |
| 28 | # --------------------------------------------------------------------------- |
| 29 | # Helpers |
| 30 | # --------------------------------------------------------------------------- |
| 31 | |
# Shared UTC tzinfo so every timestamp seeded by the helpers below is timezone-aware.
_UTC = timezone.utc
| 33 | |
| 34 | |
def _fake_hash(prefix: str = "a") -> str:
    """Return a deterministic 64-character ID built by repeating *prefix*.

    The prefix is repeated and truncated to exactly 64 characters. It is not
    validated as hexadecimal: tests deliberately pass values such as ``"z"``
    to build IDs that are expected to be absent from every table.

    Raises:
        ValueError: if *prefix* is empty, since repeating it can never yield
            a 64-character ID.
    """
    if not prefix:
        raise ValueError("prefix must be a non-empty string")
    return (prefix * 64)[:64]
| 38 | |
| 39 | |
def _init_muse_repo(root: pathlib.Path) -> str:
    """Create a minimal ``.muse/`` directory so ``require_repo()`` succeeds.

    Under *root* this writes:

    * ``.muse/repo.json`` — JSON with a fresh UUID4 ``repo_id`` and
      ``schema_version`` ``"1"``;
    * ``.muse/HEAD`` — the text ``refs/heads/main``;
    * ``.muse/refs/heads/main`` — an empty ref file.

    Files are written as UTF-8 explicitly so the result does not depend on
    the platform's locale encoding.

    Returns:
        The generated ``repo_id`` string.

    Raises:
        FileExistsError: if ``.muse/refs/heads`` already exists under *root*.
    """
    rid = str(uuid.uuid4())
    muse = root / ".muse"
    heads = muse / "refs" / "heads"
    heads.mkdir(parents=True)
    (muse / "repo.json").write_text(
        json.dumps({"repo_id": rid, "schema_version": "1"}),
        encoding="utf-8",
    )
    (muse / "HEAD").write_text("refs/heads/main", encoding="utf-8")
    (heads / "main").write_text("", encoding="utf-8")
    return rid
| 49 | |
| 50 | |
async def _seed_object(session: AsyncSession, object_id: str, size: int = 1024) -> MuseCliObject:
    """Add a ``MuseCliObject`` row to *session*, flush it, and return the row.

    The row gets *object_id*, ``size_bytes=size`` and a fixed ``created_at``
    of 2026-01-01 UTC.
    """
    created = datetime(2026, 1, 1, tzinfo=_UTC)
    row = MuseCliObject(object_id=object_id, size_bytes=size, created_at=created)
    session.add(row)
    await session.flush()
    return row
| 60 | |
| 61 | |
async def _seed_snapshot(
    session: AsyncSession,
    snapshot_id: str,
    manifest: dict[str, str] | None = None,
) -> MuseCliSnapshot:
    """Add a ``MuseCliSnapshot`` row to *session*, flush it, and return the row.

    Args:
        session: Session the row is added to and flushed on.
        snapshot_id: ID stored on the row.
        manifest: Manifest stored on the row. Only ``None`` selects the
            default single-entry manifest ``{"beat.mid": _fake_hash("b")}``;
            an explicitly passed empty dict is stored as given.

    Returns:
        The seeded row, with a fixed ``created_at`` of 2026-01-02 UTC.
    """
    # Compare against None rather than relying on truthiness, so an
    # intentionally empty manifest is not silently replaced by the default.
    if manifest is None:
        manifest = {"beat.mid": _fake_hash("b")}
    snap = MuseCliSnapshot(
        snapshot_id=snapshot_id,
        manifest=manifest,
        created_at=datetime(2026, 1, 2, tzinfo=_UTC),
    )
    session.add(snap)
    await session.flush()
    return snap
| 75 | |
| 76 | |
async def _seed_commit(
    session: AsyncSession,
    commit_id: str,
    snapshot_id: str,
    repo_id: str = "test-repo",
) -> MuseCliCommit:
    """Add a parentless ``MuseCliCommit`` row to *session*, flush it, and return it.

    The row is on branch ``main`` with message ``initial take`` and author
    ``test-author``; both timestamps are fixed at 2026-01-03 UTC.
    """
    # committed_at and created_at carry the same instant.
    stamp = datetime(2026, 1, 3, tzinfo=_UTC)
    row = MuseCliCommit(
        commit_id=commit_id,
        repo_id=repo_id,
        branch="main",
        parent_commit_id=None,
        parent2_commit_id=None,
        snapshot_id=snapshot_id,
        message="initial take",
        author="test-author",
        committed_at=stamp,
        created_at=stamp,
    )
    session.add(row)
    await session.flush()
    return row
| 98 | |
| 99 | |
| 100 | # --------------------------------------------------------------------------- |
| 101 | # _lookup_object — type resolution tests |
| 102 | # --------------------------------------------------------------------------- |
| 103 | |
| 104 | |
@pytest.mark.anyio
async def test_lookup_object_finds_object_row(
    muse_cli_db_session: AsyncSession,
) -> None:
    """A seeded blob ID resolves to type ``object`` with a ``MuseCliObject`` row."""
    blob_id = _fake_hash("a")
    await _seed_object(muse_cli_db_session, blob_id)

    found = await _lookup_object(muse_cli_db_session, blob_id)

    assert found is not None
    assert found.object_type == "object"
    assert isinstance(found.row, MuseCliObject)
| 118 | |
| 119 | |
@pytest.mark.anyio
async def test_lookup_object_finds_snapshot_row(
    muse_cli_db_session: AsyncSession,
) -> None:
    """A seeded snapshot ID resolves to type ``snapshot`` with a ``MuseCliSnapshot`` row."""
    snapshot_id = _fake_hash("c")
    await _seed_snapshot(muse_cli_db_session, snapshot_id)

    found = await _lookup_object(muse_cli_db_session, snapshot_id)

    assert found is not None
    assert found.object_type == "snapshot"
    assert isinstance(found.row, MuseCliSnapshot)
| 133 | |
| 134 | |
@pytest.mark.anyio
async def test_lookup_object_finds_commit_row(
    muse_cli_db_session: AsyncSession,
) -> None:
    """A seeded commit ID resolves to type ``commit`` with a ``MuseCliCommit`` row."""
    snapshot_id = _fake_hash("c")
    commit_id = _fake_hash("d")
    # The commit references the snapshot, so the snapshot is seeded first.
    await _seed_snapshot(muse_cli_db_session, snapshot_id)
    await _seed_commit(muse_cli_db_session, commit_id, snapshot_id)

    found = await _lookup_object(muse_cli_db_session, commit_id)

    assert found is not None
    assert found.object_type == "commit"
    assert isinstance(found.row, MuseCliCommit)
| 150 | |
| 151 | |
@pytest.mark.anyio
async def test_lookup_object_returns_none_for_unknown_id(
    muse_cli_db_session: AsyncSession,
) -> None:
    """With nothing seeded, looking up an ID yields ``None``."""
    unknown_id = _fake_hash("z")
    found = await _lookup_object(muse_cli_db_session, unknown_id)
    assert found is None
| 159 | |
| 160 | |
| 161 | # --------------------------------------------------------------------------- |
| 162 | # _cat_object_async — default (metadata) output |
| 163 | # --------------------------------------------------------------------------- |
| 164 | |
| 165 | |
@pytest.mark.anyio
async def test_cat_object_default_prints_object_metadata(
    muse_cli_db_session: AsyncSession,
    capsys: pytest.CaptureFixture[str],
) -> None:
    """Without flags, a blob's output contains its type, ID, and byte size."""
    blob_id = _fake_hash("a")
    await _seed_object(muse_cli_db_session, blob_id, size=2048)

    await _cat_object_async(
        session=muse_cli_db_session,
        object_id=blob_id,
        type_only=False,
        pretty=False,
    )

    stdout = capsys.readouterr().out
    assert "type: object" in stdout
    assert blob_id in stdout
    assert "2048 bytes" in stdout
| 186 | |
| 187 | |
@pytest.mark.anyio
async def test_cat_object_default_prints_snapshot_metadata(
    muse_cli_db_session: AsyncSession,
    capsys: pytest.CaptureFixture[str],
) -> None:
    """Without flags, a snapshot's output contains its type, ID, and file count."""
    snapshot_id = _fake_hash("c")
    two_files = {"beat.mid": _fake_hash("b"), "keys.mid": _fake_hash("e")}
    await _seed_snapshot(muse_cli_db_session, snapshot_id, manifest=two_files)

    await _cat_object_async(
        session=muse_cli_db_session,
        object_id=snapshot_id,
        type_only=False,
        pretty=False,
    )

    stdout = capsys.readouterr().out
    assert "type: snapshot" in stdout
    assert snapshot_id in stdout
    assert "files: 2" in stdout
| 209 | |
| 210 | |
@pytest.mark.anyio
async def test_cat_object_default_prints_commit_metadata(
    muse_cli_db_session: AsyncSession,
    capsys: pytest.CaptureFixture[str],
) -> None:
    """Without flags, a commit's output contains its type, ID, branch, and message."""
    snapshot_id = _fake_hash("c")
    commit_id = _fake_hash("d")
    await _seed_snapshot(muse_cli_db_session, snapshot_id)
    await _seed_commit(muse_cli_db_session, commit_id, snapshot_id)

    await _cat_object_async(
        session=muse_cli_db_session,
        object_id=commit_id,
        type_only=False,
        pretty=False,
    )

    stdout = capsys.readouterr().out
    assert "type: commit" in stdout
    assert commit_id in stdout
    assert "main" in stdout
    assert "initial take" in stdout
| 234 | |
| 235 | |
| 236 | # --------------------------------------------------------------------------- |
| 237 | # _cat_object_async — -t / --type flag |
| 238 | # --------------------------------------------------------------------------- |
| 239 | |
| 240 | |
@pytest.mark.anyio
async def test_cat_object_type_only_prints_object(
    muse_cli_db_session: AsyncSession,
    capsys: pytest.CaptureFixture[str],
) -> None:
    """With ``type_only=True`` the stripped output for a blob is exactly ``object``."""
    blob_id = _fake_hash("a")
    await _seed_object(muse_cli_db_session, blob_id)

    await _cat_object_async(
        session=muse_cli_db_session,
        object_id=blob_id,
        type_only=True,
        pretty=False,
    )

    printed = capsys.readouterr().out
    assert printed.strip() == "object"
| 259 | |
| 260 | |
@pytest.mark.anyio
async def test_cat_object_type_only_prints_snapshot(
    muse_cli_db_session: AsyncSession,
    capsys: pytest.CaptureFixture[str],
) -> None:
    """With ``type_only=True`` the stripped output for a snapshot is exactly ``snapshot``."""
    snapshot_id = _fake_hash("c")
    await _seed_snapshot(muse_cli_db_session, snapshot_id)

    await _cat_object_async(
        session=muse_cli_db_session,
        object_id=snapshot_id,
        type_only=True,
        pretty=False,
    )

    printed = capsys.readouterr().out
    assert printed.strip() == "snapshot"
| 279 | |
| 280 | |
@pytest.mark.anyio
async def test_cat_object_type_only_prints_commit(
    muse_cli_db_session: AsyncSession,
    capsys: pytest.CaptureFixture[str],
) -> None:
    """With ``type_only=True`` the stripped output for a commit is exactly ``commit``."""
    snapshot_id = _fake_hash("c")
    commit_id = _fake_hash("d")
    await _seed_snapshot(muse_cli_db_session, snapshot_id)
    await _seed_commit(muse_cli_db_session, commit_id, snapshot_id)

    await _cat_object_async(
        session=muse_cli_db_session,
        object_id=commit_id,
        type_only=True,
        pretty=False,
    )

    printed = capsys.readouterr().out
    assert printed.strip() == "commit"
| 301 | |
| 302 | |
| 303 | # --------------------------------------------------------------------------- |
| 304 | # _cat_object_async — -p / --pretty flag |
| 305 | # --------------------------------------------------------------------------- |
| 306 | |
| 307 | |
@pytest.mark.anyio
async def test_cat_object_pretty_prints_snapshot_manifest(
    muse_cli_db_session: AsyncSession,
    capsys: pytest.CaptureFixture[str],
) -> None:
    """With ``pretty=True`` a snapshot's output parses as JSON carrying the seeded manifest."""
    snapshot_id = _fake_hash("c")
    seeded_manifest = {"beat.mid": _fake_hash("b"), "keys.mid": _fake_hash("e")}
    await _seed_snapshot(muse_cli_db_session, snapshot_id, manifest=seeded_manifest)

    await _cat_object_async(
        session=muse_cli_db_session,
        object_id=snapshot_id,
        type_only=False,
        pretty=True,
    )

    # json.loads doubles as the validity check on the emitted text.
    payload = json.loads(capsys.readouterr().out)
    assert payload["type"] == "snapshot"
    assert payload["snapshot_id"] == snapshot_id
    assert payload["manifest"] == seeded_manifest
| 330 | |
| 331 | |
@pytest.mark.anyio
async def test_cat_object_pretty_prints_commit_fields(
    muse_cli_db_session: AsyncSession,
    capsys: pytest.CaptureFixture[str],
) -> None:
    """With ``pretty=True`` a commit's output parses as JSON with the seeded commit fields."""
    snapshot_id = _fake_hash("c")
    commit_id = _fake_hash("d")
    await _seed_snapshot(muse_cli_db_session, snapshot_id)
    await _seed_commit(muse_cli_db_session, commit_id, snapshot_id)

    await _cat_object_async(
        session=muse_cli_db_session,
        object_id=commit_id,
        type_only=False,
        pretty=True,
    )

    # json.loads doubles as the validity check on the emitted text.
    payload = json.loads(capsys.readouterr().out)
    assert payload["type"] == "commit"
    assert payload["commit_id"] == commit_id
    assert payload["branch"] == "main"
    assert payload["message"] == "initial take"
    assert payload["snapshot_id"] == snapshot_id
| 357 | |
| 358 | |
@pytest.mark.anyio
async def test_cat_object_pretty_prints_object_fields(
    muse_cli_db_session: AsyncSession,
    capsys: pytest.CaptureFixture[str],
) -> None:
    """With ``pretty=True`` a blob's output parses as JSON with its type, ID, and size."""
    blob_id = _fake_hash("a")
    await _seed_object(muse_cli_db_session, blob_id, size=512)

    await _cat_object_async(
        session=muse_cli_db_session,
        object_id=blob_id,
        type_only=False,
        pretty=True,
    )

    # json.loads doubles as the validity check on the emitted text.
    payload = json.loads(capsys.readouterr().out)
    assert payload["type"] == "object"
    assert payload["object_id"] == blob_id
    assert payload["size_bytes"] == 512
| 380 | |
| 381 | |
| 382 | # --------------------------------------------------------------------------- |
| 383 | # _cat_object_async — not found error |
| 384 | # --------------------------------------------------------------------------- |
| 385 | |
| 386 | |
@pytest.mark.anyio
async def test_cat_object_not_found_exits_user_error(
    muse_cli_db_session: AsyncSession,
    capsys: pytest.CaptureFixture[str],
) -> None:
    """An unseeded ID raises ``typer.Exit`` with USER_ERROR and prints a not-found message."""
    missing_id = _fake_hash("z")

    with pytest.raises(typer.Exit) as raised:
        await _cat_object_async(
            session=muse_cli_db_session,
            object_id=missing_id,
            type_only=False,
            pretty=False,
        )

    assert raised.value.exit_code == ExitCode.USER_ERROR
    stdout = capsys.readouterr().out
    assert "Object not found" in stdout
| 404 | |
| 405 | |
| 406 | # --------------------------------------------------------------------------- |
| 407 | # CatObjectResult.to_dict — serialisation |
| 408 | # --------------------------------------------------------------------------- |
| 409 | |
| 410 | |
@pytest.mark.anyio
async def test_cat_object_result_to_dict_object(
    muse_cli_db_session: AsyncSession,
) -> None:
    """``to_dict()`` on an object result has exactly the four expected keys."""
    blob_id = _fake_hash("a")
    blob_row = await _seed_object(muse_cli_db_session, blob_id, size=128)

    serialised = CatObjectResult(object_type="object", row=blob_row).to_dict()

    expected_keys = {"type", "object_id", "size_bytes", "created_at"}
    assert set(serialised.keys()) == expected_keys
    assert serialised["type"] == "object"
    assert serialised["size_bytes"] == 128
| 423 | |
| 424 | |
@pytest.mark.anyio
async def test_cat_object_result_to_dict_commit(
    muse_cli_db_session: AsyncSession,
) -> None:
    """``to_dict()`` on a commit result exposes the commit, branch, message, and snapshot keys."""
    snapshot_id = _fake_hash("c")
    commit_id = _fake_hash("d")
    await _seed_snapshot(muse_cli_db_session, snapshot_id)
    commit_row = await _seed_commit(muse_cli_db_session, commit_id, snapshot_id)

    serialised = CatObjectResult(object_type="commit", row=commit_row).to_dict()

    assert "commit_id" in serialised
    assert "branch" in serialised
    assert "message" in serialised
    assert "snapshot_id" in serialised
| 440 | |
| 441 | |
| 442 | # --------------------------------------------------------------------------- |
| 443 | # CLI integration — mutually exclusive flags |
| 444 | # --------------------------------------------------------------------------- |
| 445 | |
| 446 | |
def test_cat_object_type_and_pretty_mutually_exclusive(
    tmp_path: pathlib.Path,
) -> None:
    """Invoking ``cat-object`` with both ``-t`` and ``-p`` inside a repo exits with USER_ERROR."""
    import os
    from typer.testing import CliRunner
    from maestro.muse_cli.app import cli

    _init_muse_repo(tmp_path)
    cli_runner = CliRunner()
    original_cwd = os.getcwd()
    argv = ["cat-object", "-t", "-p", _fake_hash("a")]
    try:
        # The command runs with tmp_path as the working directory.
        os.chdir(tmp_path)
        outcome = cli_runner.invoke(cli, argv, catch_exceptions=False)
    finally:
        # Always restore the working directory for later tests.
        os.chdir(original_cwd)

    assert outcome.exit_code == ExitCode.USER_ERROR
| 469 | |
| 470 | |
def test_cat_object_outside_repo_exits_2(tmp_path: pathlib.Path) -> None:
    """Running ``muse cat-object`` from a directory with no ``.muse/`` exits with REPO_NOT_FOUND."""
    import os
    from typer.testing import CliRunner
    from maestro.muse_cli.app import cli

    cli_runner = CliRunner()
    original_cwd = os.getcwd()
    argv = ["cat-object", _fake_hash("a")]
    try:
        # tmp_path is left uninitialised on purpose: no .muse/ is created here.
        os.chdir(tmp_path)
        outcome = cli_runner.invoke(cli, argv, catch_exceptions=False)
    finally:
        # Always restore the working directory for later tests.
        os.chdir(original_cwd)

    assert outcome.exit_code == ExitCode.REPO_NOT_FOUND