"""Muse E2E Tour de Force — full VCS lifecycle through real HTTP + real DB.

Exercises every Muse primitive in a single deterministic scenario:
commit → branch → merge → conflict → checkout (time travel)

Produces:
1. MuseLogGraph JSON (pretty-printed)
2. ASCII graph visualization (``git log --graph --oneline``)
3. Summary table (commits, merges, checkouts, conflicts, drift blocks)

Run:
    docker compose exec maestro pytest tests/e2e/test_muse_e2e_harness.py -v -s
"""
| 14 | |
| 15 | from __future__ import annotations |
| 16 | |
| 17 | from sqlalchemy.ext.asyncio import AsyncSession |
| 18 | import json |
| 19 | import logging |
| 20 | import pytest |
| 21 | import pytest_asyncio |
| 22 | from httpx import AsyncClient |
| 23 | |
| 24 | from tests.e2e.muse_fixtures import ( |
| 25 | C0, C1, C2, C3, C5, C6, |
| 26 | CONVO_ID, PROJECT_ID, |
| 27 | MuseVariationPayload, |
| 28 | cc_sustain_branch_a, |
| 29 | cc_sustain_branch_b, |
| 30 | make_variation_payload, |
| 31 | snapshot_bass_v1, |
| 32 | snapshot_drums_v1, |
| 33 | snapshot_empty, |
| 34 | snapshot_keys_v1, |
| 35 | snapshot_keys_v2_with_cc, |
| 36 | snapshot_keys_v3_conflict, |
| 37 | ) |
| 38 | |
# Module logger; not referenced below but available for ad-hoc debugging.
logger = logging.getLogger(__name__)

# Common URL prefix shared by every Muse endpoint exercised in this harness.
BASE = "/api/v1/muse"
| 42 | |
| 43 | |
# ── Response-narrowing helpers ─────────────────────────────────────────────
# JSON responses are dict[str, object]. These helpers narrow values to their
# expected types and assert the API contract simultaneously.
| 47 | |
| 48 | def _s(d: dict[str, object], key: str) -> str: |
| 49 | """Extract a required string field from a response dict.""" |
| 50 | v = d[key] |
| 51 | assert isinstance(v, str), f"Expected str for {key!r}, got {type(v).__name__}: {v!r}" |
| 52 | return v |
| 53 | |
| 54 | |
| 55 | def _s_opt(d: dict[str, object], key: str) -> str | None: |
| 56 | """Extract an optional string field (None allowed) from a response dict.""" |
| 57 | v = d.get(key) |
| 58 | assert v is None or isinstance(v, str), f"Expected str|None for {key!r}, got {type(v).__name__}" |
| 59 | return v if isinstance(v, str) else None |
| 60 | |
| 61 | |
| 62 | def _d(d: dict[str, object], key: str) -> dict[str, object]: |
| 63 | """Extract a required dict field from a response dict.""" |
| 64 | v = d[key] |
| 65 | assert isinstance(v, dict), f"Expected dict for {key!r}, got {type(v).__name__}" |
| 66 | return v |
| 67 | |
| 68 | |
| 69 | def _nodes(d: dict[str, object]) -> list[dict[str, object]]: |
| 70 | """Extract and validate the 'nodes' list from a log response.""" |
| 71 | raw = d["nodes"] |
| 72 | assert isinstance(raw, list), f"Expected list for 'nodes', got {type(raw).__name__}" |
| 73 | result: list[dict[str, object]] = [] |
| 74 | for item in raw: |
| 75 | assert isinstance(item, dict), f"Expected dict node, got {type(item).__name__}" |
| 76 | result.append(item) |
| 77 | return result |
| 78 | |
# ── Counters for summary table ─────────────────────────────────────────────

# Module-level tallies fed into render_summary_table at the end of the run.
# The test resets all four at its start so reruns in one process stay clean.
_checkouts_executed = 0  # checkouts that completed with HTTP 200
_drift_blocks = 0        # checkouts rejected with HTTP 409 (drift protection)
_conflict_merges = 0     # merge attempts exercised as conflict demos
_forced_ops = 0          # operations issued with force=True
| 85 | |
| 86 | |
# ── Helpers ────────────────────────────────────────────────────────────────
| 88 | |
| 89 | |
async def save(client: AsyncClient, payload: MuseVariationPayload, headers: dict[str, str]) -> dict[str, object]:
    """Commit a variation via ``POST /variations`` and return the JSON body."""
    response = await client.post(f"{BASE}/variations", json=payload, headers=headers)
    assert response.status_code == 200, f"save failed: {response.text}"
    body: dict[str, object] = response.json()
    return body
| 95 | |
| 96 | |
async def set_head(client: AsyncClient, vid: str, headers: dict[str, str]) -> dict[str, object]:
    """Point HEAD at variation *vid* via ``POST /head`` and return the JSON body."""
    response = await client.post(f"{BASE}/head", json={"variation_id": vid}, headers=headers)
    assert response.status_code == 200, f"set_head failed: {response.text}"
    body: dict[str, object] = response.json()
    return body
| 102 | |
| 103 | |
async def get_log(client: AsyncClient, headers: dict[str, str]) -> dict[str, object]:
    """Fetch the project's log graph via ``GET /log`` and return the JSON body."""
    response = await client.get(f"{BASE}/log", params={"project_id": PROJECT_ID}, headers=headers)
    assert response.status_code == 200, f"get_log failed: {response.text}"
    body: dict[str, object] = response.json()
    return body
| 109 | |
| 110 | |
async def do_checkout(
    client: AsyncClient, target: str, headers: dict[str, str], *, force: bool = False,
) -> dict[str, object]:
    """Issue a checkout to *target*, tallying drift blocks and executed checkouts.

    A 409 response (drift protection) is returned to the caller as-is rather
    than asserted away, so the test can exercise the blocked path too.
    """
    global _checkouts_executed, _drift_blocks, _forced_ops
    request_body = {
        "project_id": PROJECT_ID,
        "target_variation_id": target,
        "conversation_id": CONVO_ID,
        "force": force,
    }
    response = await client.post(f"{BASE}/checkout", json=request_body, headers=headers)
    if response.status_code == 409:
        # Drift protection rejected the move — count it and surface the body.
        _drift_blocks += 1
        blocked: dict[str, object] = response.json()
        return blocked
    assert response.status_code == 200, f"checkout failed: {response.text}"
    _checkouts_executed += 1
    if force:
        _forced_ops += 1
    executed: dict[str, object] = response.json()
    return executed
| 132 | |
| 133 | |
async def do_merge(
    client: AsyncClient, left: str, right: str, headers: dict[str, str], *, force: bool = False,
) -> tuple[int, dict[str, object]]:
    """Request a merge of *left* and *right*; return (status_code, JSON body).

    Unlike the other helpers this never asserts on status — conflict merges
    deliberately come back as 409 and the caller inspects the code itself.
    """
    global _forced_ops
    request_body = {
        "project_id": PROJECT_ID,
        "left_id": left,
        "right_id": right,
        "conversation_id": CONVO_ID,
        "force": force,
    }
    response = await client.post(f"{BASE}/merge", json=request_body, headers=headers)
    if force:
        _forced_ops += 1
    parsed: dict[str, object] = response.json()
    return response.status_code, parsed
| 149 | |
| 150 | |
# ── The Test ───────────────────────────────────────────────────────────────
| 152 | |
| 153 | |
@pytest.mark.anyio
async def test_muse_e2e_full_lifecycle(client: AsyncClient, auth_headers: dict[str, str], db_session: AsyncSession) -> None:
    """Full Muse VCS lifecycle: commit → branch → merge → conflict → checkout.

    Drives the real HTTP API via *client* and prints a rendered log graph,
    JSON dump, and summary table at the end. ``db_session`` is not referenced
    in the body — presumably it is requested so the DB fixture is active for
    the duration of the test (verify against conftest).
    """
    # Reset the module-level summary counters so reruns in one process start clean.
    global _checkouts_executed, _drift_blocks, _conflict_merges, _forced_ops
    _checkouts_executed = 0
    _drift_blocks = 0
    _conflict_merges = 0
    _forced_ops = 0

    headers = auth_headers

    # ── Step 0: Initialize ──────────────────────────────────────────
    # Commit the empty root variation C0 and make it HEAD.
    print("\n═══ Step 0: Initialize ═══")
    await save(client, make_variation_payload(
        C0, "root", snapshot_empty(), snapshot_empty(),
    ), headers)
    await set_head(client, C0, headers)

    log = await get_log(client, headers)
    assert len(_nodes(log)) == 1
    assert _s(log, "head") == C0
    print(f" ✅ Root C0 committed, HEAD={C0[:8]}")

    # ── Step 1: Mainline commit C1 (keys v1) ────────────────────────
    print("\n═══ Step 1: Mainline commit C1 (keys v1) ═══")
    await save(client, make_variation_payload(
        C1, "keys v1", snapshot_empty(), snapshot_keys_v1(),
        parent_variation_id=C0,
    ), headers)
    await set_head(client, C1, headers)

    co = await do_checkout(client, C1, headers, force=True)
    assert co["head_moved"]
    print(f" ✅ C1 committed + checked out, executed={_d(co, 'execution')['executed']} tool calls")

    # ── Step 2: Branch A — bass (C2) ───────────────────────────────
    print("\n═══ Step 2: Branch A — bass v1 (C2) ═══")
    await save(client, make_variation_payload(
        C2, "bass v1", snapshot_empty(), snapshot_bass_v1(),
        parent_variation_id=C1,
    ), headers)
    await set_head(client, C2, headers)
    co = await do_checkout(client, C2, headers, force=True)
    assert co["head_moved"]

    log = await get_log(client, headers)
    nodes = _nodes(log)
    node_ids = [_s(n, "id") for n in nodes]
    assert C1 in node_ids and C2 in node_ids
    assert _s(log, "head") == C2
    print(f" ✅ C2 committed, HEAD={C2[:8]}, graph has {len(nodes)} nodes")

    # ── Step 3: Branch B — drums (C3) ──────────────────────────────
    print("\n═══ Step 3: Branch B — drums v1 (C3) ═══")
    # Checkout back to C1 first (time travel!) so C3 forks from C1, not C2.
    co = await do_checkout(client, C1, headers, force=True)
    assert co["head_moved"]

    await save(client, make_variation_payload(
        C3, "drums v1", snapshot_empty(), snapshot_drums_v1(),
        parent_variation_id=C1,
    ), headers)
    await set_head(client, C3, headers)
    co = await do_checkout(client, C3, headers, force=True)
    assert co["head_moved"]
    print(f" ✅ C3 committed, HEAD={C3[:8]}")

    # ── Step 4: Merge branches (C4 = merge commit) ─────────────────
    # The server allocates the merge commit's id; capture it as c4_id.
    print("\n═══ Step 4: Merge C2 + C3 ═══")
    status, merge_resp = await do_merge(client, C2, C3, headers, force=True)
    assert status == 200, f"Merge failed: {merge_resp}"
    assert merge_resp["head_moved"]
    c4_id = _s(merge_resp, "merge_variation_id")
    print(f" ✅ Merge commit C4={c4_id[:8]}, executed={_d(merge_resp, 'execution')['executed']} tool calls")

    log = await get_log(client, headers)
    assert _s(log, "head") == c4_id
    c4_node = next(n for n in _nodes(log) if _s(n, "id") == c4_id)
    assert c4_node["parent2"] is not None, "Merge commit must have two parents"
    print(f" ✅ Merge commit has parent={_s(c4_node, 'parent')[:8]}, parent2={_s(c4_node, 'parent2')[:8]}")

    # ── Step 5: Conflict merge demo ────────────────────────────────
    # Two siblings of C1 that touch the same region must merge with a 409.
    print("\n═══ Step 5: Conflict merge demo (C5 vs C6) ═══")
    # C5: branch from C1, adds note + CC in r_keys
    await save(client, make_variation_payload(
        C5, "keys v2 (branch A)", snapshot_keys_v1(), snapshot_keys_v2_with_cc(),
        parent_variation_id=C1,
        cc_events=cc_sustain_branch_a(),
    ), headers)
    # C6: branch from C1, adds different note + different CC in r_keys
    await save(client, make_variation_payload(
        C6, "keys v3 (branch B)", snapshot_keys_v1(), snapshot_keys_v3_conflict(),
        parent_variation_id=C1,
        cc_events=cc_sustain_branch_b(),
    ), headers)

    status, conflict_resp = await do_merge(client, C5, C6, headers)
    _conflict_merges += 1
    assert status == 409, f"Expected 409 conflict, got {status}: {conflict_resp}"
    detail = _d(conflict_resp, "detail")
    assert detail["error"] == "merge_conflict"
    _conflicts_raw = detail["conflicts"]
    assert isinstance(_conflicts_raw, list)
    conflicts: list[dict[str, object]] = [c for c in _conflicts_raw if isinstance(c, dict)]
    assert len(conflicts) >= 1, "Expected at least one conflict"
    print(f" ✅ Conflict detected: {len(conflicts)} conflict(s)")
    for c in conflicts:
        print(f" {_s(c, 'type')}: {_s(c, 'description')}")

    # ── Step 6: (Skipped — cherry-pick not yet implemented) ────────
    print("\n═══ Step 6: Cherry-pick — skipped (future phase) ═══")

    # ── Step 7: Checkout traversal demo ────────────────────────────
    # Hop around the DAG (C1 → C2 → C4 → C4 again) collecting plan hashes.
    print("\n═══ Step 7: Checkout traversal ═══")
    plan_hashes: list[str] = []

    co = await do_checkout(client, C1, headers, force=True)
    assert co["head_moved"]
    plan_hashes.append(_s(_d(co, "execution"), "plan_hash"))
    print(f" → Checked out C1: executed={_d(co, 'execution')['executed']}, hash={_s(_d(co, 'execution'), 'plan_hash')[:12]}")

    co = await do_checkout(client, C2, headers, force=True)
    assert co["head_moved"]
    plan_hashes.append(_s(_d(co, "execution"), "plan_hash"))
    print(f" → Checked out C2: executed={_d(co, 'execution')['executed']}, hash={_s(_d(co, 'execution'), 'plan_hash')[:12]}")

    co = await do_checkout(client, c4_id, headers, force=True)
    assert co["head_moved"]
    plan_hashes.append(_s(_d(co, "execution"), "plan_hash"))
    print(f" → Checked out C4 (merge): executed={_d(co, 'execution')['executed']}, hash={_s(_d(co, 'execution'), 'plan_hash')[:12]}")

    # Checkout to same target again — should be no-op or same hash
    co2 = await do_checkout(client, c4_id, headers, force=True)
    assert co2["head_moved"]
    print(f" → Re-checkout C4: executed={_d(co2, 'execution')['executed']}, hash={_s(_d(co2, 'execution'), 'plan_hash')[:12]}")
    print(f" ✅ All checkouts transactional, plan hashes: {[h[:12] for h in plan_hashes]}")

    # ── Final assertions ───────────────────────────────────────────
    print("\n═══ Final Assertions ═══")

    log = await get_log(client, headers)
    log_nodes = _nodes(log)

    # DAG correctness: every fixture commit hangs off the expected parent.
    node_map = {_s(n, "id"): n for n in log_nodes}
    assert node_map[C0]["parent"] is None
    assert node_map[C1]["parent"] == C0
    assert node_map[C2]["parent"] == C1
    assert node_map[C3]["parent"] == C1
    assert node_map[C5]["parent"] == C1
    assert node_map[C6]["parent"] == C1
    print(" ✅ DAG parent relationships correct")

    # Merge commit has two parents
    assert c4_id in node_map
    assert node_map[c4_id]["parent"] is not None
    assert node_map[c4_id]["parent2"] is not None
    print(" ✅ Merge commit has 2 parents")

    # HEAD correctness
    assert _s(log, "head") == c4_id
    print(f" ✅ HEAD = {c4_id[:8]}")

    # Topological order: both parents of every node must precede it in the
    # serialized node list (only checked when the parent is in the graph).
    id_order = [_s(n, "id") for n in log_nodes]
    for n in log_nodes:
        n_id = _s(n, "id")
        n_parent = _s_opt(n, "parent")
        n_parent2 = _s_opt(n, "parent2")
        if n_parent and n_parent in node_map:
            assert id_order.index(n_parent) < id_order.index(n_id), \
                f"Parent {n_parent[:8]} must appear before child {n_id[:8]}"
        if n_parent2 and n_parent2 in node_map:
            assert id_order.index(n_parent2) < id_order.index(n_id), \
                f"Parent2 {n_parent2[:8]} must appear before child {n_id[:8]}"
    print(" ✅ Topological ordering: parents before children")

    # camelCase serialization — wire format keys must stay stable.
    for n in log_nodes:
        assert "isHead" in n
        assert "parent2" in n
    assert "projectId" in log
    print(" ✅ Serialization is camelCase and stable")

    # Conflict merge returned conflicts deterministically
    assert len(conflicts) >= 1
    assert all("region_id" in c and "type" in c and "description" in c for c in conflicts)
    print(" ✅ Conflict payloads deterministic")

    # ── Render output ──────────────────────────────────────────────
    print("\n" + "═" * 60)
    print(" MUSE LOG GRAPH — ASCII")
    print("═" * 60)

    # Imported lazily so collection of this module doesn't require the
    # rendering services to be importable.
    from maestro.services.muse_log_render import render_ascii_graph, render_json, render_summary_table
    from maestro.services.muse_log_graph import MuseLogGraph, MuseLogNode

    # Reconstruct MuseLogGraph from the JSON for rendering
    import time

    def _str_opt(v: object) -> str | None:
        # Narrow an arbitrary JSON value to str, treating anything else as None.
        return v if isinstance(v, str) else None

    def _float_ts(v: object) -> float:
        """Parse a timestamp value — ISO string or numeric — to a float epoch."""
        if isinstance(v, (int, float)):
            return float(v)
        if isinstance(v, str):
            from datetime import datetime, timezone
            try:
                # NOTE(review): replace(tzinfo=utc) tags the parsed time as UTC
                # even if the ISO string carried its own offset — confirm the
                # API always emits UTC timestamps.
                return datetime.fromisoformat(v).replace(tzinfo=timezone.utc).timestamp()
            except ValueError:
                pass
        # Unparseable/absent timestamp: fall back to "now" (render-only data).
        return time.time()

    graph = MuseLogGraph(
        project_id=_s(log, "projectId"),
        head=_s_opt(log, "head"),
        nodes=tuple(
            MuseLogNode(
                variation_id=_s(n, "id"),
                parent=_str_opt(n.get("parent")),
                parent2=_str_opt(n.get("parent2")),
                is_head=bool(n.get("isHead")),
                timestamp=_float_ts(n.get("timestamp")),
                intent=_str_opt(n.get("intent")),
                # Keep only string entries of an optional "regions" list; the
                # single-element wrapper list lets the comprehension guard on
                # the list type without a separate statement.
                affected_regions=tuple(
                    r
                    for _rgns in [n.get("regions")]
                    if isinstance(_rgns, list)
                    for r in _rgns
                    if isinstance(r, str)
                ),
            )
            for n in log_nodes
        ),
    )

    print(render_ascii_graph(graph))

    print("\n" + "═" * 60)
    print(" MUSE LOG GRAPH — JSON")
    print("═" * 60)
    print(render_json(graph))

    print("\n" + "═" * 60)
    print(" SUMMARY")
    print("═" * 60)
    print(render_summary_table(
        graph,
        checkouts_executed=_checkouts_executed,
        drift_blocks=_drift_blocks,
        conflict_merges=_conflict_merges,
        forced_ops=_forced_ops,
    ))
    print()