lcs.py
python
| 1 | """LCS / Myers shortest-edit-script algorithm for ordered sequences. |
| 2 | |
| 3 | Operates on ``list[str]`` where each string is a content ID (SHA-256 or |
| 4 | deterministic hash). Two elements are considered identical iff their content |
| 5 | IDs are equal — the algorithm never inspects actual content. |
| 6 | |
| 7 | Public API |
| 8 | ---------- |
| 9 | - :func:`myers_ses` — compute shortest edit script (keep / insert / delete). |
| 10 | - :func:`detect_moves` — post-process insert+delete pairs into ``MoveOp``\\s. |
| 11 | - :func:`diff` — end-to-end: list[str] × list[str] → ``StructuredDelta``. |
| 12 | |
| 13 | Algorithm |
| 14 | --------- |
| 15 | ``myers_ses`` uses the classic O(nm) LCS dynamic-programming traceback. This |
| 16 | is the same algorithm as ``midi_diff.lcs_edit_script`` but operates on content |
| 17 | IDs (strings) rather than ``NoteKey`` dicts, making it fully generic. |
| 18 | |
| 19 | The patience-diff and O(nd) Myers variants (see ``SequenceSchema.diff_algorithm``) |
| 20 | are not yet implemented; both fall back to the O(nm) LCS. They can be added later |
| 21 | as an optimisation without changing the public API. |
| 22 | """ |
| 23 | |
| 24 | import logging |
| 25 | from dataclasses import dataclass |
| 26 | from typing import Literal |
| 27 | |
| 28 | from muse.core.schema import SequenceSchema |
| 29 | from muse.domain import DeleteOp, DomainOp, InsertOp, MoveOp, StructuredDelta |
| 30 | |
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)

# The closed set of step kinds an edit script may contain; used as the type of
# ``EditStep.kind``.
EditKind = Literal["keep", "insert", "delete"]
| 34 | |
| 35 | |
@dataclass(frozen=True)
class EditStep:
    """A single keep / insert / delete entry of an edit script.

    Instances are immutable (``frozen=True``) and are emitted, in order, by
    :func:`myers_ses`. Both cursor positions are recorded on every step,
    whatever its kind.
    """

    # Which operation this step represents: "keep", "insert" or "delete".
    kind: EditKind
    # Position in the base content-ID list (for an insert: the base cursor at
    # the moment the element is inserted).
    base_index: int
    # Position in the target content-ID list (for a delete: the target cursor
    # at the moment the element is removed).
    target_index: int
    # Content ID of the element this step refers to.
    item: str
| 44 | |
| 45 | |
| 46 | # --------------------------------------------------------------------------- |
| 47 | # Core algorithm |
| 48 | # --------------------------------------------------------------------------- |
| 49 | |
| 50 | |
def myers_ses(base: list[str], target: list[str]) -> list[EditStep]:
    """Return an edit script that rewrites *base* into *target*.

    The script is derived from a full O(nm) longest-common-subsequence table
    (built over suffixes) and a single forward walk through that table.
    Elements are compared purely by content ID equality.

    Args:
        base: Content IDs of the base sequence, in order.
        target: Content IDs of the target sequence, in order.

    Returns:
        The ordered :class:`EditStep` list. Every "keep" step corresponds to
        one element of the LCS, so the count of insert and delete steps is as
        small as possible. Each step records both the base and the target
        cursor at the time it was emitted.
    """
    base_len = len(base)
    target_len = len(target)

    # lcs_len[r][c] is the LCS length of the suffixes base[r:] and target[c:].
    # The extra last row / column stay zero and stand for an empty suffix.
    lcs_len: list[list[int]] = [
        [0] * (target_len + 1) for _ in range(base_len + 1)
    ]
    for r in reversed(range(base_len)):
        row = lcs_len[r]
        row_below = lcs_len[r + 1]
        for c in reversed(range(target_len)):
            if base[r] == target[c]:
                row[c] = row_below[c + 1] + 1
            else:
                row[c] = max(row_below[c], row[c + 1])

    # Walk forward from the top-left corner, emitting one step per move.
    script: list[EditStep] = []
    r = c = 0
    while r < base_len or c < target_len:
        base_left = r < base_len
        target_left = c < target_len

        if base_left and target_left and base[r] == target[c]:
            script.append(EditStep("keep", r, c, base[r]))
            r += 1
            c += 1
            continue

        # Prefer an insert when base is exhausted, or when skipping the target
        # element loses no LCS length compared with skipping the base element
        # (ties go to insert).
        if target_left and (
            not base_left or lcs_len[r][c + 1] >= lcs_len[r + 1][c]
        ):
            script.append(EditStep("insert", r, c, target[c]))
            c += 1
            continue

        script.append(EditStep("delete", r, c, base[r]))
        r += 1

    return script
| 92 | |
| 93 | |
| 94 | # --------------------------------------------------------------------------- |
| 95 | # Move detection post-pass |
| 96 | # --------------------------------------------------------------------------- |
| 97 | |
| 98 | |
def detect_moves(
    inserts: list[InsertOp],
    deletes: list[DeleteOp],
) -> tuple[list[MoveOp], list[InsertOp], list[DeleteOp]]:
    """Collapse (delete, insert) pairs that share a content ID into ``MoveOp`` entries.

    A move is a delete and an insert of the same ``content_id`` whose
    positions differ. When the positions are equal the pair is left as
    separate insert/delete ops. A ``position`` of ``None`` is treated as ``0``
    for the comparison and for the emitted ``MoveOp``.

    Only the *first* delete of each content ID is a candidate for pairing, and
    it can be paired at most once. Any further deletes with the same content
    ID are genuine deletions and are always returned in ``remaining_deletes``.

    Args:
        inserts: ``InsertOp`` entries from the LCS edit script.
        deletes: ``DeleteOp`` entries from the LCS edit script.

    Returns:
        A tuple ``(moves, remaining_inserts, remaining_deletes)``. ``moves``
        holds the detected ``MoveOp`` entries (taking ``address`` from the
        insert); the other two lists hold the ops that were not paired, in
        their original order.
    """
    # Index into *deletes* of the first delete seen for each content ID.
    first_delete_index: dict[str, int] = {}
    for idx, d in enumerate(deletes):
        first_delete_index.setdefault(d["content_id"], idx)

    moves: list[MoveOp] = []
    remaining_inserts: list[InsertOp] = []
    # Track paired deletes by index, not by content ID: filtering by content ID
    # would also discard unpaired duplicate deletes of the same content.
    paired_delete_indices: set[int] = set()

    for ins in inserts:
        cid = ins["content_id"]
        idx = first_delete_index.get(cid)
        if idx is not None and idx not in paired_delete_indices:
            d = deletes[idx]
            from_pos = d["position"] if d["position"] is not None else 0
            to_pos = ins["position"] if ins["position"] is not None else 0
            if from_pos != to_pos:
                moves.append(
                    MoveOp(
                        op="move",
                        address=ins["address"],
                        from_position=from_pos,
                        to_position=to_pos,
                        content_id=cid,
                    )
                )
                paired_delete_indices.add(idx)
                continue
        remaining_inserts.append(ins)

    remaining_deletes = [
        d for idx, d in enumerate(deletes) if idx not in paired_delete_indices
    ]
    return moves, remaining_inserts, remaining_deletes
| 150 | |
| 151 | |
| 152 | # --------------------------------------------------------------------------- |
| 153 | # Top-level diff entry point |
| 154 | # --------------------------------------------------------------------------- |
| 155 | |
| 156 | |
def diff(
    schema: SequenceSchema,
    base: list[str],
    target: list[str],
    *,
    domain: str,
    address: str = "",
) -> StructuredDelta:
    """Diff two ordered sequences of content IDs, returning a ``StructuredDelta``.

    Runs :func:`myers_ses` to obtain the edit script, converts its insert and
    delete steps into ``InsertOp`` / ``DeleteOp`` entries, then runs
    :func:`detect_moves` to collapse matching pairs into ``MoveOp`` entries.
    "keep" steps produce no op.

    Args:
        schema: The ``SequenceSchema`` for the sequence; only its
            ``"element_type"`` entry is read, to label summaries.
        base: Base (ancestor) sequence as a list of content IDs.
        target: Target (newer) sequence as a list of content IDs.
        domain: Domain tag for the returned ``StructuredDelta``.
        address: Address stamped on every generated op (e.g. file path).

    Returns:
        A ``StructuredDelta`` whose ``ops`` are ordered as remaining deletes,
        then remaining inserts, then moves, and whose ``summary`` is a
        human-readable count of each (or ``"no <element> changes"``).
    """
    steps = myers_ses(base, target)

    raw_inserts: list[InsertOp] = []
    raw_deletes: list[DeleteOp] = []
    elem = schema["element_type"]

    for step in steps:
        if step.kind == "insert":
            # Inserts are positioned in the target sequence.
            raw_inserts.append(
                InsertOp(
                    op="insert",
                    address=address,
                    position=step.target_index,
                    content_id=step.item,
                    content_summary=f"{elem}:{step.item[:8]}",
                )
            )
        elif step.kind == "delete":
            # Deletes are positioned in the base sequence.
            raw_deletes.append(
                DeleteOp(
                    op="delete",
                    address=address,
                    position=step.base_index,
                    content_id=step.item,
                    content_summary=f"{elem}:{step.item[:8]}",
                )
            )

    moves, remaining_inserts, remaining_deletes = detect_moves(raw_inserts, raw_deletes)
    ops: list[DomainOp] = [*remaining_deletes, *remaining_inserts, *moves]

    n_ins = len(remaining_inserts)
    n_del = len(remaining_deletes)
    n_mov = len(moves)

    parts: list[str] = []
    if n_ins:
        parts.append(f"{n_ins} {elem}{'s' if n_ins != 1 else ''} added")
    if n_del:
        parts.append(f"{n_del} {elem}{'s' if n_del != 1 else ''} removed")
    if n_mov:
        # "moved" reads the same for singular and plural, so no conditional
        # is needed here.
        parts.append(f"{n_mov} moved")
    summary = ", ".join(parts) if parts else f"no {elem} changes"

    logger.debug(
        "lcs.diff %r: +%d -%d ~%d ops on %d→%d elements",
        address,
        n_ins,
        n_del,
        n_mov,
        len(base),
        len(target),
    )

    return StructuredDelta(domain=domain, ops=ops, summary=summary)