test_op_log.py
python
| 1 | """Tests for muse.core.op_log — OpEntry, OpLogCheckpoint, OpLog.""" |
| 2 | from __future__ import annotations |
| 3 | |
| 4 | import pathlib |
| 5 | |
| 6 | import pytest |
| 7 | |
| 8 | from muse.core.op_log import ( |
| 9 | OpEntry, |
| 10 | OpLog, |
| 11 | list_sessions, |
| 12 | make_op_entry, |
| 13 | ) |
| 14 | from muse.domain import InsertOp |
| 15 | |
| 16 | |
| 17 | # --------------------------------------------------------------------------- |
| 18 | # make_op_entry factory |
| 19 | # --------------------------------------------------------------------------- |
| 20 | |
| 21 | |
class TestMakeOpEntry:
    """Exercises the ``make_op_entry`` factory."""

    def test_all_required_fields_present(self) -> None:
        """An entry built from four keyword arguments exposes every checked field."""
        insert = InsertOp(
            op="insert",
            address="note:0",
            position=0,
            content_id="abc123",
            content_summary="C4",
        )
        built = make_op_entry(
            actor_id="agent-x",
            domain="music",
            domain_op=insert,
            lamport_ts=1,
        )

        # Values that were passed in explicitly come back unchanged.
        assert built["actor_id"] == "agent-x"
        assert built["domain"] == "music"
        assert built["lamport_ts"] == 1

        # Fields that were not supplied are expected to be empty.
        assert built["parent_op_ids"] == []
        assert built["intent_id"] == ""
        assert built["reservation_id"] == ""

        # 36 characters; the original author's note attributes this to UUID4.
        op_id_length = len(built["op_id"])
        assert op_id_length == 36

    def test_parent_op_ids_are_copied(self) -> None:
        """Appending to the caller's list afterwards must not change the entry."""
        insert = InsertOp(
            op="insert",
            address="note:0",
            position=0,
            content_id="x",
            content_summary="",
        )
        caller_list = ["aaa", "bbb"]
        built = make_op_entry("a", "music", insert, 1, parent_op_ids=caller_list)
        assert built["parent_op_ids"] == ["aaa", "bbb"]

        # Grow the caller-side list; the entry should still hold the old two ids.
        caller_list.append("ccc")
        assert built["parent_op_ids"] == ["aaa", "bbb"]

    def test_op_ids_are_unique(self) -> None:
        """Twenty factory calls must yield twenty distinct ``op_id`` values."""
        insert = InsertOp(
            op="insert",
            address="note:0",
            position=0,
            content_id="x",
            content_summary="",
        )
        seen: set[str] = set()
        for ts in range(20):
            seen.add(make_op_entry("a", "music", insert, ts)["op_id"])
        assert len(seen) == 20
| 58 | |
| 59 | |
| 60 | # --------------------------------------------------------------------------- |
| 61 | # OpLog.append and read_all |
| 62 | # --------------------------------------------------------------------------- |
| 63 | |
| 64 | |
class TestOpLogAppendRead:
    """Covers ``OpLog.append`` together with ``OpLog.read_all``."""

    def test_append_and_read_all_roundtrip(self, tmp_path: pathlib.Path) -> None:
        """Two appended entries are read back in the order they were appended."""
        session = OpLog(tmp_path, "session-1")
        insert = InsertOp(
            op="insert",
            address="note:0",
            position=0,
            content_id="c1",
            content_summary="C4",
        )
        # Build both entries first, then append them one after the other.
        written = [make_op_entry("agent-a", "music", insert, ts) for ts in (1, 2)]
        for entry in written:
            session.append(entry)

        loaded = session.read_all()
        assert len(loaded) == 2
        assert [e["op_id"] for e in loaded] == [e["op_id"] for e in written]

    def test_empty_log_returns_empty_list(self, tmp_path: pathlib.Path) -> None:
        """A log that has had nothing appended reads back as an empty list."""
        session = OpLog(tmp_path, "empty-session")
        loaded = session.read_all()
        assert loaded == []

    def test_append_creates_directory(self, tmp_path: pathlib.Path) -> None:
        """After one append, the session directory exists under ``.muse/op_log``."""
        session = OpLog(tmp_path, "new-session")
        insert = InsertOp(
            op="insert",
            address="note:0",
            position=0,
            content_id="c1",
            content_summary="",
        )
        session.append(make_op_entry("a", "music", insert, 1))

        expected_dir = tmp_path / ".muse" / "op_log" / "new-session"
        assert expected_dir.is_dir()
| 87 | |
| 88 | |
| 89 | # --------------------------------------------------------------------------- |
| 90 | # Lamport timestamp counter |
| 91 | # --------------------------------------------------------------------------- |
| 92 | |
| 93 | |
class TestLamportTs:
    """Covers the Lamport timestamp counter exposed by ``OpLog.next_lamport_ts``."""

    def test_lamport_is_monotonic(self, tmp_path: pathlib.Path) -> None:
        """Ten consecutive timestamps are already sorted and pairwise distinct."""
        log = OpLog(tmp_path, "ts-session")
        ts_values = [log.next_lamport_ts() for _ in range(10)]
        assert ts_values == sorted(ts_values)
        assert len(set(ts_values)) == 10

    def test_lamport_continues_after_reopen(self, tmp_path: pathlib.Path) -> None:
        """A second ``OpLog`` on the same session hands out a larger timestamp."""
        # One name for both the loop count and the bound asserted below, so the
        # two cannot drift apart.
        appended = 5

        log1 = OpLog(tmp_path, "reopen-session")
        op = InsertOp(
            op="insert",
            address="note:0",
            position=0,
            content_id="c",
            content_summary="",
        )
        # The loop index is not needed; each entry takes its timestamp from the log.
        for _ in range(appended):
            ts = log1.next_lamport_ts()
            log1.append(make_op_entry("a", "music", op, ts))

        # Reopen the same session.
        log2 = OpLog(tmp_path, "reopen-session")
        new_ts = log2.next_lamport_ts()
        assert new_ts > appended
| 112 | |
| 113 | |
| 114 | # --------------------------------------------------------------------------- |
| 115 | # Checkpoint |
| 116 | # --------------------------------------------------------------------------- |
| 117 | |
| 118 | |
class TestCheckpoint:
    """Covers ``OpLog.checkpoint``, ``read_checkpoint`` and ``replay_since_checkpoint``."""

    def test_checkpoint_written_and_readable(self, tmp_path: pathlib.Path) -> None:
        """A checkpoint taken after three appends reports them and can be re-read."""
        session = OpLog(tmp_path, "ckpt-session")
        insert = InsertOp(
            op="insert",
            address="note:0",
            position=0,
            content_id="c",
            content_summary="",
        )
        for ts in range(1, 4):
            session.append(make_op_entry("a", "music", insert, ts))

        written = session.checkpoint("snap-abc")
        assert written["snapshot_id"] == "snap-abc"
        assert written["op_count"] == 3
        assert written["lamport_ts"] == 3

        restored = session.read_checkpoint()
        assert restored is not None
        assert restored["snapshot_id"] == "snap-abc"

    def test_no_checkpoint_returns_none(self, tmp_path: pathlib.Path) -> None:
        """Reading a checkpoint from a log that never wrote one gives ``None``."""
        session = OpLog(tmp_path, "no-ckpt-session")
        restored = session.read_checkpoint()
        assert restored is None

    def test_replay_since_checkpoint_returns_newer_only(self, tmp_path: pathlib.Path) -> None:
        """Only the three entries appended after the checkpoint are replayed."""
        session = OpLog(tmp_path, "replay-session")
        insert = InsertOp(
            op="insert",
            address="note:0",
            position=0,
            content_id="c",
            content_summary="",
        )

        # Timestamps 1..3 go in before the checkpoint.
        for ts in range(1, 4):
            session.append(make_op_entry("a", "music", insert, ts))
        session.checkpoint("snap-1")

        # Timestamps 4..6 go in after the checkpoint.
        for ts in range(4, 7):
            session.append(make_op_entry("a", "music", insert, ts))

        replayed = session.replay_since_checkpoint()
        assert len(replayed) == 3
        assert not any(item["lamport_ts"] <= 3 for item in replayed)
| 154 | |
| 155 | |
| 156 | # --------------------------------------------------------------------------- |
| 157 | # to_structured_delta |
| 158 | # --------------------------------------------------------------------------- |
| 159 | |
| 160 | |
class TestToStructuredDelta:
    """Covers ``OpLog.to_structured_delta``."""

    def test_produces_correct_domain_ops_filtered_by_domain(self, tmp_path: pathlib.Path) -> None:
        """Asking for the ``music`` delta yields the four music ops and no code op."""
        session = OpLog(tmp_path, "delta-session")
        music_op = InsertOp(
            op="insert",
            address="note:0",
            position=0,
            content_id="c",
            content_summary="C4",
        )

        for ts in range(1, 5):
            session.append(make_op_entry("a", "music", music_op, ts))

        # A single op in the "code" domain, which the delta must leave out.
        other_op = InsertOp(
            op="insert",
            address="sym:0",
            position=0,
            content_id="d",
            content_summary="f()",
        )
        session.append(make_op_entry("a", "code", other_op, 5))

        delta = session.to_structured_delta("music")
        accepted_domains = ("midi_notes_tracked", "music")
        assert delta["domain"] in accepted_domains
        # Four music ops were appended, so four are expected back.
        assert len(delta["ops"]) == 4

    def test_summary_mentions_insert(self, tmp_path: pathlib.Path) -> None:
        """The delta summary for a single insert op contains the word ``insert``."""
        session = OpLog(tmp_path, "summary-session")
        music_op = InsertOp(
            op="insert",
            address="note:0",
            position=0,
            content_id="c",
            content_summary="C4",
        )
        session.append(make_op_entry("a", "music", music_op, 1))

        summary = session.to_structured_delta("music")["summary"]
        assert "insert" in summary
| 183 | |
| 184 | |
| 185 | # --------------------------------------------------------------------------- |
| 186 | # Session listing |
| 187 | # --------------------------------------------------------------------------- |
| 188 | |
| 189 | |
class TestListSessions:
    """Covers the module-level ``list_sessions`` helper."""

    def test_lists_all_sessions(self, tmp_path: pathlib.Path) -> None:
        """Every session that received an append shows up in the listing."""
        insert = InsertOp(
            op="insert",
            address="note:0",
            position=0,
            content_id="c",
            content_summary="",
        )
        session_names = ("alpha", "beta", "gamma")
        for name in session_names:
            OpLog(tmp_path, name).append(make_op_entry("a", "music", insert, 1))

        found = list_sessions(tmp_path)
        for name in session_names:
            assert name in found

    def test_empty_repo_returns_empty_list(self, tmp_path: pathlib.Path) -> None:
        """A directory with no op logs written to it lists no sessions."""
        found = list_sessions(tmp_path)
        assert found == []