test_callgraph.py
python
| 1 | """Tests for muse/plugins/code/_callgraph.py. |
| 2 | |
| 3 | Coverage |
| 4 | -------- |
| 5 | build_forward_graph(root, manifest) |
| 6 | - Empty manifest → empty graph. |
| 7 | - Single Python file with function calls → callee names recorded. |
| 8 | - Nested calls (function calling another function). |
| 9 | - Non-Python files are skipped. |
| 10 | - Syntax error files are skipped gracefully. |
| 11 | - Method calls are recorded for class methods. |
| 12 | |
| 13 | build_reverse_graph(root, manifest) |
| 14 | - Empty manifest → empty graph. |
| 15 | - Single caller → single callee reverse mapping. |
| 16 | - Multiple callers of same function → all listed. |
| 17 | - Sorted output (deterministic). |
| 18 | |
| 19 | transitive_callers(start_name, reverse, max_depth) |
| 20 | - Direct caller only (depth 1). |
| 21 | - Transitive callers at depth 2. |
| 22 | - No callers → empty dict. |
| 23 | - Self-recursive function → terminates (no infinite loop). |
| 24 | - Multi-hop chain a→b→c→d. |
| 25 | - max_depth limits traversal. |
| 26 | - Return type is dict[int, list[str]] — depth-keyed. |
| 27 | """ |
| 28 | |
| 29 | import hashlib |
| 30 | import pathlib |
| 31 | import textwrap |
| 32 | |
| 33 | import pytest |
| 34 | |
| 35 | from muse.plugins.code._callgraph import ( |
| 36 | build_forward_graph, |
| 37 | build_reverse_graph, |
| 38 | transitive_callers, |
| 39 | ) |
| 40 | from muse.core.object_store import write_object |
| 41 | |
| 42 | |
| 43 | # --------------------------------------------------------------------------- |
| 44 | # Helpers |
| 45 | # --------------------------------------------------------------------------- |
| 46 | |
| 47 | |
| 48 | def _write_snapshot( |
| 49 | tmp_path: pathlib.Path, |
| 50 | files: dict[str, str], |
| 51 | ) -> dict[str, str]: |
| 52 | """Write source files to the object store and return a manifest dict.""" |
| 53 | manifest: dict[str, str] = {} |
| 54 | for rel_path, source in files.items(): |
| 55 | blob = source.encode() |
| 56 | oid = hashlib.sha256(blob).hexdigest() |
| 57 | write_object(tmp_path, oid, blob) |
| 58 | manifest[rel_path] = oid |
| 59 | return manifest |
| 60 | |
| 61 | |
| 62 | # --------------------------------------------------------------------------- |
| 63 | # build_forward_graph |
| 64 | # --------------------------------------------------------------------------- |
| 65 | |
| 66 | |
| 67 | class TestBuildForwardGraph: |
| 68 | def test_empty_manifest(self, tmp_path: pathlib.Path) -> None: |
| 69 | graph = build_forward_graph(tmp_path, {}) |
| 70 | assert graph == {} |
| 71 | |
| 72 | def test_single_function_no_calls(self, tmp_path: pathlib.Path) -> None: |
| 73 | src = textwrap.dedent("""\ |
| 74 | def standalone(): |
| 75 | return 42 |
| 76 | """) |
| 77 | manifest = _write_snapshot(tmp_path, {"src/a.py": src}) |
| 78 | graph = build_forward_graph(tmp_path, manifest) |
| 79 | if "src/a.py::standalone" in graph: |
| 80 | assert graph["src/a.py::standalone"] == frozenset() |
| 81 | |
| 82 | def test_function_calls_another(self, tmp_path: pathlib.Path) -> None: |
| 83 | src = textwrap.dedent("""\ |
| 84 | def helper(): |
| 85 | return 1 |
| 86 | |
| 87 | def caller(): |
| 88 | return helper() |
| 89 | """) |
| 90 | manifest = _write_snapshot(tmp_path, {"src/a.py": src}) |
| 91 | graph = build_forward_graph(tmp_path, manifest) |
| 92 | caller_key = "src/a.py::caller" |
| 93 | assert caller_key in graph |
| 94 | assert "helper" in graph[caller_key] |
| 95 | |
| 96 | def test_nested_calls(self, tmp_path: pathlib.Path) -> None: |
| 97 | src = textwrap.dedent("""\ |
| 98 | def a(): |
| 99 | return b(c()) |
| 100 | |
| 101 | def b(x): |
| 102 | return x |
| 103 | |
| 104 | def c(): |
| 105 | return 1 |
| 106 | """) |
| 107 | manifest = _write_snapshot(tmp_path, {"src/a.py": src}) |
| 108 | graph = build_forward_graph(tmp_path, manifest) |
| 109 | a_key = "src/a.py::a" |
| 110 | assert a_key in graph |
| 111 | assert "b" in graph[a_key] |
| 112 | assert "c" in graph[a_key] |
| 113 | |
| 114 | def test_non_python_files_skipped(self, tmp_path: pathlib.Path) -> None: |
| 115 | go_src = "func caller() { helper() }" |
| 116 | manifest = _write_snapshot(tmp_path, {"src/main.go": go_src}) |
| 117 | graph = build_forward_graph(tmp_path, manifest) |
| 118 | assert len(graph) == 0 |
| 119 | |
| 120 | def test_multiple_files(self, tmp_path: pathlib.Path) -> None: |
| 121 | src_a = textwrap.dedent("""\ |
| 122 | def alpha(): |
| 123 | return beta() |
| 124 | """) |
| 125 | src_b = textwrap.dedent("""\ |
| 126 | def beta(): |
| 127 | return gamma() |
| 128 | |
| 129 | def gamma(): |
| 130 | return 0 |
| 131 | """) |
| 132 | manifest = _write_snapshot(tmp_path, {"src/a.py": src_a, "src/b.py": src_b}) |
| 133 | graph = build_forward_graph(tmp_path, manifest) |
| 134 | assert "src/a.py::alpha" in graph |
| 135 | assert "beta" in graph["src/a.py::alpha"] |
| 136 | assert "src/b.py::beta" in graph |
| 137 | assert "gamma" in graph["src/b.py::beta"] |
| 138 | |
| 139 | def test_syntax_error_file_skipped_gracefully(self, tmp_path: pathlib.Path) -> None: |
| 140 | bad_src = "def broken(:\n pass\n" |
| 141 | manifest = _write_snapshot(tmp_path, {"src/broken.py": bad_src}) |
| 142 | graph = build_forward_graph(tmp_path, manifest) |
| 143 | assert isinstance(graph, dict) |
| 144 | |
| 145 | def test_method_calls_recorded(self, tmp_path: pathlib.Path) -> None: |
| 146 | src = textwrap.dedent("""\ |
| 147 | class Invoice: |
| 148 | def compute(self): |
| 149 | return self.apply_tax() |
| 150 | |
| 151 | def apply_tax(self): |
| 152 | return 0.1 |
| 153 | """) |
| 154 | manifest = _write_snapshot(tmp_path, {"src/a.py": src}) |
| 155 | graph = build_forward_graph(tmp_path, manifest) |
| 156 | compute_key = "src/a.py::Invoice.compute" |
| 157 | if compute_key in graph: |
| 158 | assert "apply_tax" in graph[compute_key] |
| 159 | |
| 160 | def test_forward_graph_returns_frozensets(self, tmp_path: pathlib.Path) -> None: |
| 161 | src = textwrap.dedent("""\ |
| 162 | def f(): |
| 163 | return g() |
| 164 | """) |
| 165 | manifest = _write_snapshot(tmp_path, {"a.py": src}) |
| 166 | graph = build_forward_graph(tmp_path, manifest) |
| 167 | for callee_set in graph.values(): |
| 168 | assert isinstance(callee_set, frozenset) |
| 169 | |
| 170 | |
| 171 | # --------------------------------------------------------------------------- |
| 172 | # build_reverse_graph |
| 173 | # --------------------------------------------------------------------------- |
| 174 | |
| 175 | |
| 176 | class TestBuildReverseGraph: |
| 177 | def test_empty_manifest(self, tmp_path: pathlib.Path) -> None: |
| 178 | rev = build_reverse_graph(tmp_path, {}) |
| 179 | assert rev == {} |
| 180 | |
| 181 | def test_single_caller(self, tmp_path: pathlib.Path) -> None: |
| 182 | src = textwrap.dedent("""\ |
| 183 | def helper(): |
| 184 | return 1 |
| 185 | |
| 186 | def caller(): |
| 187 | return helper() |
| 188 | """) |
| 189 | manifest = _write_snapshot(tmp_path, {"a.py": src}) |
| 190 | rev = build_reverse_graph(tmp_path, manifest) |
| 191 | assert "helper" in rev |
| 192 | assert any("caller" in addr for addr in rev["helper"]) |
| 193 | |
| 194 | def test_multiple_callers(self, tmp_path: pathlib.Path) -> None: |
| 195 | src = textwrap.dedent("""\ |
| 196 | def core(): |
| 197 | return 0 |
| 198 | |
| 199 | def a(): |
| 200 | return core() |
| 201 | |
| 202 | def b(): |
| 203 | return core() |
| 204 | |
| 205 | def c(): |
| 206 | return core() |
| 207 | """) |
| 208 | manifest = _write_snapshot(tmp_path, {"a.py": src}) |
| 209 | rev = build_reverse_graph(tmp_path, manifest) |
| 210 | assert "core" in rev |
| 211 | callers = rev["core"] |
| 212 | assert len(callers) >= 3 |
| 213 | |
| 214 | def test_reverse_graph_callers_sorted(self, tmp_path: pathlib.Path) -> None: |
| 215 | src = textwrap.dedent("""\ |
| 216 | def target(): |
| 217 | return 0 |
| 218 | |
| 219 | def a_caller(): |
| 220 | return target() |
| 221 | |
| 222 | def z_caller(): |
| 223 | return target() |
| 224 | """) |
| 225 | manifest = _write_snapshot(tmp_path, {"a.py": src}) |
| 226 | rev = build_reverse_graph(tmp_path, manifest) |
| 227 | if "target" in rev: |
| 228 | callers = rev["target"] |
| 229 | assert callers == sorted(callers) |
| 230 | |
| 231 | def test_reverse_graph_returns_lists(self, tmp_path: pathlib.Path) -> None: |
| 232 | src = textwrap.dedent("""\ |
| 233 | def f(): |
| 234 | return g() |
| 235 | def g(): |
| 236 | return 0 |
| 237 | """) |
| 238 | manifest = _write_snapshot(tmp_path, {"a.py": src}) |
| 239 | rev = build_reverse_graph(tmp_path, manifest) |
| 240 | for callers in rev.values(): |
| 241 | assert isinstance(callers, list) |
| 242 | |
| 243 | |
| 244 | # --------------------------------------------------------------------------- |
| 245 | # transitive_callers — returns dict[int, list[str]] |
| 246 | # --------------------------------------------------------------------------- |
| 247 | |
| 248 | |
| 249 | class TestTransitiveCallers: |
| 250 | def test_no_callers_returns_empty(self) -> None: |
| 251 | rev: dict[str, list[str]] = {} |
| 252 | result = transitive_callers("orphan", rev) |
| 253 | assert result == {} |
| 254 | |
| 255 | def test_direct_caller_at_depth_1(self) -> None: |
| 256 | rev: dict[str, list[str]] = {"target": ["a.py::caller"]} |
| 257 | result = transitive_callers("target", rev) |
| 258 | assert 1 in result |
| 259 | assert "a.py::caller" in result[1] |
| 260 | |
| 261 | def test_two_hop_chain(self) -> None: |
| 262 | rev: dict[str, list[str]] = { |
| 263 | "target": ["a.py::caller"], |
| 264 | "caller": ["a.py::grandcaller"], |
| 265 | } |
| 266 | result = transitive_callers("target", rev) |
| 267 | assert 1 in result |
| 268 | assert "a.py::caller" in result[1] |
| 269 | assert 2 in result |
| 270 | assert "a.py::grandcaller" in result[2] |
| 271 | |
| 272 | def test_multi_hop_chain(self) -> None: |
| 273 | rev: dict[str, list[str]] = { |
| 274 | "d": ["a.py::c"], |
| 275 | "c": ["a.py::b"], |
| 276 | "b": ["a.py::a"], |
| 277 | } |
| 278 | result = transitive_callers("d", rev) |
| 279 | assert 1 in result |
| 280 | assert "a.py::c" in result[1] |
| 281 | assert 2 in result |
| 282 | assert "a.py::b" in result[2] |
| 283 | assert 3 in result |
| 284 | assert "a.py::a" in result[3] |
| 285 | |
| 286 | def test_no_infinite_loop_on_cycle(self) -> None: |
| 287 | rev: dict[str, list[str]] = { |
| 288 | "a": ["a.py::b"], |
| 289 | "b": ["a.py::a"], |
| 290 | } |
| 291 | result = transitive_callers("a", rev) |
| 292 | assert isinstance(result, dict) |
| 293 | |
| 294 | def test_self_recursive_not_infinite(self) -> None: |
| 295 | rev: dict[str, list[str]] = {"recursive": ["a.py::recursive"]} |
| 296 | result = transitive_callers("recursive", rev) |
| 297 | assert isinstance(result, dict) |
| 298 | |
| 299 | def test_max_depth_limits_traversal(self) -> None: |
| 300 | rev: dict[str, list[str]] = { |
| 301 | "d": ["a.py::c"], |
| 302 | "c": ["a.py::b"], |
| 303 | "b": ["a.py::a"], |
| 304 | } |
| 305 | result = transitive_callers("d", rev, max_depth=1) |
| 306 | assert 1 in result |
| 307 | assert 2 not in result |
| 308 | assert 3 not in result |
| 309 | |
| 310 | def test_diamond_dependency_no_duplicate_addresses(self) -> None: |
| 311 | rev: dict[str, list[str]] = { |
| 312 | "a": ["x.py::b", "x.py::c"], |
| 313 | "b": ["x.py::d"], |
| 314 | "c": ["x.py::d"], |
| 315 | } |
| 316 | result = transitive_callers("a", rev) |
| 317 | all_addrs: list[str] = [] |
| 318 | for addrs in result.values(): |
| 319 | all_addrs.extend(addrs) |
| 320 | # x.py::d should appear at most once (visited set prevents duplicates). |
| 321 | assert all_addrs.count("x.py::d") <= 1 |
| 322 | |
| 323 | def test_multiple_direct_callers_all_at_depth_1(self) -> None: |
| 324 | rev: dict[str, list[str]] = { |
| 325 | "target": ["a.py::f1", "a.py::f2", "a.py::f3"], |
| 326 | } |
| 327 | result = transitive_callers("target", rev) |
| 328 | assert 1 in result |
| 329 | assert set(result[1]) == {"a.py::f1", "a.py::f2", "a.py::f3"} |
| 330 | |
| 331 | def test_return_type_is_depth_to_list(self) -> None: |
| 332 | rev: dict[str, list[str]] = {"t": ["a.py::f"]} |
| 333 | result = transitive_callers("t", rev) |
| 334 | for depth, addrs in result.items(): |
| 335 | assert isinstance(depth, int) |
| 336 | assert isinstance(addrs, list) |