cgcardona / muse public
test_callgraph.py python
337 lines 11.1 KB
dfa7b7aa Add comprehensive docs and supercharged tests for Code Domain V2 (#70) Gabriel Cardona <cgcardona@gmail.com> 1d ago
1 """Tests for muse/plugins/code/_callgraph.py.
2
3 Coverage
4 --------
5 build_forward_graph(root, manifest)
6 - Empty manifest → empty graph.
7 - Single Python file with function calls → callee names recorded.
8 - Nested calls (function calling another function).
9 - Non-Python files are skipped.
10 - Syntax error files are skipped gracefully.
11 - Method calls are recorded for class methods.
12
13 build_reverse_graph(root, manifest)
14 - Empty manifest → empty graph.
15 - Single caller → single callee reverse mapping.
16 - Multiple callers of same function → all listed.
17 - Sorted output (deterministic).
18
19 transitive_callers(start_name, reverse, max_depth)
20 - Direct caller only (depth 1).
21 - Transitive callers at depth 2.
22 - No callers → empty dict.
23 - Self-recursive function → terminates (no infinite loop).
24 - Multi-hop chain a→b→c→d.
25 - max_depth limits traversal.
26 - Return type is dict[int, list[str]] — depth-keyed.
27 """
28 from __future__ import annotations
29
30 import hashlib
31 import pathlib
32 import textwrap
33
34 import pytest
35
36 from muse.plugins.code._callgraph import (
37 build_forward_graph,
38 build_reverse_graph,
39 transitive_callers,
40 )
41 from muse.core.object_store import write_object
42
43
44 # ---------------------------------------------------------------------------
45 # Helpers
46 # ---------------------------------------------------------------------------
47
48
49 def _write_snapshot(
50 tmp_path: pathlib.Path,
51 files: dict[str, str],
52 ) -> dict[str, str]:
53 """Write source files to the object store and return a manifest dict."""
54 manifest: dict[str, str] = {}
55 for rel_path, source in files.items():
56 blob = source.encode()
57 oid = hashlib.sha256(blob).hexdigest()
58 write_object(tmp_path, oid, blob)
59 manifest[rel_path] = oid
60 return manifest
61
62
63 # ---------------------------------------------------------------------------
64 # build_forward_graph
65 # ---------------------------------------------------------------------------
66
67
68 class TestBuildForwardGraph:
69 def test_empty_manifest(self, tmp_path: pathlib.Path) -> None:
70 graph = build_forward_graph(tmp_path, {})
71 assert graph == {}
72
73 def test_single_function_no_calls(self, tmp_path: pathlib.Path) -> None:
74 src = textwrap.dedent("""\
75 def standalone():
76 return 42
77 """)
78 manifest = _write_snapshot(tmp_path, {"src/a.py": src})
79 graph = build_forward_graph(tmp_path, manifest)
80 if "src/a.py::standalone" in graph:
81 assert graph["src/a.py::standalone"] == frozenset()
82
83 def test_function_calls_another(self, tmp_path: pathlib.Path) -> None:
84 src = textwrap.dedent("""\
85 def helper():
86 return 1
87
88 def caller():
89 return helper()
90 """)
91 manifest = _write_snapshot(tmp_path, {"src/a.py": src})
92 graph = build_forward_graph(tmp_path, manifest)
93 caller_key = "src/a.py::caller"
94 assert caller_key in graph
95 assert "helper" in graph[caller_key]
96
97 def test_nested_calls(self, tmp_path: pathlib.Path) -> None:
98 src = textwrap.dedent("""\
99 def a():
100 return b(c())
101
102 def b(x):
103 return x
104
105 def c():
106 return 1
107 """)
108 manifest = _write_snapshot(tmp_path, {"src/a.py": src})
109 graph = build_forward_graph(tmp_path, manifest)
110 a_key = "src/a.py::a"
111 assert a_key in graph
112 assert "b" in graph[a_key]
113 assert "c" in graph[a_key]
114
115 def test_non_python_files_skipped(self, tmp_path: pathlib.Path) -> None:
116 go_src = "func caller() { helper() }"
117 manifest = _write_snapshot(tmp_path, {"src/main.go": go_src})
118 graph = build_forward_graph(tmp_path, manifest)
119 assert len(graph) == 0
120
121 def test_multiple_files(self, tmp_path: pathlib.Path) -> None:
122 src_a = textwrap.dedent("""\
123 def alpha():
124 return beta()
125 """)
126 src_b = textwrap.dedent("""\
127 def beta():
128 return gamma()
129
130 def gamma():
131 return 0
132 """)
133 manifest = _write_snapshot(tmp_path, {"src/a.py": src_a, "src/b.py": src_b})
134 graph = build_forward_graph(tmp_path, manifest)
135 assert "src/a.py::alpha" in graph
136 assert "beta" in graph["src/a.py::alpha"]
137 assert "src/b.py::beta" in graph
138 assert "gamma" in graph["src/b.py::beta"]
139
140 def test_syntax_error_file_skipped_gracefully(self, tmp_path: pathlib.Path) -> None:
141 bad_src = "def broken(:\n pass\n"
142 manifest = _write_snapshot(tmp_path, {"src/broken.py": bad_src})
143 graph = build_forward_graph(tmp_path, manifest)
144 assert isinstance(graph, dict)
145
146 def test_method_calls_recorded(self, tmp_path: pathlib.Path) -> None:
147 src = textwrap.dedent("""\
148 class Invoice:
149 def compute(self):
150 return self.apply_tax()
151
152 def apply_tax(self):
153 return 0.1
154 """)
155 manifest = _write_snapshot(tmp_path, {"src/a.py": src})
156 graph = build_forward_graph(tmp_path, manifest)
157 compute_key = "src/a.py::Invoice.compute"
158 if compute_key in graph:
159 assert "apply_tax" in graph[compute_key]
160
161 def test_forward_graph_returns_frozensets(self, tmp_path: pathlib.Path) -> None:
162 src = textwrap.dedent("""\
163 def f():
164 return g()
165 """)
166 manifest = _write_snapshot(tmp_path, {"a.py": src})
167 graph = build_forward_graph(tmp_path, manifest)
168 for callee_set in graph.values():
169 assert isinstance(callee_set, frozenset)
170
171
172 # ---------------------------------------------------------------------------
173 # build_reverse_graph
174 # ---------------------------------------------------------------------------
175
176
177 class TestBuildReverseGraph:
178 def test_empty_manifest(self, tmp_path: pathlib.Path) -> None:
179 rev = build_reverse_graph(tmp_path, {})
180 assert rev == {}
181
182 def test_single_caller(self, tmp_path: pathlib.Path) -> None:
183 src = textwrap.dedent("""\
184 def helper():
185 return 1
186
187 def caller():
188 return helper()
189 """)
190 manifest = _write_snapshot(tmp_path, {"a.py": src})
191 rev = build_reverse_graph(tmp_path, manifest)
192 assert "helper" in rev
193 assert any("caller" in addr for addr in rev["helper"])
194
195 def test_multiple_callers(self, tmp_path: pathlib.Path) -> None:
196 src = textwrap.dedent("""\
197 def core():
198 return 0
199
200 def a():
201 return core()
202
203 def b():
204 return core()
205
206 def c():
207 return core()
208 """)
209 manifest = _write_snapshot(tmp_path, {"a.py": src})
210 rev = build_reverse_graph(tmp_path, manifest)
211 assert "core" in rev
212 callers = rev["core"]
213 assert len(callers) >= 3
214
215 def test_reverse_graph_callers_sorted(self, tmp_path: pathlib.Path) -> None:
216 src = textwrap.dedent("""\
217 def target():
218 return 0
219
220 def a_caller():
221 return target()
222
223 def z_caller():
224 return target()
225 """)
226 manifest = _write_snapshot(tmp_path, {"a.py": src})
227 rev = build_reverse_graph(tmp_path, manifest)
228 if "target" in rev:
229 callers = rev["target"]
230 assert callers == sorted(callers)
231
232 def test_reverse_graph_returns_lists(self, tmp_path: pathlib.Path) -> None:
233 src = textwrap.dedent("""\
234 def f():
235 return g()
236 def g():
237 return 0
238 """)
239 manifest = _write_snapshot(tmp_path, {"a.py": src})
240 rev = build_reverse_graph(tmp_path, manifest)
241 for callers in rev.values():
242 assert isinstance(callers, list)
243
244
245 # ---------------------------------------------------------------------------
246 # transitive_callers — returns dict[int, list[str]]
247 # ---------------------------------------------------------------------------
248
249
250 class TestTransitiveCallers:
251 def test_no_callers_returns_empty(self) -> None:
252 rev: dict[str, list[str]] = {}
253 result = transitive_callers("orphan", rev)
254 assert result == {}
255
256 def test_direct_caller_at_depth_1(self) -> None:
257 rev: dict[str, list[str]] = {"target": ["a.py::caller"]}
258 result = transitive_callers("target", rev)
259 assert 1 in result
260 assert "a.py::caller" in result[1]
261
262 def test_two_hop_chain(self) -> None:
263 rev: dict[str, list[str]] = {
264 "target": ["a.py::caller"],
265 "caller": ["a.py::grandcaller"],
266 }
267 result = transitive_callers("target", rev)
268 assert 1 in result
269 assert "a.py::caller" in result[1]
270 assert 2 in result
271 assert "a.py::grandcaller" in result[2]
272
273 def test_multi_hop_chain(self) -> None:
274 rev: dict[str, list[str]] = {
275 "d": ["a.py::c"],
276 "c": ["a.py::b"],
277 "b": ["a.py::a"],
278 }
279 result = transitive_callers("d", rev)
280 assert 1 in result
281 assert "a.py::c" in result[1]
282 assert 2 in result
283 assert "a.py::b" in result[2]
284 assert 3 in result
285 assert "a.py::a" in result[3]
286
287 def test_no_infinite_loop_on_cycle(self) -> None:
288 rev: dict[str, list[str]] = {
289 "a": ["a.py::b"],
290 "b": ["a.py::a"],
291 }
292 result = transitive_callers("a", rev)
293 assert isinstance(result, dict)
294
295 def test_self_recursive_not_infinite(self) -> None:
296 rev: dict[str, list[str]] = {"recursive": ["a.py::recursive"]}
297 result = transitive_callers("recursive", rev)
298 assert isinstance(result, dict)
299
300 def test_max_depth_limits_traversal(self) -> None:
301 rev: dict[str, list[str]] = {
302 "d": ["a.py::c"],
303 "c": ["a.py::b"],
304 "b": ["a.py::a"],
305 }
306 result = transitive_callers("d", rev, max_depth=1)
307 assert 1 in result
308 assert 2 not in result
309 assert 3 not in result
310
311 def test_diamond_dependency_no_duplicate_addresses(self) -> None:
312 rev: dict[str, list[str]] = {
313 "a": ["x.py::b", "x.py::c"],
314 "b": ["x.py::d"],
315 "c": ["x.py::d"],
316 }
317 result = transitive_callers("a", rev)
318 all_addrs: list[str] = []
319 for addrs in result.values():
320 all_addrs.extend(addrs)
321 # x.py::d should appear at most once (visited set prevents duplicates).
322 assert all_addrs.count("x.py::d") <= 1
323
324 def test_multiple_direct_callers_all_at_depth_1(self) -> None:
325 rev: dict[str, list[str]] = {
326 "target": ["a.py::f1", "a.py::f2", "a.py::f3"],
327 }
328 result = transitive_callers("target", rev)
329 assert 1 in result
330 assert set(result[1]) == {"a.py::f1", "a.py::f2", "a.py::f3"}
331
332 def test_return_type_is_depth_to_list(self) -> None:
333 rev: dict[str, list[str]] = {"t": ["a.py::f"]}
334 result = transitive_callers("t", rev)
335 for depth, addrs in result.items():
336 assert isinstance(depth, int)
337 assert isinstance(addrs, list)