cgcardona / muse public
test_callgraph.py python
336 lines 11.1 KB
e6786943 feat: upgrade to Python 3.14, drop from __future__ import annotations Gabriel Cardona <cgcardona@gmail.com> 1d ago
1 """Tests for muse/plugins/code/_callgraph.py.
2
3 Coverage
4 --------
5 build_forward_graph(root, manifest)
6 - Empty manifest → empty graph.
7 - Single Python file with function calls → callee names recorded.
8 - Nested calls (function calling another function).
9 - Non-Python files are skipped.
10 - Syntax error files are skipped gracefully.
11 - Method calls are recorded for class methods.
12
13 build_reverse_graph(root, manifest)
14 - Empty manifest → empty graph.
15 - Single caller → single callee reverse mapping.
16 - Multiple callers of same function → all listed.
17 - Sorted output (deterministic).
18
19 transitive_callers(start_name, reverse, max_depth)
20 - Direct caller only (depth 1).
21 - Transitive callers at depth 2.
22 - No callers → empty dict.
23 - Self-recursive function → terminates (no infinite loop).
24 - Multi-hop chain a→b→c→d.
25 - max_depth limits traversal.
26 - Return type is dict[int, list[str]] — depth-keyed.
27 """
28
29 import hashlib
30 import pathlib
31 import textwrap
32
33 import pytest
34
35 from muse.plugins.code._callgraph import (
36 build_forward_graph,
37 build_reverse_graph,
38 transitive_callers,
39 )
40 from muse.core.object_store import write_object
41
42
43 # ---------------------------------------------------------------------------
44 # Helpers
45 # ---------------------------------------------------------------------------
46
47
48 def _write_snapshot(
49 tmp_path: pathlib.Path,
50 files: dict[str, str],
51 ) -> dict[str, str]:
52 """Write source files to the object store and return a manifest dict."""
53 manifest: dict[str, str] = {}
54 for rel_path, source in files.items():
55 blob = source.encode()
56 oid = hashlib.sha256(blob).hexdigest()
57 write_object(tmp_path, oid, blob)
58 manifest[rel_path] = oid
59 return manifest
60
61
62 # ---------------------------------------------------------------------------
63 # build_forward_graph
64 # ---------------------------------------------------------------------------
65
66
67 class TestBuildForwardGraph:
68 def test_empty_manifest(self, tmp_path: pathlib.Path) -> None:
69 graph = build_forward_graph(tmp_path, {})
70 assert graph == {}
71
72 def test_single_function_no_calls(self, tmp_path: pathlib.Path) -> None:
73 src = textwrap.dedent("""\
74 def standalone():
75 return 42
76 """)
77 manifest = _write_snapshot(tmp_path, {"src/a.py": src})
78 graph = build_forward_graph(tmp_path, manifest)
79 if "src/a.py::standalone" in graph:
80 assert graph["src/a.py::standalone"] == frozenset()
81
82 def test_function_calls_another(self, tmp_path: pathlib.Path) -> None:
83 src = textwrap.dedent("""\
84 def helper():
85 return 1
86
87 def caller():
88 return helper()
89 """)
90 manifest = _write_snapshot(tmp_path, {"src/a.py": src})
91 graph = build_forward_graph(tmp_path, manifest)
92 caller_key = "src/a.py::caller"
93 assert caller_key in graph
94 assert "helper" in graph[caller_key]
95
96 def test_nested_calls(self, tmp_path: pathlib.Path) -> None:
97 src = textwrap.dedent("""\
98 def a():
99 return b(c())
100
101 def b(x):
102 return x
103
104 def c():
105 return 1
106 """)
107 manifest = _write_snapshot(tmp_path, {"src/a.py": src})
108 graph = build_forward_graph(tmp_path, manifest)
109 a_key = "src/a.py::a"
110 assert a_key in graph
111 assert "b" in graph[a_key]
112 assert "c" in graph[a_key]
113
114 def test_non_python_files_skipped(self, tmp_path: pathlib.Path) -> None:
115 go_src = "func caller() { helper() }"
116 manifest = _write_snapshot(tmp_path, {"src/main.go": go_src})
117 graph = build_forward_graph(tmp_path, manifest)
118 assert len(graph) == 0
119
120 def test_multiple_files(self, tmp_path: pathlib.Path) -> None:
121 src_a = textwrap.dedent("""\
122 def alpha():
123 return beta()
124 """)
125 src_b = textwrap.dedent("""\
126 def beta():
127 return gamma()
128
129 def gamma():
130 return 0
131 """)
132 manifest = _write_snapshot(tmp_path, {"src/a.py": src_a, "src/b.py": src_b})
133 graph = build_forward_graph(tmp_path, manifest)
134 assert "src/a.py::alpha" in graph
135 assert "beta" in graph["src/a.py::alpha"]
136 assert "src/b.py::beta" in graph
137 assert "gamma" in graph["src/b.py::beta"]
138
139 def test_syntax_error_file_skipped_gracefully(self, tmp_path: pathlib.Path) -> None:
140 bad_src = "def broken(:\n pass\n"
141 manifest = _write_snapshot(tmp_path, {"src/broken.py": bad_src})
142 graph = build_forward_graph(tmp_path, manifest)
143 assert isinstance(graph, dict)
144
145 def test_method_calls_recorded(self, tmp_path: pathlib.Path) -> None:
146 src = textwrap.dedent("""\
147 class Invoice:
148 def compute(self):
149 return self.apply_tax()
150
151 def apply_tax(self):
152 return 0.1
153 """)
154 manifest = _write_snapshot(tmp_path, {"src/a.py": src})
155 graph = build_forward_graph(tmp_path, manifest)
156 compute_key = "src/a.py::Invoice.compute"
157 if compute_key in graph:
158 assert "apply_tax" in graph[compute_key]
159
160 def test_forward_graph_returns_frozensets(self, tmp_path: pathlib.Path) -> None:
161 src = textwrap.dedent("""\
162 def f():
163 return g()
164 """)
165 manifest = _write_snapshot(tmp_path, {"a.py": src})
166 graph = build_forward_graph(tmp_path, manifest)
167 for callee_set in graph.values():
168 assert isinstance(callee_set, frozenset)
169
170
171 # ---------------------------------------------------------------------------
172 # build_reverse_graph
173 # ---------------------------------------------------------------------------
174
175
176 class TestBuildReverseGraph:
177 def test_empty_manifest(self, tmp_path: pathlib.Path) -> None:
178 rev = build_reverse_graph(tmp_path, {})
179 assert rev == {}
180
181 def test_single_caller(self, tmp_path: pathlib.Path) -> None:
182 src = textwrap.dedent("""\
183 def helper():
184 return 1
185
186 def caller():
187 return helper()
188 """)
189 manifest = _write_snapshot(tmp_path, {"a.py": src})
190 rev = build_reverse_graph(tmp_path, manifest)
191 assert "helper" in rev
192 assert any("caller" in addr for addr in rev["helper"])
193
194 def test_multiple_callers(self, tmp_path: pathlib.Path) -> None:
195 src = textwrap.dedent("""\
196 def core():
197 return 0
198
199 def a():
200 return core()
201
202 def b():
203 return core()
204
205 def c():
206 return core()
207 """)
208 manifest = _write_snapshot(tmp_path, {"a.py": src})
209 rev = build_reverse_graph(tmp_path, manifest)
210 assert "core" in rev
211 callers = rev["core"]
212 assert len(callers) >= 3
213
214 def test_reverse_graph_callers_sorted(self, tmp_path: pathlib.Path) -> None:
215 src = textwrap.dedent("""\
216 def target():
217 return 0
218
219 def a_caller():
220 return target()
221
222 def z_caller():
223 return target()
224 """)
225 manifest = _write_snapshot(tmp_path, {"a.py": src})
226 rev = build_reverse_graph(tmp_path, manifest)
227 if "target" in rev:
228 callers = rev["target"]
229 assert callers == sorted(callers)
230
231 def test_reverse_graph_returns_lists(self, tmp_path: pathlib.Path) -> None:
232 src = textwrap.dedent("""\
233 def f():
234 return g()
235 def g():
236 return 0
237 """)
238 manifest = _write_snapshot(tmp_path, {"a.py": src})
239 rev = build_reverse_graph(tmp_path, manifest)
240 for callers in rev.values():
241 assert isinstance(callers, list)
242
243
244 # ---------------------------------------------------------------------------
245 # transitive_callers — returns dict[int, list[str]]
246 # ---------------------------------------------------------------------------
247
248
249 class TestTransitiveCallers:
250 def test_no_callers_returns_empty(self) -> None:
251 rev: dict[str, list[str]] = {}
252 result = transitive_callers("orphan", rev)
253 assert result == {}
254
255 def test_direct_caller_at_depth_1(self) -> None:
256 rev: dict[str, list[str]] = {"target": ["a.py::caller"]}
257 result = transitive_callers("target", rev)
258 assert 1 in result
259 assert "a.py::caller" in result[1]
260
261 def test_two_hop_chain(self) -> None:
262 rev: dict[str, list[str]] = {
263 "target": ["a.py::caller"],
264 "caller": ["a.py::grandcaller"],
265 }
266 result = transitive_callers("target", rev)
267 assert 1 in result
268 assert "a.py::caller" in result[1]
269 assert 2 in result
270 assert "a.py::grandcaller" in result[2]
271
272 def test_multi_hop_chain(self) -> None:
273 rev: dict[str, list[str]] = {
274 "d": ["a.py::c"],
275 "c": ["a.py::b"],
276 "b": ["a.py::a"],
277 }
278 result = transitive_callers("d", rev)
279 assert 1 in result
280 assert "a.py::c" in result[1]
281 assert 2 in result
282 assert "a.py::b" in result[2]
283 assert 3 in result
284 assert "a.py::a" in result[3]
285
286 def test_no_infinite_loop_on_cycle(self) -> None:
287 rev: dict[str, list[str]] = {
288 "a": ["a.py::b"],
289 "b": ["a.py::a"],
290 }
291 result = transitive_callers("a", rev)
292 assert isinstance(result, dict)
293
294 def test_self_recursive_not_infinite(self) -> None:
295 rev: dict[str, list[str]] = {"recursive": ["a.py::recursive"]}
296 result = transitive_callers("recursive", rev)
297 assert isinstance(result, dict)
298
299 def test_max_depth_limits_traversal(self) -> None:
300 rev: dict[str, list[str]] = {
301 "d": ["a.py::c"],
302 "c": ["a.py::b"],
303 "b": ["a.py::a"],
304 }
305 result = transitive_callers("d", rev, max_depth=1)
306 assert 1 in result
307 assert 2 not in result
308 assert 3 not in result
309
310 def test_diamond_dependency_no_duplicate_addresses(self) -> None:
311 rev: dict[str, list[str]] = {
312 "a": ["x.py::b", "x.py::c"],
313 "b": ["x.py::d"],
314 "c": ["x.py::d"],
315 }
316 result = transitive_callers("a", rev)
317 all_addrs: list[str] = []
318 for addrs in result.values():
319 all_addrs.extend(addrs)
320 # x.py::d should appear at most once (visited set prevents duplicates).
321 assert all_addrs.count("x.py::d") <= 1
322
323 def test_multiple_direct_callers_all_at_depth_1(self) -> None:
324 rev: dict[str, list[str]] = {
325 "target": ["a.py::f1", "a.py::f2", "a.py::f3"],
326 }
327 result = transitive_callers("target", rev)
328 assert 1 in result
329 assert set(result[1]) == {"a.py::f1", "a.py::f2", "a.py::f3"}
330
331 def test_return_type_is_depth_to_list(self) -> None:
332 rev: dict[str, list[str]] = {"t": ["a.py::f"]}
333 result = transitive_callers("t", rev)
334 for depth, addrs in result.items():
335 assert isinstance(depth, int)
336 assert isinstance(addrs, list)