cgcardona / muse public
reconcile.py python
208 lines 6.9 KB
8557932f feat(code): Phase 5 — multi-agent coordination layer Gabriel Cardona <gabriel@tellurstori.com> 1d ago
1 """muse reconcile — recommend merge ordering and integration strategy.
2
3 Reads active reservations and intents to recommend:
4
5 1. **Merge ordering** — which branches should be merged first to minimize
6 downstream conflicts.
7 2. **Integration strategy** — fast-forward, rebase, or manual conflict resolution for each branch.
8 3. **Conflict hotspots** — symbols reserved by multiple agents that need
9 special attention.
10
11 ``muse reconcile`` is a *read-only* planning command. It does not write to
12 branches, commit history, or the coordination layer. It provides the plan;
13 agents execute it.
14
15 Why this exists
16 ---------------
17 In a system with millions of concurrent agents, merges happen constantly.
18 Without coordination, every merge introduces friction. ``muse reconcile``
19 gives an orchestration agent a complete picture of the current coordination
20 state and a recommended action plan.
21
22 Usage::
23
24 muse reconcile
25 muse reconcile --json
26
27 Output::
28
29 Reconciliation report
30 ──────────────────────────────────────────────────────────────
31
32 Active reservations: 3 Active intents: 2
33 Conflict hotspots: 1
34
35 Recommended merge order:
36 1. feature/billing (3 symbols, 0 predicted conflicts)
37 2. feature/auth (5 symbols, 1 predicted conflict)
38
39 Conflict hotspot:
40 src/billing.py::compute_total
41 reserved by: agent-41 (feature/billing), agent-42 (feature/auth)
42 recommendation: resolve feature/billing first; agent-42 must rebase
43
44 Integration strategy:
45 feature/billing → fast-forward (no conflicts predicted)
46 feature/auth → rebase onto main after feature/billing lands
47
48 Flags:
49
50 ``--json``
51 Emit the reconciliation report as JSON.
52 """
53 from __future__ import annotations
54
55 import json
56 import logging
57
58 import typer
59
60 from muse.core.coordination import active_reservations, load_all_intents
61 from muse.core.repo import require_repo
62
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Typer application object; ``reconcile`` is registered on it via ``@app.callback``.
app = typer.Typer()
66
67
68 class _BranchSummary:
69 def __init__(self, branch: str) -> None:
70 self.branch = branch
71 self.reserved_addresses: list[str] = []
72 self.intents: list[str] = []
73 self.run_ids: set[str] = set()
74 self.conflict_count: int = 0
75
76 def to_dict(self) -> dict[str, str | int | list[str]]:
77 return {
78 "branch": self.branch,
79 "reserved_addresses": self.reserved_addresses,
80 "intents": self.intents,
81 "run_ids": sorted(self.run_ids),
82 "predicted_conflicts": self.conflict_count,
83 }
84
85
def _strategy_for(conflict_count: int) -> str:
    """Return the integration-strategy text for a branch in *conflict_count* hotspots."""
    if conflict_count == 0:
        return "fast-forward (no conflicts predicted)"
    if conflict_count <= 2:
        return "rebase onto main before merging"
    return "manual conflict resolution required"


@app.callback(invoke_without_command=True)
def reconcile(
    ctx: typer.Context,
    as_json: bool = typer.Option(False, "--json", help="Emit report as JSON."),
) -> None:
    """Recommend merge ordering and integration strategy.

    Reads coordination state (reservations + intents) and produces a
    recommended action plan: which branches to merge first, what strategy
    to use, and which conflict hotspots need manual attention.

    Does not write anything — purely advisory output.

    Args:
        ctx: Typer invocation context. Not used by this command; present
            because it is part of the callback signature.
        as_json: When true, emit the whole report as one JSON document and
            skip the human-readable rendering.
    """
    root = require_repo()
    reservations = active_reservations(root)
    intents = load_all_intents(root)

    # Aggregate by branch.
    branch_map: dict[str, _BranchSummary] = {}
    for res in reservations:
        b = res.branch
        if b not in branch_map:
            branch_map[b] = _BranchSummary(b)
        branch_map[b].reserved_addresses.extend(res.addresses)
        branch_map[b].run_ids.add(res.run_id)

    for it in intents:
        b = it.branch
        if b not in branch_map:
            branch_map[b] = _BranchSummary(b)
        branch_map[b].intents.append(it.operation)
        branch_map[b].run_ids.add(it.run_id)

    # Detect conflict hotspots: addresses reserved from more than one branch.
    # A dict is used as an insertion-ordered set so each branch is recorded
    # once per address.
    addr_branches: dict[str, dict[str, None]] = {}
    for res in reservations:
        for addr in res.addresses:
            addr_branches.setdefault(addr, {})[res.branch] = None

    hotspots: dict[str, list[str]] = {
        addr: list(branches)
        for addr, branches in addr_branches.items()
        if len(branches) > 1
    }

    # Compute conflict counts per branch based on hotspot participation.
    # Every hotspot branch came from a reservation, so it is in branch_map.
    for branches in hotspots.values():
        for b in branches:
            branch_map[b].conflict_count += 1

    # Recommend merge order: fewer conflicts → merge first.  The branch name
    # is the final tie-breaker so the plan does not depend on the order in
    # which reservations/intents happened to be loaded.
    ordered = sorted(
        branch_map.values(),
        key=lambda bs: (bs.conflict_count, len(bs.reserved_addresses), bs.branch),
    )

    # List each hotspot's branches in recommended merge order, so that the
    # "resolve X first" advice agrees with the merge order reported above it.
    rank = {bs.branch: position for position, bs in enumerate(ordered)}
    hotspots = {
        addr: sorted(branches, key=rank.__getitem__)
        for addr, branches in hotspots.items()
    }

    # Recommend integration strategies.
    strategies: dict[str, str] = {
        bs.branch: _strategy_for(bs.conflict_count) for bs in ordered
    }

    if as_json:
        typer.echo(json.dumps(
            {
                "schema_version": 1,
                "active_reservations": len(reservations),
                "active_intents": len(intents),
                "conflict_hotspots": len(hotspots),
                "branches": [bs.to_dict() for bs in ordered],
                "recommended_merge_order": [bs.branch for bs in ordered],
                "strategies": strategies,
                "hotspots": [
                    {"address": addr, "branches": branches}
                    for addr, branches in sorted(hotspots.items())
                ],
            },
            indent=2,
        ))
        return

    typer.echo("\nReconciliation report")
    typer.echo("─" * 62)
    typer.echo(
        f" Active reservations: {len(reservations)} "
        f"Active intents: {len(intents)} "
        f"Conflict hotspots: {len(hotspots)}"
    )

    if not reservations and not intents:
        typer.echo(
            "\n (no active coordination data — run 'muse reserve' or 'muse intent' first)"
        )
        return

    if ordered:
        typer.echo("\n Recommended merge order:")
        for position, bs in enumerate(ordered, 1):
            typer.echo(
                f" {position}. {bs.branch:<30} ({len(bs.reserved_addresses)} addresses, "
                f"{bs.conflict_count} conflict(s))"
            )

    if hotspots:
        typer.echo("\n Conflict hotspot(s):")
        for addr, branches in sorted(hotspots.items()):
            typer.echo(f" {addr}")
            typer.echo(f" reserved by: {', '.join(branches)}")
            # branches is in merge order, so the head is the one to land first.
            first = branches[0]
            rest = ", ".join(branches[1:])
            typer.echo(f" → resolve {first!r} first; {rest} must rebase")

    typer.echo("\n Integration strategies:")
    for bs in ordered:
        typer.echo(f" {bs.branch:<30} → {strategies[bs.branch]}")