cgcardona / muse public
test_muse_tag.py python
473 lines 14.3 KB
12901c5a Initial extraction from tellurstori/maestro cgcardona <gabriel@tellurstori.com> 4d ago
1 """Tests for ``muse tag`` — music-semantic tagging of commits.
2
3 Verifies:
4 - tag add: attaches a tag to a commit; idempotent on duplicate.
5 - tag remove: removes an existing tag; exits USER_ERROR when not found.
6 - tag list: returns sorted tags; prints "No tags" when empty.
7 - tag search: exact match and prefix (namespace) match.
8 - tag add: exits USER_ERROR when commit does not exist.
9 - tag add on HEAD resolved from .muse/HEAD when no commit_ref is given.
10 - Boundary seal (AST): ``from __future__ import annotations`` present.
11 """
12 from __future__ import annotations
13
14 import ast
15 import datetime
16 import json
17 import pathlib
18 import uuid
19 from collections.abc import AsyncGenerator
20
21 import pytest
22 from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
23
24 from maestro.db.database import Base
25 from maestro.muse_cli import models as cli_models # noqa: F401 — register tables
26 from maestro.muse_cli.errors import ExitCode
27 from maestro.muse_cli.models import MuseCliCommit, MuseCliObject, MuseCliSnapshot, MuseCliTag
28 from maestro.muse_cli.commands.tag import (
29 _tag_add_async,
30 _tag_list_async,
31 _tag_remove_async,
32 _tag_search_async,
33 )
34
35
36 # ---------------------------------------------------------------------------
37 # Fixtures
38 # ---------------------------------------------------------------------------
39
40
@pytest.fixture
async def async_session() -> AsyncGenerator[AsyncSession, None]:
    """Yield an ``AsyncSession`` backed by a throwaway in-memory SQLite engine.

    Every table registered on ``Base.metadata`` is created before the session
    is handed out; the engine is disposed once the fixture is torn down.
    """
    memory_engine = create_async_engine("sqlite+aiosqlite:///:memory:")

    # Build the schema on a dedicated connection before any session exists.
    async with memory_engine.begin() as connection:
        await connection.run_sync(Base.metadata.create_all)

    make_session = async_sessionmaker(
        memory_engine,
        class_=AsyncSession,
        expire_on_commit=False,
    )
    async with make_session() as db_session:
        yield db_session

    await memory_engine.dispose()
51
52
@pytest.fixture
def repo_root(tmp_path: pathlib.Path) -> pathlib.Path:
    """Lay out a bare-bones ``.muse`` directory inside *tmp_path* and return the root.

    The layout consists of a ``HEAD`` file containing ``refs/heads/main`` and
    an empty ``refs/heads`` directory.
    """
    dot_muse = tmp_path / ".muse"
    dot_muse.mkdir()

    head_file = dot_muse / "HEAD"
    head_file.write_text("refs/heads/main")

    heads_dir = dot_muse / "refs" / "heads"
    heads_dir.mkdir(parents=True)

    return tmp_path
62
63
@pytest.fixture
def repo_id() -> str:
    """Return a freshly generated random UUID, rendered as a string."""
    fresh_uuid = uuid.uuid4()
    return str(fresh_uuid)
67
68
@pytest.fixture
def write_repo_json(repo_root: pathlib.Path, repo_id: str) -> None:
    """Persist *repo_id* as JSON into ``.muse/repo.json`` under *repo_root*."""
    payload = json.dumps({"repo_id": repo_id})
    config_path = repo_root / ".muse" / "repo.json"
    config_path.write_text(payload)
73
74
async def _insert_commit(
    session: AsyncSession,
    repo_id: str,
    repo_root: pathlib.Path,
) -> str:
    """Store one object, one snapshot and one commit; return the commit id.

    All identifiers are fixed 64-character strings, so the helper inserts the
    same rows every time it is called. After the rows are flushed, the commit
    id is written to ``.muse/refs/heads/main`` under *repo_root* so that the
    ``main`` ref points at the new commit.
    """
    object_id = "c" * 64
    snapshot_id = "a" * 64
    commit_id = "b" * 64

    # The object and snapshot are flushed first, ahead of the commit row that
    # references the snapshot.
    blob = MuseCliObject(object_id=object_id, size_bytes=1)
    snapshot = MuseCliSnapshot(snapshot_id=snapshot_id, manifest={"f.mid": object_id})
    session.add(blob)
    session.add(snapshot)
    await session.flush()

    commit = MuseCliCommit(
        commit_id=commit_id,
        repo_id=repo_id,
        branch="main",
        parent_commit_id=None,
        parent2_commit_id=None,
        snapshot_id=snapshot_id,
        message="initial",
        author="",
        committed_at=datetime.datetime.now(datetime.timezone.utc),
    )
    session.add(commit)
    await session.flush()

    # Point the branch ref at the commit so callers may omit an explicit ref.
    main_ref = repo_root / ".muse" / "refs" / "heads" / "main"
    main_ref.write_text(commit_id)

    return commit_id
111
112
113 # ---------------------------------------------------------------------------
114 # tag add
115 # ---------------------------------------------------------------------------
116
117
@pytest.mark.anyio
async def test_tag_add_attaches_tag(
    async_session: AsyncSession,
    repo_root: pathlib.Path,
    repo_id: str,
    write_repo_json: None,
) -> None:
    """Adding a tag persists exactly one ``MuseCliTag`` row on the target commit."""
    from sqlalchemy import select

    target_commit = await _insert_commit(async_session, repo_id, repo_root)

    await _tag_add_async(
        tag="emotion:melancholic",
        commit_ref=target_commit,
        root=repo_root,
        session=async_session,
    )
    await async_session.flush()

    query = select(MuseCliTag).where(MuseCliTag.commit_id == target_commit)
    rows = (await async_session.execute(query)).scalars().all()

    assert len(rows) == 1
    stored = rows[0]
    assert stored.tag == "emotion:melancholic"
    assert stored.repo_id == repo_id
145
146
@pytest.mark.anyio
async def test_tag_add_is_idempotent(
    async_session: AsyncSession,
    repo_root: pathlib.Path,
    repo_id: str,
    write_repo_json: None,
) -> None:
    """Two identical ``tag add`` calls must leave a single row behind."""
    from sqlalchemy import select

    target_commit = await _insert_commit(async_session, repo_id, repo_root)

    # Issue the very same add twice in a row.
    for _attempt in range(2):
        await _tag_add_async(
            tag="stage:rough-mix",
            commit_ref=target_commit,
            root=repo_root,
            session=async_session,
        )
    await async_session.flush()

    query = select(MuseCliTag).where(
        MuseCliTag.commit_id == target_commit,
        MuseCliTag.tag == "stage:rough-mix",
    )
    matching_rows = (await async_session.execute(query)).scalars().all()
    assert len(matching_rows) == 1
179
180
@pytest.mark.anyio
async def test_tag_add_missing_commit_exits_user_error(
    async_session: AsyncSession,
    repo_root: pathlib.Path,
    repo_id: str,
    write_repo_json: None,
) -> None:
    """Tagging a commit id that was never inserted exits with USER_ERROR."""
    import typer

    # No commit is inserted in this test, so this id cannot resolve.
    unknown_commit = "d" * 64

    with pytest.raises(typer.Exit) as raised:
        await _tag_add_async(
            tag="tempo:120bpm",
            commit_ref=unknown_commit,
            root=repo_root,
            session=async_session,
        )

    exit_code = raised.value.exit_code
    assert exit_code == ExitCode.USER_ERROR
199
200
@pytest.mark.anyio
async def test_tag_add_uses_head_when_no_commit_ref(
    async_session: AsyncSession,
    repo_root: pathlib.Path,
    repo_id: str,
    write_repo_json: None,
) -> None:
    """Passing ``commit_ref=None`` tags the commit the ``main`` ref points at."""
    from sqlalchemy import select

    head_commit = await _insert_commit(async_session, repo_id, repo_root)

    # No explicit ref: the command has to fall back to HEAD.
    await _tag_add_async(
        tag="key:Am",
        commit_ref=None,
        root=repo_root,
        session=async_session,
    )
    await async_session.flush()

    lookup = select(MuseCliTag).where(
        MuseCliTag.commit_id == head_commit,
        MuseCliTag.tag == "key:Am",
    )
    head_tag = (await async_session.execute(lookup)).scalar_one_or_none()
    assert head_tag is not None
225
226
227 # ---------------------------------------------------------------------------
228 # tag remove
229 # ---------------------------------------------------------------------------
230
231
@pytest.mark.anyio
async def test_tag_remove_deletes_existing_tag(
    async_session: AsyncSession,
    repo_root: pathlib.Path,
    repo_id: str,
    write_repo_json: None,
) -> None:
    """A tag that was added and then removed no longer has a row."""
    from sqlalchemy import select

    target_commit = await _insert_commit(async_session, repo_id, repo_root)

    # Arrange: the tag exists.
    await _tag_add_async(
        tag="ref:beatles",
        commit_ref=target_commit,
        root=repo_root,
        session=async_session,
    )
    await async_session.flush()

    # Act: remove it again.
    await _tag_remove_async(
        tag="ref:beatles",
        commit_ref=target_commit,
        root=repo_root,
        session=async_session,
    )
    await async_session.flush()

    lookup = select(MuseCliTag).where(
        MuseCliTag.commit_id == target_commit,
        MuseCliTag.tag == "ref:beatles",
    )
    leftover = (await async_session.execute(lookup)).scalar_one_or_none()
    assert leftover is None
264
265
@pytest.mark.anyio
async def test_tag_remove_missing_tag_exits_user_error(
    async_session: AsyncSession,
    repo_root: pathlib.Path,
    repo_id: str,
    write_repo_json: None,
) -> None:
    """Removing a tag the commit never carried exits with USER_ERROR."""
    import typer

    untagged_commit = await _insert_commit(async_session, repo_id, repo_root)

    with pytest.raises(typer.Exit) as raised:
        await _tag_remove_async(
            tag="nonexistent",
            commit_ref=untagged_commit,
            root=repo_root,
            session=async_session,
        )

    exit_code = raised.value.exit_code
    assert exit_code == ExitCode.USER_ERROR
286
287
288 # ---------------------------------------------------------------------------
289 # tag list
290 # ---------------------------------------------------------------------------
291
292
@pytest.mark.anyio
async def test_tag_list_returns_sorted_tags(
    async_session: AsyncSession,
    repo_root: pathlib.Path,
    repo_id: str,
    write_repo_json: None,
) -> None:
    """Tags added out of order are listed back in sorted order."""
    target_commit = await _insert_commit(async_session, repo_id, repo_root)

    # Deliberately not in alphabetical order.
    added_tags = ["stage:master", "emotion:hopeful", "tempo:90bpm"]
    for tag_name in added_tags:
        await _tag_add_async(
            tag=tag_name,
            commit_ref=target_commit,
            root=repo_root,
            session=async_session,
        )
    await async_session.flush()

    listed = await _tag_list_async(
        commit_ref=target_commit,
        root=repo_root,
        session=async_session,
    )
    assert listed == sorted(added_tags)
314
315
@pytest.mark.anyio
async def test_tag_list_empty_commit(
    async_session: AsyncSession,
    repo_root: pathlib.Path,
    repo_id: str,
    write_repo_json: None,
) -> None:
    """Listing tags for a commit that has none yields an empty list."""
    untagged_commit = await _insert_commit(async_session, repo_id, repo_root)

    listed = await _tag_list_async(
        commit_ref=untagged_commit,
        root=repo_root,
        session=async_session,
    )
    assert listed == []
328
329
330 # ---------------------------------------------------------------------------
331 # tag search
332 # ---------------------------------------------------------------------------
333
334
@pytest.mark.anyio
async def test_tag_search_exact_match(
    async_session: AsyncSession,
    repo_root: pathlib.Path,
    repo_id: str,
    write_repo_json: None,
) -> None:
    """Searching for a full tag string returns only that ``(commit_id, tag)`` pair."""
    target_commit = await _insert_commit(async_session, repo_id, repo_root)

    # One tag we search for, one that must be ignored by the search.
    await _tag_add_async(
        tag="emotion:melancholic",
        commit_ref=target_commit,
        root=repo_root,
        session=async_session,
    )
    await _tag_add_async(
        tag="stage:rough-mix",
        commit_ref=target_commit,
        root=repo_root,
        session=async_session,
    )
    await async_session.flush()

    hits = await _tag_search_async(
        tag="emotion:melancholic",
        root=repo_root,
        session=async_session,
    )
    expected = [(target_commit, "emotion:melancholic")]
    assert hits == expected
363
364
@pytest.mark.anyio
async def test_tag_search_prefix_match(
    async_session: AsyncSession,
    repo_root: pathlib.Path,
    repo_id: str,
    write_repo_json: None,
) -> None:
    """Searching for ``emotion:`` returns every tag in that namespace and no others."""
    target_commit = await _insert_commit(async_session, repo_id, repo_root)

    # Two tags in the "emotion" namespace and one outside it.
    seeded_tags = ["emotion:melancholic", "emotion:hopeful", "stage:rough-mix"]
    for tag_name in seeded_tags:
        await _tag_add_async(
            tag=tag_name,
            commit_ref=target_commit,
            root=repo_root,
            session=async_session,
        )
    await async_session.flush()

    hits = await _tag_search_async(tag="emotion:", root=repo_root, session=async_session)

    # Only the tag names matter here; the commit ids are ignored.
    matched_names = {name for _commit, name in hits}
    assert matched_names == {"emotion:melancholic", "emotion:hopeful"}
387
388
@pytest.mark.anyio
async def test_tag_search_no_results(
    async_session: AsyncSession,
    repo_root: pathlib.Path,
    repo_id: str,
    write_repo_json: None,
) -> None:
    """Searching for a tag that no commit carries yields an empty list."""
    # A commit exists, but nothing has been tagged.
    await _insert_commit(async_session, repo_id, repo_root)

    hits = await _tag_search_async(
        tag="ref:nobody",
        root=repo_root,
        session=async_session,
    )
    assert hits == []
401
402
@pytest.mark.anyio
async def test_tag_search_scoped_to_repo(
    async_session: AsyncSession,
    repo_root: pathlib.Path,
    write_repo_json: None,
) -> None:
    """Tags from a different repo are not returned by search.

    The ``.muse`` directory is first bound to one repo id; a commit there is
    tagged and the tag is confirmed to be searchable (positive control).
    ``repo.json`` is then rewritten to a second repo id, a commit is inserted
    for that repo, and the identical search must come back empty.
    """
    repo_json = repo_root / ".muse" / "repo.json"

    # Tag under first repo (already set up via write_repo_json)
    repo_id_1: str = json.loads(repo_json.read_text())["repo_id"]
    commit_id_1 = await _insert_commit(async_session, repo_id_1, repo_root)
    await _tag_add_async(
        tag="stage:master",
        commit_ref=commit_id_1,
        root=repo_root,
        session=async_session,
    )
    await async_session.flush()

    # Positive control: while repo.json still names the first repo the tag
    # must be found. Without this, the final "== []" assertion would also
    # pass if the tag had never been stored or if search returned nothing
    # at all.
    visible = await _tag_search_async(tag="stage:master", root=repo_root, session=async_session)
    assert visible == [(commit_id_1, "stage:master")]

    # Switch repo.json to a different repo_id, insert a second commit
    repo_id_2 = str(uuid.uuid4())
    repo_json.write_text(json.dumps({"repo_id": repo_id_2}))

    snapshot_id_2 = "e" * 64
    commit_id_2 = "f" * 64
    async_session.add(MuseCliObject(object_id="g" * 64, size_bytes=1))
    async_session.add(MuseCliSnapshot(snapshot_id=snapshot_id_2, manifest={"x.mid": "g" * 64}))
    await async_session.flush()
    async_session.add(
        MuseCliCommit(
            commit_id=commit_id_2,
            repo_id=repo_id_2,
            branch="main",
            parent_commit_id=None,
            parent2_commit_id=None,
            snapshot_id=snapshot_id_2,
            message="second repo initial",
            author="",
            committed_at=datetime.datetime.now(datetime.timezone.utc),
        )
    )
    await async_session.flush()

    # Search from second repo — should NOT find the tag belonging to first repo
    pairs = await _tag_search_async(tag="stage:master", root=repo_root, session=async_session)
    assert pairs == []
448
449
450 # ---------------------------------------------------------------------------
451 # Boundary seal
452 # ---------------------------------------------------------------------------
453
454
def test_tag_module_future_annotations_present() -> None:
    """tag.py must contain 'from __future__ import annotations'.

    The module's source file is parsed with ``ast`` and the names imported by
    every top-level ``from __future__ import ...`` statement are collected, so
    the check still passes when ``annotations`` is not in the first such
    statement.
    """
    import maestro.muse_cli.commands.tag as tag_module

    assert tag_module.__file__ is not None
    source_path = pathlib.Path(tag_module.__file__)
    # Read as UTF-8 explicitly, the default encoding of Python source, rather
    # than relying on the platform's locale encoding.
    tree = ast.parse(source_path.read_text(encoding="utf-8"))

    # The module imported successfully above, so any future statement sits at
    # module top level; scanning ``tree.body`` is sufficient.
    future_names: set[str] = set()
    for node in tree.body:
        if isinstance(node, ast.ImportFrom) and node.module == "__future__":
            future_names.update(alias.name for alias in node.names)

    assert future_names, "Missing 'from __future__ import annotations'"
    assert "annotations" in future_names