test_callgraph.py
python
| 1 | """Tests for muse/plugins/code/_callgraph.py. |
| 2 | |
| 3 | Coverage |
| 4 | -------- |
| 5 | build_forward_graph(root, manifest) |
| 6 | - Empty manifest → empty graph. |
| 7 | - Single Python file with function calls → callee names recorded. |
| 8 | - Nested calls (function calling another function). |
| 9 | - Non-Python files are skipped. |
| 10 | - Syntax error files are skipped gracefully. |
| 11 | - Method calls are recorded for class methods. |
| 12 | |
| 13 | build_reverse_graph(root, manifest) |
| 14 | - Empty manifest → empty graph. |
| 15 | - Single caller → single callee reverse mapping. |
| 16 | - Multiple callers of same function → all listed. |
| 17 | - Sorted output (deterministic). |
| 18 | |
| 19 | transitive_callers(start_name, reverse, max_depth) |
| 20 | - Direct caller only (depth 1). |
| 21 | - Transitive callers at depth 2. |
| 22 | - No callers → empty dict. |
| 23 | - Self-recursive function → terminates (no infinite loop). |
| 24 | - Multi-hop chain a→b→c→d. |
| 25 | - max_depth limits traversal. |
| 26 | - Return type is dict[int, list[str]] — depth-keyed. |
| 27 | """ |
| 28 | from __future__ import annotations |
| 29 | |
| 30 | import hashlib |
| 31 | import pathlib |
| 32 | import textwrap |
| 33 | |
| 34 | import pytest |
| 35 | |
| 36 | from muse.plugins.code._callgraph import ( |
| 37 | build_forward_graph, |
| 38 | build_reverse_graph, |
| 39 | transitive_callers, |
| 40 | ) |
| 41 | from muse.core.object_store import write_object |
| 42 | |
| 43 | |
| 44 | # --------------------------------------------------------------------------- |
| 45 | # Helpers |
| 46 | # --------------------------------------------------------------------------- |
| 47 | |
| 48 | |
def _write_snapshot(
    tmp_path: pathlib.Path,
    files: dict[str, str],
) -> dict[str, str]:
    """Persist each source text as a blob and build the matching manifest.

    Args:
        tmp_path: Root directory handed to ``write_object``.
        files: Mapping of relative path to source text.

    Returns:
        Mapping of relative path to the SHA-256 hex digest of the encoded
        source, which is also the object id the blob was written under.
    """

    def _store(text: str) -> str:
        # The object id is the content hash of the encoded source.
        payload = text.encode()
        digest = hashlib.sha256(payload).hexdigest()
        write_object(tmp_path, digest, payload)
        return digest

    return {path: _store(text) for path, text in files.items()}
| 61 | |
| 62 | |
| 63 | # --------------------------------------------------------------------------- |
| 64 | # build_forward_graph |
| 65 | # --------------------------------------------------------------------------- |
| 66 | |
| 67 | |
class TestBuildForwardGraph:
    """Checks for ``build_forward_graph`` over small object-store snapshots."""

    def test_empty_manifest(self, tmp_path: pathlib.Path) -> None:
        # An empty manifest yields an empty graph.
        assert build_forward_graph(tmp_path, {}) == {}

    def test_single_function_no_calls(self, tmp_path: pathlib.Path) -> None:
        code = textwrap.dedent("""\
            def standalone():
                return 42
        """)
        snapshot = _write_snapshot(tmp_path, {"src/a.py": code})
        forward = build_forward_graph(tmp_path, snapshot)
        # NOTE(review): a missing key is tolerated here; only a present key is
        # required to map to an empty callee set.
        assert forward.get("src/a.py::standalone", frozenset()) == frozenset()

    def test_function_calls_another(self, tmp_path: pathlib.Path) -> None:
        code = textwrap.dedent("""\
            def helper():
                return 1

            def caller():
                return helper()
        """)
        snapshot = _write_snapshot(tmp_path, {"src/a.py": code})
        forward = build_forward_graph(tmp_path, snapshot)
        address = "src/a.py::caller"
        assert address in forward
        assert "helper" in forward[address]

    def test_nested_calls(self, tmp_path: pathlib.Path) -> None:
        code = textwrap.dedent("""\
            def a():
                return b(c())

            def b(x):
                return x

            def c():
                return 1
        """)
        snapshot = _write_snapshot(tmp_path, {"src/a.py": code})
        forward = build_forward_graph(tmp_path, snapshot)
        address = "src/a.py::a"
        assert address in forward
        # Both the outer call and the call nested in its argument are expected.
        for callee in ("b", "c"):
            assert callee in forward[address]

    def test_non_python_files_skipped(self, tmp_path: pathlib.Path) -> None:
        snapshot = _write_snapshot(
            tmp_path, {"src/main.go": "func caller() { helper() }"}
        )
        forward = build_forward_graph(tmp_path, snapshot)
        assert len(forward) == 0

    def test_multiple_files(self, tmp_path: pathlib.Path) -> None:
        first = textwrap.dedent("""\
            def alpha():
                return beta()
        """)
        second = textwrap.dedent("""\
            def beta():
                return gamma()

            def gamma():
                return 0
        """)
        snapshot = _write_snapshot(
            tmp_path, {"src/a.py": first, "src/b.py": second}
        )
        forward = build_forward_graph(tmp_path, snapshot)
        # Each caller address is checked for presence, then for its callee.
        expectations = (
            ("src/a.py::alpha", "beta"),
            ("src/b.py::beta", "gamma"),
        )
        for address, callee in expectations:
            assert address in forward
            assert callee in forward[address]

    def test_syntax_error_file_skipped_gracefully(self, tmp_path: pathlib.Path) -> None:
        snapshot = _write_snapshot(
            tmp_path, {"src/broken.py": "def broken(:\n pass\n"}
        )
        # Only checks that the call returns a dict instead of raising.
        assert isinstance(build_forward_graph(tmp_path, snapshot), dict)

    def test_method_calls_recorded(self, tmp_path: pathlib.Path) -> None:
        code = textwrap.dedent("""\
            class Invoice:
                def compute(self):
                    return self.apply_tax()

                def apply_tax(self):
                    return 0.1
        """)
        snapshot = _write_snapshot(tmp_path, {"src/a.py": code})
        forward = build_forward_graph(tmp_path, snapshot)
        address = "src/a.py::Invoice.compute"
        # NOTE(review): passes vacuously when the method address is absent;
        # confirm the qualified-name format before tightening.
        assert address not in forward or "apply_tax" in forward[address]

    def test_forward_graph_returns_frozensets(self, tmp_path: pathlib.Path) -> None:
        code = textwrap.dedent("""\
            def f():
                return g()
        """)
        snapshot = _write_snapshot(tmp_path, {"a.py": code})
        forward = build_forward_graph(tmp_path, snapshot)
        assert all(isinstance(callees, frozenset) for callees in forward.values())
| 170 | |
| 171 | |
| 172 | # --------------------------------------------------------------------------- |
| 173 | # build_reverse_graph |
| 174 | # --------------------------------------------------------------------------- |
| 175 | |
| 176 | |
class TestBuildReverseGraph:
    """Checks for ``build_reverse_graph`` (callee name to caller addresses)."""

    def test_empty_manifest(self, tmp_path: pathlib.Path) -> None:
        # An empty manifest yields an empty reverse graph.
        assert build_reverse_graph(tmp_path, {}) == {}

    def test_single_caller(self, tmp_path: pathlib.Path) -> None:
        code = textwrap.dedent("""\
            def helper():
                return 1

            def caller():
                return helper()
        """)
        snapshot = _write_snapshot(tmp_path, {"a.py": code})
        reverse = build_reverse_graph(tmp_path, snapshot)
        assert "helper" in reverse
        # Substring match keeps the test independent of the address format.
        matching = [address for address in reverse["helper"] if "caller" in address]
        assert matching

    def test_multiple_callers(self, tmp_path: pathlib.Path) -> None:
        code = textwrap.dedent("""\
            def core():
                return 0

            def a():
                return core()

            def b():
                return core()

            def c():
                return core()
        """)
        snapshot = _write_snapshot(tmp_path, {"a.py": code})
        reverse = build_reverse_graph(tmp_path, snapshot)
        assert "core" in reverse
        # Lower bound only: a(), b() and c() each call core().
        assert 3 <= len(reverse["core"])

    def test_reverse_graph_callers_sorted(self, tmp_path: pathlib.Path) -> None:
        code = textwrap.dedent("""\
            def target():
                return 0

            def a_caller():
                return target()

            def z_caller():
                return target()
        """)
        snapshot = _write_snapshot(tmp_path, {"a.py": code})
        reverse = build_reverse_graph(tmp_path, snapshot)
        # NOTE(review): passes vacuously when "target" is missing from the
        # reverse graph; ordering is only checked when the entry exists.
        assert "target" not in reverse or reverse["target"] == sorted(reverse["target"])

    def test_reverse_graph_returns_lists(self, tmp_path: pathlib.Path) -> None:
        code = textwrap.dedent("""\
            def f():
                return g()
            def g():
                return 0
        """)
        snapshot = _write_snapshot(tmp_path, {"a.py": code})
        reverse = build_reverse_graph(tmp_path, snapshot)
        assert all(isinstance(addresses, list) for addresses in reverse.values())
| 243 | |
| 244 | |
| 245 | # --------------------------------------------------------------------------- |
| 246 | # transitive_callers — returns dict[int, list[str]] |
| 247 | # --------------------------------------------------------------------------- |
| 248 | |
| 249 | |
class TestTransitiveCallers:
    """Checks for ``transitive_callers`` using hand-built reverse graphs.

    Each reverse graph maps a bare callee name to a list of caller addresses;
    results are expected to be keyed by depth.
    """

    def test_no_callers_returns_empty(self) -> None:
        empty_reverse: dict[str, list[str]] = {}
        assert transitive_callers("orphan", empty_reverse) == {}

    def test_direct_caller_at_depth_1(self) -> None:
        reverse: dict[str, list[str]] = {"target": ["a.py::caller"]}
        by_depth = transitive_callers("target", reverse)
        assert 1 in by_depth
        assert "a.py::caller" in by_depth[1]

    def test_two_hop_chain(self) -> None:
        reverse: dict[str, list[str]] = {
            "target": ["a.py::caller"],
            "caller": ["a.py::grandcaller"],
        }
        by_depth = transitive_callers("target", reverse)
        expected = {1: "a.py::caller", 2: "a.py::grandcaller"}
        for depth, address in expected.items():
            assert depth in by_depth
            assert address in by_depth[depth]

    def test_multi_hop_chain(self) -> None:
        reverse: dict[str, list[str]] = {
            "d": ["a.py::c"],
            "c": ["a.py::b"],
            "b": ["a.py::a"],
        }
        by_depth = transitive_callers("d", reverse)
        # One hop per level: c calls d, b calls c, a calls b.
        expected = {1: "a.py::c", 2: "a.py::b", 3: "a.py::a"}
        for depth, address in expected.items():
            assert depth in by_depth
            assert address in by_depth[depth]

    def test_no_infinite_loop_on_cycle(self) -> None:
        reverse: dict[str, list[str]] = {
            "a": ["a.py::b"],
            "b": ["a.py::a"],
        }
        # Returning at all demonstrates termination on a two-node cycle.
        assert isinstance(transitive_callers("a", reverse), dict)

    def test_self_recursive_not_infinite(self) -> None:
        reverse: dict[str, list[str]] = {"recursive": ["a.py::recursive"]}
        # Returning at all demonstrates termination on self-recursion.
        assert isinstance(transitive_callers("recursive", reverse), dict)

    def test_max_depth_limits_traversal(self) -> None:
        reverse: dict[str, list[str]] = {
            "d": ["a.py::c"],
            "c": ["a.py::b"],
            "b": ["a.py::a"],
        }
        by_depth = transitive_callers("d", reverse, max_depth=1)
        assert 1 in by_depth
        assert all(depth not in by_depth for depth in (2, 3))

    def test_diamond_dependency_no_duplicate_addresses(self) -> None:
        reverse: dict[str, list[str]] = {
            "a": ["x.py::b", "x.py::c"],
            "b": ["x.py::d"],
            "c": ["x.py::d"],
        }
        by_depth = transitive_callers("a", reverse)
        flattened = [
            address for addresses in by_depth.values() for address in addresses
        ]
        # x.py::d is reachable through both b and c but may be reported at
        # most once.
        assert flattened.count("x.py::d") <= 1

    def test_multiple_direct_callers_all_at_depth_1(self) -> None:
        direct = ["a.py::f1", "a.py::f2", "a.py::f3"]
        reverse: dict[str, list[str]] = {"target": direct}
        by_depth = transitive_callers("target", reverse)
        assert 1 in by_depth
        assert set(by_depth[1]) == {"a.py::f1", "a.py::f2", "a.py::f3"}

    def test_return_type_is_depth_to_list(self) -> None:
        reverse: dict[str, list[str]] = {"t": ["a.py::f"]}
        by_depth = transitive_callers("t", reverse)
        assert all(
            isinstance(depth, int) and isinstance(addresses, list)
            for depth, addresses in by_depth.items()
        )