muse_log_render.py
python
| 1 | """Muse Log Renderer — ``git log --graph`` style ASCII visualization. |
| 2 | |
| 3 | Takes a ``MuseLogGraph`` and produces: |
| 4 | 1. ASCII graph with branch/merge lines |
| 5 | 2. Pretty-printed JSON |
| 6 | 3. Summary table |
| 7 | |
| 8 | Pure rendering — no I/O, no DB, no mutations. |
| 9 | """ |
| 10 | |
| 11 | from __future__ import annotations |
| 12 | |
| 13 | import json |
| 14 | from collections import defaultdict |
| 15 | |
| 16 | from maestro.services.muse_log_graph import MuseLogGraph, MuseLogNode |
| 17 | |
| 18 | |
def render_ascii_graph(graph: MuseLogGraph) -> str:
    """Render a ``git log --graph --oneline`` style ASCII visualization.

    Nodes are processed in reverse of ``graph.nodes`` order. Each entry in
    ``columns`` is a lane holding the ``variation_id`` expected next in that
    lane (the parent of the last commit drawn there), or ``None`` for a dead
    lane.

    Per node, the function emits:

    * a commit row: ``*`` in the node's lane, ``|`` in every other live
      lane, followed by the first 8 characters of the id, the intent and an
      optional `` (HEAD)`` marker;
    * for a merge (``parent2`` is not ``None``), one connector row:
      ``\\`` opening a new lane when ``parent2`` is not tracked yet, or a
      ``\\`` / ``/`` pointing at the lane that already tracks ``parent2``.
      In the latter case both lanes stay alive, so neither parent loses its
      edge;
    * convergence rows for lanes that ended up tracking the same id
      (delegated to ``_draw_and_collapse_duplicates``).

    Args:
        graph: Object exposing ``nodes``; each node provides
            ``variation_id``, ``parent``, ``parent2``, ``intent`` and
            ``is_head``.

    Returns:
        The rows joined with newlines, or ``"(empty graph)"`` when
        ``graph.nodes`` is empty.
    """
    if not graph.nodes:
        return "(empty graph)"

    lines: list[str] = []
    columns: list[str | None] = []

    for node in reversed(graph.nodes):
        vid = node.variation_id

        # Reuse the lane that was waiting for this commit, else open one.
        col = _index_of(columns, vid)
        if col is None:
            col = len(columns)
            columns.append(vid)

        head = " (HEAD)" if node.is_head else ""
        intent = node.intent or ""
        label = f"{vid[:8]} {intent}{head}"

        # Commit row (identical for merges and linear commits).
        commit_cells = _col_chars(columns, len(columns), active=col)
        lines.append(" ".join(commit_cells) + f" {label}")

        # The primary parent always continues in the commit's own lane.
        columns[col] = node.parent

        p2 = node.parent2
        if p2 is not None:
            p2_col = _index_of(columns, p2)
            if p2_col is not None and p2_col != col:
                # parent2 already has a lane: point at it, but keep BOTH
                # lanes alive. Dropping either one would make that parent
                # reappear later in a fresh lane with no edge leading to it.
                if p2_col > col:
                    glyphs = {col: "|", p2_col: "\\"}
                else:
                    glyphs = {col: "/"}
                lines.append(_edge_row(columns, glyphs))
            else:
                # parent2 not tracked in another lane: open a new lane on
                # the right and draw the fork into it.
                columns.append(p2)
                glyphs = {col: "|", len(columns) - 1: "\\"}
                lines.append(_edge_row(columns, glyphs))

        # Draw convergence lines when multiple columns point to the same
        # parent, then collapse the duplicate columns.
        _draw_and_collapse_duplicates(columns, lines)

        # Trim trailing dead columns.
        while columns and columns[-1] is None:
            columns.pop()

    return "\n".join(lines)


def _edge_row(columns: list[str | None], glyphs: dict[int, str]) -> str:
    """Build one connector row: ``glyphs`` overrides cells by lane index;
    every other lane shows ``|`` when live and a space when dead."""
    cells: list[str] = []
    for i, tracked in enumerate(columns):
        if i in glyphs:
            cells.append(glyphs[i])
        elif tracked is not None:
            cells.append("|")
        else:
            cells.append(" ")
    return " ".join(cells)
| 106 | |
| 107 | |
def render_json(graph: MuseLogGraph) -> str:
    """Serialize the graph's response model as 2-space-indented JSON.

    The payload is ``graph.to_response().model_dump()``; values the JSON
    encoder cannot handle natively are converted with ``str``.
    """
    response = graph.to_response()
    payload = response.model_dump()
    return json.dumps(payload, indent=2, default=str)
| 111 | |
| 112 | |
def render_summary_table(
    graph: MuseLogGraph,
    *,
    checkouts_executed: int = 0,
    drift_blocks: int = 0,
    conflict_merges: int = 0,
    forced_ops: int = 0,
) -> str:
    """Render a summary statistics table with box-drawing borders.

    Three figures are derived from ``graph.nodes``: the number of commits,
    the number of merges (nodes whose ``parent2`` is not ``None``) and the
    number of branch heads (nodes whose ``variation_id`` is never named as
    ``parent`` or ``parent2`` by any node). The remaining rows echo the
    keyword arguments.

    Args:
        graph: Object exposing ``nodes``; each node provides
            ``variation_id``, ``parent`` and ``parent2``.
        checkouts_executed: Value shown in the "Checkouts executed" row.
        drift_blocks: Value shown in the "Drift blocks" row.
        conflict_merges: Value shown in the "Conflict merges attempted" row.
        forced_ops: Value shown in the "Forced operations" row.

    Returns:
        The table as a newline-joined string. The value column is at least
        4 characters wide and widens to fit the longest value, so every row
        has the same width as the borders.
    """
    nodes = graph.nodes
    total_commits = len(nodes)
    merges = sum(1 for n in nodes if n.parent2 is not None)

    # Every id that some node names as a parent; a node outside this set
    # has no children and therefore counts as a branch head.
    referenced_as_parent: set[str] = set()
    for n in nodes:
        if n.parent:
            referenced_as_parent.add(n.parent)
        if n.parent2:
            referenced_as_parent.add(n.parent2)
    branch_heads = sum(
        1 for n in nodes if n.variation_id not in referenced_as_parent
    )

    rows = [
        ("Commits", str(total_commits)),
        ("Merges", str(merges)),
        ("Branch heads", str(branch_heads)),
        ("Conflict merges attempted", str(conflict_merges)),
        ("Checkouts executed", str(checkouts_executed)),
        ("Drift blocks", str(drift_blocks)),
        ("Forced operations", str(forced_ops)),
    ]
    label_width = max(len(label) for label, _ in rows)
    # Keep the historical minimum of 4 so small tables look unchanged, but
    # grow for larger numbers instead of overflowing the border.
    value_width = max(4, max(len(value) for _, value in rows))

    label_rule = "─" * (label_width + 2)
    value_rule = "─" * (value_width + 2)
    table = ["┌" + label_rule + "┬" + value_rule + "┐"]
    table.extend(
        f"│ {label:<{label_width}} │ {value:>{value_width}} │"
        for label, value in rows
    )
    table.append("└" + label_rule + "┴" + value_rule + "┘")
    return "\n".join(table)
| 149 | |
| 150 | |
| 151 | # ── Private helpers ─────────────────────────────────────────────────────── |
| 152 | |
| 153 | |
| 154 | def _index_of(columns: list[str | None], vid: str | None) -> int | None: |
| 155 | if vid is None: |
| 156 | return None |
| 157 | for i, c in enumerate(columns): |
| 158 | if c == vid: |
| 159 | return i |
| 160 | return None |
| 161 | |
| 162 | |
| 163 | def _col_chars(columns: list[str | None], n: int, active: int) -> list[str]: |
| 164 | parts: list[str] = [] |
| 165 | for i in range(n): |
| 166 | if i == active: |
| 167 | parts.append("*") |
| 168 | elif columns[i] is not None: |
| 169 | parts.append("|") |
| 170 | else: |
| 171 | parts.append(" ") |
| 172 | return parts |
| 173 | |
| 174 | |
| 175 | def _draw_and_collapse_duplicates( |
| 176 | columns: list[str | None], lines: list[str], |
| 177 | ) -> None: |
| 178 | """When multiple columns track the same parent, draw ``/`` convergence |
| 179 | lines and collapse the rightmost duplicates.""" |
| 180 | changed = True |
| 181 | while changed: |
| 182 | changed = False |
| 183 | seen: dict[str, int] = {} |
| 184 | for i, c in enumerate(columns): |
| 185 | if c is None: |
| 186 | continue |
| 187 | if c in seen: |
| 188 | keep, remove = seen[c], i |
| 189 | lo, hi = min(keep, remove), max(keep, remove) |
| 190 | n = len(columns) |
| 191 | parts: list[str] = [] |
| 192 | for j in range(n): |
| 193 | if j == lo: |
| 194 | parts.append("|") |
| 195 | elif j == hi: |
| 196 | parts.append("/") |
| 197 | elif columns[j] is not None: |
| 198 | parts.append("|") |
| 199 | else: |
| 200 | parts.append(" ") |
| 201 | lines.append(" ".join(parts)) |
| 202 | columns[hi] = None |
| 203 | # Trim trailing Nones immediately so subsequent |
| 204 | # iterations see a clean column list. |
| 205 | while columns and columns[-1] is None: |
| 206 | columns.pop() |
| 207 | changed = True |
| 208 | break |
| 209 | else: |
| 210 | seen[c] = i |