test_core_reflog.py
python
| 1 | """Tests for muse/core/reflog.py — reflog append, read, parse.""" |
| 2 | |
| 3 | from __future__ import annotations |
| 4 | |
| 5 | import datetime |
| 6 | import pathlib |
| 7 | |
| 8 | import pytest |
| 9 | |
| 10 | from muse.core.reflog import ( |
| 11 | ReflogEntry, |
| 12 | append_reflog, |
| 13 | list_reflog_refs, |
| 14 | read_reflog, |
| 15 | ) |
| 16 | |
| 17 | _NULL_ID = "0" * 64 |
| 18 | _SHA_A = "a" * 64 |
| 19 | _SHA_B = "b" * 64 |
| 20 | _SHA_C = "c" * 64 |
| 21 | |
| 22 | |
| 23 | # --------------------------------------------------------------------------- |
| 24 | # append_reflog |
| 25 | # --------------------------------------------------------------------------- |
| 26 | |
| 27 | |
| 28 | def test_append_creates_log_files(tmp_path: pathlib.Path) -> None: |
| 29 | append_reflog(tmp_path, "main", old_id=None, new_id=_SHA_A, author="Alice", operation="commit: init") |
| 30 | assert (tmp_path / ".muse" / "logs" / "refs" / "heads" / "main").exists() |
| 31 | assert (tmp_path / ".muse" / "logs" / "HEAD").exists() |
| 32 | |
| 33 | |
| 34 | def test_append_null_old_id(tmp_path: pathlib.Path) -> None: |
| 35 | append_reflog(tmp_path, "main", old_id=None, new_id=_SHA_A, author="Alice", operation="commit: init") |
| 36 | entries = read_reflog(tmp_path, "main") |
| 37 | assert len(entries) == 1 |
| 38 | assert entries[0].old_id == _NULL_ID |
| 39 | assert entries[0].new_id == _SHA_A |
| 40 | |
| 41 | |
| 42 | def test_append_multiple_entries(tmp_path: pathlib.Path) -> None: |
| 43 | append_reflog(tmp_path, "main", old_id=None, new_id=_SHA_A, author="A", operation="commit: first") |
| 44 | append_reflog(tmp_path, "main", old_id=_SHA_A, new_id=_SHA_B, author="B", operation="commit: second") |
| 45 | append_reflog(tmp_path, "main", old_id=_SHA_B, new_id=_SHA_C, author="C", operation="commit: third") |
| 46 | entries = read_reflog(tmp_path, "main") |
| 47 | assert len(entries) == 3 |
| 48 | # Newest first. |
| 49 | assert entries[0].new_id == _SHA_C |
| 50 | assert entries[1].new_id == _SHA_B |
| 51 | assert entries[2].new_id == _SHA_A |
| 52 | |
| 53 | |
| 54 | def test_append_head_log_also_updated(tmp_path: pathlib.Path) -> None: |
| 55 | append_reflog(tmp_path, "dev", old_id=_SHA_A, new_id=_SHA_B, author="X", operation="checkout: moving") |
| 56 | head_entries = read_reflog(tmp_path, branch=None) |
| 57 | assert len(head_entries) == 1 |
| 58 | assert head_entries[0].new_id == _SHA_B |
| 59 | |
| 60 | |
| 61 | def test_append_operation_preserved(tmp_path: pathlib.Path) -> None: |
| 62 | op = "merge: feat/audio into main" |
| 63 | append_reflog(tmp_path, "main", old_id=_SHA_A, new_id=_SHA_B, author="Alice", operation=op) |
| 64 | entries = read_reflog(tmp_path, "main") |
| 65 | assert entries[0].operation == op |
| 66 | |
| 67 | |
| 68 | def test_append_author_preserved(tmp_path: pathlib.Path) -> None: |
| 69 | append_reflog(tmp_path, "main", old_id=None, new_id=_SHA_A, author="Alice <alice@example.com>", operation="commit: x") |
| 70 | entries = read_reflog(tmp_path, "main") |
| 71 | assert "Alice" in entries[0].author |
| 72 | |
| 73 | |
| 74 | # --------------------------------------------------------------------------- |
| 75 | # read_reflog |
| 76 | # --------------------------------------------------------------------------- |
| 77 | |
| 78 | |
| 79 | def test_read_returns_empty_for_missing_log(tmp_path: pathlib.Path) -> None: |
| 80 | entries = read_reflog(tmp_path, "nonexistent") |
| 81 | assert entries == [] |
| 82 | |
| 83 | |
| 84 | def test_read_limit(tmp_path: pathlib.Path) -> None: |
| 85 | for i in range(10): |
| 86 | append_reflog(tmp_path, "main", old_id=None, new_id=_SHA_A, author="A", operation=f"commit: {i}") |
| 87 | entries = read_reflog(tmp_path, "main", limit=3) |
| 88 | assert len(entries) == 3 |
| 89 | |
| 90 | |
| 91 | def test_read_head_log(tmp_path: pathlib.Path) -> None: |
| 92 | append_reflog(tmp_path, "main", old_id=None, new_id=_SHA_A, author="A", operation="commit: x") |
| 93 | entries = read_reflog(tmp_path, branch=None) |
| 94 | assert len(entries) == 1 |
| 95 | |
| 96 | |
| 97 | def test_read_timestamp_is_utc_datetime(tmp_path: pathlib.Path) -> None: |
| 98 | append_reflog(tmp_path, "main", old_id=None, new_id=_SHA_A, author="A", operation="commit: x") |
| 99 | entries = read_reflog(tmp_path, "main") |
| 100 | assert isinstance(entries[0].timestamp, datetime.datetime) |
| 101 | assert entries[0].timestamp.tzinfo is not None |
| 102 | |
| 103 | |
| 104 | # --------------------------------------------------------------------------- |
| 105 | # list_reflog_refs |
| 106 | # --------------------------------------------------------------------------- |
| 107 | |
| 108 | |
| 109 | def test_list_reflog_refs_empty(tmp_path: pathlib.Path) -> None: |
| 110 | assert list_reflog_refs(tmp_path) == [] |
| 111 | |
| 112 | |
| 113 | def test_list_reflog_refs_returns_branch_names(tmp_path: pathlib.Path) -> None: |
| 114 | append_reflog(tmp_path, "main", old_id=None, new_id=_SHA_A, author="A", operation="commit: x") |
| 115 | append_reflog(tmp_path, "dev", old_id=None, new_id=_SHA_B, author="B", operation="commit: y") |
| 116 | refs = list_reflog_refs(tmp_path) |
| 117 | assert "main" in refs |
| 118 | assert "dev" in refs |
| 119 | |
| 120 | |
| 121 | def test_list_reflog_refs_sorted(tmp_path: pathlib.Path) -> None: |
| 122 | for name in ("zzz", "aaa", "mmm"): |
| 123 | append_reflog(tmp_path, name, old_id=None, new_id=_SHA_A, author="A", operation="commit: x") |
| 124 | refs = list_reflog_refs(tmp_path) |
| 125 | assert refs == sorted(refs) |
| 126 | |
| 127 | |
| 128 | # --------------------------------------------------------------------------- |
| 129 | # Stress test: many entries |
| 130 | # --------------------------------------------------------------------------- |
| 131 | |
| 132 | |
| 133 | def test_stress_many_entries(tmp_path: pathlib.Path) -> None: |
| 134 | """500 entries must round-trip correctly.""" |
| 135 | n = 500 |
| 136 | for i in range(n): |
| 137 | sha = format(i, "064x") |
| 138 | append_reflog(tmp_path, "main", old_id=None, new_id=sha, author="A", operation=f"commit: {i}") |
| 139 | entries = read_reflog(tmp_path, "main", limit=n) |
| 140 | assert len(entries) == n |
| 141 | # Newest first — last appended sha should be entries[0]. |
| 142 | assert entries[0].new_id == format(n - 1, "064x") |
| 143 | |
| 144 | |
| 145 | # --------------------------------------------------------------------------- |
| 146 | # Edge cases |
| 147 | # --------------------------------------------------------------------------- |
| 148 | |
| 149 | |
| 150 | def test_entry_with_tab_in_operation(tmp_path: pathlib.Path) -> None: |
| 151 | """Tab characters in the operation string must be escaped/handled gracefully.""" |
| 152 | op = "commit: message with some text" |
| 153 | append_reflog(tmp_path, "main", old_id=None, new_id=_SHA_A, author="A", operation=op) |
| 154 | entries = read_reflog(tmp_path, "main") |
| 155 | assert entries[0].operation == op |
| 156 | |
| 157 | |
| 158 | def test_multiple_branches_isolated(tmp_path: pathlib.Path) -> None: |
| 159 | append_reflog(tmp_path, "main", old_id=None, new_id=_SHA_A, author="A", operation="commit: main") |
| 160 | append_reflog(tmp_path, "dev", old_id=None, new_id=_SHA_B, author="B", operation="commit: dev") |
| 161 | main_entries = read_reflog(tmp_path, "main") |
| 162 | dev_entries = read_reflog(tmp_path, "dev") |
| 163 | assert len(main_entries) == 1 |
| 164 | assert len(dev_entries) == 1 |
| 165 | assert main_entries[0].new_id == _SHA_A |
| 166 | assert dev_entries[0].new_id == _SHA_B |