cgcardona / muse public
test_muse_e2e_harness.py python
410 lines 16.4 KB
12901c5a Initial extraction from tellurstori/maestro cgcardona <gabriel@tellurstori.com> 4d ago
1 """Muse E2E Tour de Force — full VCS lifecycle through real HTTP + real DB.
2
3 Exercises every Muse primitive in a single deterministic scenario:
4 commit → branch → merge → conflict → checkout (time travel)
5
6 Produces:
7 1. MuseLogGraph JSON (pretty-printed)
8 2. ASCII graph visualization (``git log --graph --oneline``)
9 3. Summary table (commits, merges, checkouts, conflicts, drift blocks)
10
11 Run:
12 docker compose exec maestro pytest tests/e2e/test_muse_e2e_harness.py -v -s
13 """
14
15 from __future__ import annotations
16
17 from sqlalchemy.ext.asyncio import AsyncSession
18 import json
19 import logging
20 import pytest
21 import pytest_asyncio
22 from httpx import AsyncClient
23
24 from tests.e2e.muse_fixtures import (
25 C0, C1, C2, C3, C5, C6,
26 CONVO_ID, PROJECT_ID,
27 MuseVariationPayload,
28 cc_sustain_branch_a,
29 cc_sustain_branch_b,
30 make_variation_payload,
31 snapshot_bass_v1,
32 snapshot_drums_v1,
33 snapshot_empty,
34 snapshot_keys_v1,
35 snapshot_keys_v2_with_cc,
36 snapshot_keys_v3_conflict,
37 )
38
39 logger = logging.getLogger(__name__)
40
41 BASE = "/api/v1/muse"
42
43
44 # ── Response-narrowing helpers ─────────────────────────────────────────────
45 # JSON responses are dict[str, object]. These helpers narrow values to their
46 # expected types and assert the API contract simultaneously.
47
48 def _s(d: dict[str, object], key: str) -> str:
49 """Extract a required string field from a response dict."""
50 v = d[key]
51 assert isinstance(v, str), f"Expected str for {key!r}, got {type(v).__name__}: {v!r}"
52 return v
53
54
55 def _s_opt(d: dict[str, object], key: str) -> str | None:
56 """Extract an optional string field (None allowed) from a response dict."""
57 v = d.get(key)
58 assert v is None or isinstance(v, str), f"Expected str|None for {key!r}, got {type(v).__name__}"
59 return v if isinstance(v, str) else None
60
61
62 def _d(d: dict[str, object], key: str) -> dict[str, object]:
63 """Extract a required dict field from a response dict."""
64 v = d[key]
65 assert isinstance(v, dict), f"Expected dict for {key!r}, got {type(v).__name__}"
66 return v
67
68
69 def _nodes(d: dict[str, object]) -> list[dict[str, object]]:
70 """Extract and validate the 'nodes' list from a log response."""
71 raw = d["nodes"]
72 assert isinstance(raw, list), f"Expected list for 'nodes', got {type(raw).__name__}"
73 result: list[dict[str, object]] = []
74 for item in raw:
75 assert isinstance(item, dict), f"Expected dict node, got {type(item).__name__}"
76 result.append(item)
77 return result
78
# ── Counters for summary table ────────────────────────────────────────────
# Module-level mutable tallies. They are reset at the start of the E2E test
# and fed into render_summary_table() at the end of the run.

_checkouts_executed = 0  # checkouts that returned HTTP 200 (HEAD moved)
_drift_blocks = 0  # checkouts rejected with HTTP 409 (drift guard)
_conflict_merges = 0  # merge attempts that were expected to raise conflicts
_forced_ops = 0  # checkout/merge calls issued with force=True
85
86
87 # ── Helpers ───────────────────────────────────────────────────────────────
88
89
async def save(client: AsyncClient, payload: MuseVariationPayload, headers: dict[str, str]) -> dict[str, object]:
    """POST a variation payload to the Muse API and return the parsed 200 body."""
    resp = await client.post(f"{BASE}/variations", json=payload, headers=headers)
    assert resp.status_code == 200, f"save failed: {resp.text}"
    body: dict[str, object] = resp.json()
    return body
95
96
async def set_head(client: AsyncClient, vid: str, headers: dict[str, str]) -> dict[str, object]:
    """Point HEAD at variation *vid* via the Muse API; return the parsed 200 body."""
    resp = await client.post(f"{BASE}/head", json={"variation_id": vid}, headers=headers)
    assert resp.status_code == 200, f"set_head failed: {resp.text}"
    body: dict[str, object] = resp.json()
    return body
102
103
async def get_log(client: AsyncClient, headers: dict[str, str]) -> dict[str, object]:
    """Fetch the Muse log graph for PROJECT_ID; return the parsed 200 body."""
    resp = await client.get(f"{BASE}/log", params={"project_id": PROJECT_ID}, headers=headers)
    assert resp.status_code == 200, f"get_log failed: {resp.text}"
    body: dict[str, object] = resp.json()
    return body
109
110
async def do_checkout(
    client: AsyncClient, target: str, headers: dict[str, str], *, force: bool = False,
) -> dict[str, object]:
    """POST a checkout request; tallies drift blocks (409) and successful checkouts.

    A 409 response is returned to the caller un-asserted so tests can inspect
    the drift payload; any other non-200 status fails the test immediately.
    """
    global _checkouts_executed, _drift_blocks, _forced_ops
    request_json = {
        "project_id": PROJECT_ID,
        "target_variation_id": target,
        "conversation_id": CONVO_ID,
        "force": force,
    }
    resp = await client.post(f"{BASE}/checkout", json=request_json, headers=headers)
    if resp.status_code == 409:
        # Drift guard rejected the checkout — count it and hand back the body.
        _drift_blocks += 1
        blocked: dict[str, object] = resp.json()
        return blocked
    assert resp.status_code == 200, f"checkout failed: {resp.text}"
    _checkouts_executed += 1
    if force:
        _forced_ops += 1
    body: dict[str, object] = resp.json()
    return body
132
133
async def do_merge(
    client: AsyncClient, left: str, right: str, headers: dict[str, str], *, force: bool = False,
) -> tuple[int, dict[str, object]]:
    """POST a merge request and return (status_code, parsed body).

    Unlike do_checkout, no status is asserted here — callers inspect the
    status themselves (e.g. expecting a 409 conflict).
    """
    global _forced_ops
    request_json = {
        "project_id": PROJECT_ID,
        "left_id": left,
        "right_id": right,
        "conversation_id": CONVO_ID,
        "force": force,
    }
    resp = await client.post(f"{BASE}/merge", json=request_json, headers=headers)
    if force:
        _forced_ops += 1
    parsed: dict[str, object] = resp.json()
    return resp.status_code, parsed
149
150
151 # ── The Test ──────────────────────────────────────────────────────────────
152
153
@pytest.mark.anyio
async def test_muse_e2e_full_lifecycle(client: AsyncClient, auth_headers: dict[str, str], db_session: AsyncSession) -> None:

    """Full Muse VCS lifecycle: commit → branch → merge → conflict → checkout.

    Drives the real HTTP API end-to-end: builds a small commit DAG
    (C0 → C1 → {C2, C3} → C4 merge, plus conflicting C5/C6 branches),
    verifies DAG shape, HEAD movement, topological ordering, and camelCase
    serialization, then renders ASCII/JSON/summary views of the log graph.

    NOTE(review): db_session is not used directly — presumably requested to
    force DB fixture setup/teardown; confirm against conftest.
    """
    # Reset module-level counters so reruns within one process start clean.
    global _checkouts_executed, _drift_blocks, _conflict_merges, _forced_ops
    _checkouts_executed = 0
    _drift_blocks = 0
    _conflict_merges = 0
    _forced_ops = 0

    headers = auth_headers

    # ── Step 0: Initialize ────────────────────────────────────────────
    # Root commit C0: empty → empty snapshot, no parent.
    print("\n═══ Step 0: Initialize ═══")
    await save(client, make_variation_payload(
        C0, "root", snapshot_empty(), snapshot_empty(),
    ), headers)
    await set_head(client, C0, headers)

    log = await get_log(client, headers)
    assert len(_nodes(log)) == 1
    assert _s(log, "head") == C0
    print(f" ✅ Root C0 committed, HEAD={C0[:8]}")

    # ── Step 1: Mainline commit C1 (keys v1) ──────────────────────────
    print("\n═══ Step 1: Mainline commit C1 (keys v1) ═══")
    await save(client, make_variation_payload(
        C1, "keys v1", snapshot_empty(), snapshot_keys_v1(),
        parent_variation_id=C0,
    ), headers)
    await set_head(client, C1, headers)

    # force=True bypasses the drift guard so the checkout always executes.
    co = await do_checkout(client, C1, headers, force=True)
    assert co["head_moved"]
    print(f" ✅ C1 committed + checked out, executed={_d(co, 'execution')['executed']} tool calls")

    # ── Step 2: Branch A — bass (C2) ─────────────────────────────────
    print("\n═══ Step 2: Branch A — bass v1 (C2) ═══")
    await save(client, make_variation_payload(
        C2, "bass v1", snapshot_empty(), snapshot_bass_v1(),
        parent_variation_id=C1,
    ), headers)
    await set_head(client, C2, headers)
    co = await do_checkout(client, C2, headers, force=True)
    assert co["head_moved"]

    log = await get_log(client, headers)
    nodes = _nodes(log)
    node_ids = [_s(n, "id") for n in nodes]
    assert C1 in node_ids and C2 in node_ids
    assert _s(log, "head") == C2
    print(f" ✅ C2 committed, HEAD={C2[:8]}, graph has {len(nodes)} nodes")

    # ── Step 3: Branch B — drums (C3) ────────────────────────────────
    print("\n═══ Step 3: Branch B — drums v1 (C3) ═══")
    # Checkout back to C1 first (time travel!) so C3 forks from C1, not C2.
    co = await do_checkout(client, C1, headers, force=True)
    assert co["head_moved"]

    await save(client, make_variation_payload(
        C3, "drums v1", snapshot_empty(), snapshot_drums_v1(),
        parent_variation_id=C1,
    ), headers)
    await set_head(client, C3, headers)
    co = await do_checkout(client, C3, headers, force=True)
    assert co["head_moved"]
    print(f" ✅ C3 committed, HEAD={C3[:8]}")

    # ── Step 4: Merge branches (C4 = merge commit) ───────────────────
    # C2 (bass) and C3 (drums) touch disjoint regions, so this merge is clean.
    print("\n═══ Step 4: Merge C2 + C3 ═══")
    status, merge_resp = await do_merge(client, C2, C3, headers, force=True)
    assert status == 200, f"Merge failed: {merge_resp}"
    assert merge_resp["head_moved"]
    # The merge commit's id is server-assigned, unlike the fixture ids C0–C6.
    c4_id = _s(merge_resp, "merge_variation_id")
    print(f" ✅ Merge commit C4={c4_id[:8]}, executed={_d(merge_resp, 'execution')['executed']} tool calls")

    log = await get_log(client, headers)
    assert _s(log, "head") == c4_id
    c4_node = next(n for n in _nodes(log) if _s(n, "id") == c4_id)
    assert c4_node["parent2"] is not None, "Merge commit must have two parents"
    print(f" ✅ Merge commit has parent={_s(c4_node, 'parent')[:8]}, parent2={_s(c4_node, 'parent2')[:8]}")

    # ── Step 5: Conflict merge demo ──────────────────────────────────
    # Both branches edit the same r_keys region, so the merge must 409.
    print("\n═══ Step 5: Conflict merge demo (C5 vs C6) ═══")
    # C5: branch from C1, adds note + CC in r_keys
    await save(client, make_variation_payload(
        C5, "keys v2 (branch A)", snapshot_keys_v1(), snapshot_keys_v2_with_cc(),
        parent_variation_id=C1,
        cc_events=cc_sustain_branch_a(),
    ), headers)
    # C6: branch from C1, adds different note + different CC in r_keys
    await save(client, make_variation_payload(
        C6, "keys v3 (branch B)", snapshot_keys_v1(), snapshot_keys_v3_conflict(),
        parent_variation_id=C1,
        cc_events=cc_sustain_branch_b(),
    ), headers)

    # No force here: we want the conflict response, not a forced resolution.
    status, conflict_resp = await do_merge(client, C5, C6, headers)
    _conflict_merges += 1
    assert status == 409, f"Expected 409 conflict, got {status}: {conflict_resp}"
    detail = _d(conflict_resp, "detail")
    assert detail["error"] == "merge_conflict"
    _conflicts_raw = detail["conflicts"]
    assert isinstance(_conflicts_raw, list)
    conflicts: list[dict[str, object]] = [c for c in _conflicts_raw if isinstance(c, dict)]
    assert len(conflicts) >= 1, "Expected at least one conflict"
    print(f" ✅ Conflict detected: {len(conflicts)} conflict(s)")
    for c in conflicts:
        print(f" {_s(c, 'type')}: {_s(c, 'description')}")

    # ── Step 6: (Skipped — cherry-pick not yet implemented) ──────────
    print("\n═══ Step 6: Cherry-pick — skipped (future phase) ═══")

    # ── Step 7: Checkout traversal demo ──────────────────────────────
    # Hop around the DAG and collect plan hashes from each execution.
    print("\n═══ Step 7: Checkout traversal ═══")
    plan_hashes: list[str] = []

    co = await do_checkout(client, C1, headers, force=True)
    assert co["head_moved"]
    plan_hashes.append(_s(_d(co, "execution"), "plan_hash"))
    print(f" → Checked out C1: executed={_d(co, 'execution')['executed']}, hash={_s(_d(co, 'execution'), 'plan_hash')[:12]}")

    co = await do_checkout(client, C2, headers, force=True)
    assert co["head_moved"]
    plan_hashes.append(_s(_d(co, "execution"), "plan_hash"))
    print(f" → Checked out C2: executed={_d(co, 'execution')['executed']}, hash={_s(_d(co, 'execution'), 'plan_hash')[:12]}")

    co = await do_checkout(client, c4_id, headers, force=True)
    assert co["head_moved"]
    plan_hashes.append(_s(_d(co, "execution"), "plan_hash"))
    print(f" → Checked out C4 (merge): executed={_d(co, 'execution')['executed']}, hash={_s(_d(co, 'execution'), 'plan_hash')[:12]}")

    # Checkout to same target again — should be no-op or same hash
    co2 = await do_checkout(client, c4_id, headers, force=True)
    assert co2["head_moved"]
    print(f" → Re-checkout C4: executed={_d(co2, 'execution')['executed']}, hash={_s(_d(co2, 'execution'), 'plan_hash')[:12]}")
    print(f" ✅ All checkouts transactional, plan hashes: {[h[:12] for h in plan_hashes]}")

    # ── Final assertions ─────────────────────────────────────────────
    print("\n═══ Final Assertions ═══")

    log = await get_log(client, headers)
    log_nodes = _nodes(log)

    # DAG correctness: every fixture commit hangs off the expected parent.
    node_map = {_s(n, "id"): n for n in log_nodes}
    assert node_map[C0]["parent"] is None
    assert node_map[C1]["parent"] == C0
    assert node_map[C2]["parent"] == C1
    assert node_map[C3]["parent"] == C1
    assert node_map[C5]["parent"] == C1
    assert node_map[C6]["parent"] == C1
    print(" ✅ DAG parent relationships correct")

    # Merge commit has two parents
    assert c4_id in node_map
    assert node_map[c4_id]["parent"] is not None
    assert node_map[c4_id]["parent2"] is not None
    print(" ✅ Merge commit has 2 parents")

    # HEAD correctness
    assert _s(log, "head") == c4_id
    print(f" ✅ HEAD = {c4_id[:8]}")

    # Topological order: parents before children
    id_order = [_s(n, "id") for n in log_nodes]
    for n in log_nodes:
        n_id = _s(n, "id")
        n_parent = _s_opt(n, "parent")
        n_parent2 = _s_opt(n, "parent2")
        if n_parent and n_parent in node_map:
            assert id_order.index(n_parent) < id_order.index(n_id), \
                f"Parent {n_parent[:8]} must appear before child {n_id[:8]}"
        if n_parent2 and n_parent2 in node_map:
            assert id_order.index(n_parent2) < id_order.index(n_id), \
                f"Parent2 {n_parent2[:8]} must appear before child {n_id[:8]}"
    print(" ✅ Topological ordering: parents before children")

    # camelCase serialization
    for n in log_nodes:
        assert "isHead" in n
        assert "parent2" in n
    assert "projectId" in log
    print(" ✅ Serialization is camelCase and stable")

    # Conflict merge returned conflicts deterministically
    assert len(conflicts) >= 1
    assert all("region_id" in c and "type" in c and "description" in c for c in conflicts)
    print(" ✅ Conflict payloads deterministic")

    # ── Render output ────────────────────────────────────────────────
    print("\n" + "═" * 60)
    print(" MUSE LOG GRAPH — ASCII")
    print("═" * 60)

    # Imported lazily so the test module loads even if the render service
    # is unavailable until this point in the run.
    from maestro.services.muse_log_render import render_ascii_graph, render_json, render_summary_table
    from maestro.services.muse_log_graph import MuseLogGraph, MuseLogNode

    # Reconstruct MuseLogGraph from the JSON for rendering
    import time

    def _str_opt(v: object) -> str | None:
        # Narrow an arbitrary JSON value to str, else None.
        return v if isinstance(v, str) else None

    def _float_ts(v: object) -> float:
        """Parse a timestamp value — ISO string or numeric — to a float epoch.

        Falls back to the current time when the value is unparseable.

        NOTE(review): .replace(tzinfo=utc) overwrites any offset already in
        the ISO string rather than converting — assumes naive-UTC inputs;
        confirm against the API's timestamp serialization.
        """
        if isinstance(v, (int, float)):
            return float(v)
        if isinstance(v, str):
            from datetime import datetime, timezone
            try:
                return datetime.fromisoformat(v).replace(tzinfo=timezone.utc).timestamp()
            except ValueError:
                pass
        return time.time()

    graph = MuseLogGraph(
        project_id=_s(log, "projectId"),
        head=_s_opt(log, "head"),
        nodes=tuple(
            MuseLogNode(
                variation_id=_s(n, "id"),
                parent=_str_opt(n.get("parent")),
                parent2=_str_opt(n.get("parent2")),
                is_head=bool(n.get("isHead")),
                timestamp=_float_ts(n.get("timestamp")),
                intent=_str_opt(n.get("intent")),
                # The [single-item] trick binds n.get("regions") to _rgns so
                # the comprehension can both type-check and iterate it.
                affected_regions=tuple(
                    r
                    for _rgns in [n.get("regions")]
                    if isinstance(_rgns, list)
                    for r in _rgns
                    if isinstance(r, str)
                ),
            )
            for n in log_nodes
        ),
    )

    print(render_ascii_graph(graph))

    print("\n" + "═" * 60)
    print(" MUSE LOG GRAPH — JSON")
    print("═" * 60)
    print(render_json(graph))

    print("\n" + "═" * 60)
    print(" SUMMARY")
    print("═" * 60)
    print(render_summary_table(
        graph,
        checkouts_executed=_checkouts_executed,
        drift_blocks=_drift_blocks,
        conflict_merges=_conflict_merges,
        forced_ops=_forced_ops,
    ))
    print()
410 print()