g_counter.py
python
| 1 | """Grow-only Counter (G-Counter) — CRDT for monotonically increasing counts. |
| 2 | |
| 3 | A G-Counter assigns one slot per agent. Each agent may only increment its own |
| 4 | slot. The global value is the sum of all slots. ``join`` takes the per-slot |
| 5 | maximum (matching the Vector Clock pattern), which is commutative, associative, |
| 6 | and idempotent. |
| 7 | |
| 8 | Use cases in Muse: |
| 9 | - Commit-count metrics per agent (how many commits has each agent contributed?). |
| 10 | - Replay counters (how many times has this element been touched?). |
| 11 | - Any monotonically increasing quantity across a distributed agent fleet. |
| 12 | |
| 13 | **Lattice laws satisfied by** :meth:`join`: |
| 14 | 1. Commutativity: ``a.join(b).equivalent(b.join(a))`` |
| 15 | 2. Associativity: ``a.join(b).join(c).equivalent(a.join(b.join(c)))`` |
| 16 | 3. Idempotency: ``a.join(a).equivalent(a)`` |
| 17 | |
| 18 | Proof: ``join`` is ``max`` per slot, and ``max`` over non-negative integers |
| 19 | satisfies all three laws; ``value`` is the ``sum`` of the slots, so it never decreases. |
| 20 | |
| 21 | Public API |
| 22 | ---------- |
| 23 | - :class:`GCounterDict` — ``TypedDict`` wire format ``{agent_id: count}``. |
| 24 | - :class:`GCounter` — the counter itself. |
| 25 | """ |
| 26 | |
| 27 | from __future__ import annotations |
| 28 | |
| 29 | import logging |
| 30 | from typing import TypedDict |
| 31 | |
| 32 | logger = logging.getLogger(__name__) |
| 33 | |
| 34 | |
class GCounterDict(TypedDict, total=False):
    """Serialised shape of a :class:`GCounter`: a mapping ``{agent_id: count}``.

    Declared with ``total=False`` since a missing agent key means the same
    thing as a count of ``0``.  Use :meth:`GCounter.to_dict` to serialise and
    :meth:`GCounter.from_dict` to deserialise.
    """
| 41 | |
| 42 | |
class GCounter:
    """Grow-only Counter — a CRDT counter that only ever increases.

    Each agent increments its own private slot; the global value is the sum of
    all slots.  Only the owning agent may increment a slot (this is enforced by
    convention — not cryptographically).

    All mutating methods return new :class:`GCounter` instances; ``self`` is
    never modified.

    Slot values are checked on construction, so a counter can never hold a
    negative or non-integer count, whether it was built directly or loaded
    from wire data via :meth:`from_dict`.

    Example::

        c1 = GCounter().increment("agent-1")
        c2 = GCounter().increment("agent-2").increment("agent-2")
        merged = c1.join(c2)
        assert merged.value() == 3           # 1 from agent-1 + 2 from agent-2
        assert merged.join(c1).value() == 3  # idempotent
    """

    def __init__(self, counts: dict[str, int] | None = None) -> None:
        """Construct a G-Counter, optionally pre-populated.

        Args:
            counts: Initial ``{agent_id: count}`` mapping.  Copied defensively.
                All values must be non-negative integers.

        Raises:
            ValueError: If any count is negative or is not an integer
                (``bool`` is rejected as well).
        """
        snapshot: dict[str, int] = dict(counts) if counts else {}
        for agent_id, count in snapshot.items():
            # A negative or fractional slot would let ``value()`` shrink or
            # stop being an integer, breaking the grow-only contract.
            if not self._is_plain_int(count) or count < 0:
                raise ValueError(
                    f"GCounter: count for agent {agent_id!r} must be a "
                    f"non-negative integer, got {count!r}"
                )
        self._counts: dict[str, int] = snapshot

    @staticmethod
    def _is_plain_int(value: object) -> bool:
        """Return ``True`` for real integers, excluding ``bool``."""
        # ``bool`` subclasses ``int``; storing ``True`` would serialise as a
        # JSON boolean instead of a number, so it is not accepted as a count.
        return isinstance(value, int) and not isinstance(value, bool)

    # ------------------------------------------------------------------
    # Mutation (returns new GCounter)
    # ------------------------------------------------------------------

    def increment(self, agent_id: str, by: int = 1) -> GCounter:
        """Return a new counter with *agent_id*'s slot incremented by *by*.

        Args:
            agent_id: The agent performing the increment (must be the caller's
                own agent ID to maintain CRDT invariants).
            by: Amount to increment; must be a positive integer.

        Returns:
            A new :class:`GCounter` with the updated slot.

        Raises:
            ValueError: If *by* is not a positive integer.
        """
        if not self._is_plain_int(by):
            raise ValueError(
                f"GCounter.increment: 'by' must be an integer, got {by!r}"
            )
        if by <= 0:
            raise ValueError(f"GCounter.increment: 'by' must be positive, got {by}")
        new_counts = dict(self._counts)
        new_counts[agent_id] = new_counts.get(agent_id, 0) + by
        return GCounter(new_counts)

    # ------------------------------------------------------------------
    # CRDT join
    # ------------------------------------------------------------------

    def join(self, other: GCounter) -> GCounter:
        """Return the lattice join — per-slot maximum of ``self`` and *other*.

        Args:
            other: The counter to merge with.

        Returns:
            A new :class:`GCounter` holding the per-agent maximum counts.
            Slots appear in a deterministic order: ``self``'s agents first,
            followed by agents that only *other* knows about.
        """
        # Start from our own slots and fold the other side in, rather than
        # iterating a set union, so the resulting slot order does not depend
        # on string-hash randomisation.
        merged = dict(self._counts)
        for agent, count in other._counts.items():
            merged[agent] = max(merged.get(agent, 0), count)
        return GCounter(merged)

    # ------------------------------------------------------------------
    # Query
    # ------------------------------------------------------------------

    def value(self) -> int:
        """Return the global counter value — the sum of all agent slots.

        Returns:
            Non-negative integer.
        """
        return sum(self._counts.values())

    def value_for(self, agent_id: str) -> int:
        """Return the count for a specific agent.

        Args:
            agent_id: The agent to query.

        Returns:
            The agent's slot value, or ``0`` if the agent has not incremented.
        """
        return self._counts.get(agent_id, 0)

    # ------------------------------------------------------------------
    # Serialisation
    # ------------------------------------------------------------------

    def to_dict(self) -> dict[str, int]:
        """Return a JSON-serialisable ``{agent_id: count}`` mapping.

        Returns:
            A shallow copy of the internal counts dictionary.
        """
        return dict(self._counts)

    @classmethod
    def from_dict(cls, data: dict[str, int]) -> GCounter:
        """Reconstruct a :class:`GCounter` from its wire representation.

        Args:
            data: ``{agent_id: count}`` mapping as produced by :meth:`to_dict`.

        Returns:
            A new :class:`GCounter`.

        Raises:
            ValueError: If *data* contains a negative or non-integer count.
        """
        return cls(data)

    # ------------------------------------------------------------------
    # Comparison and representation
    # ------------------------------------------------------------------

    def equivalent(self, other: GCounter) -> bool:
        """Return ``True`` if both counters hold identical per-agent counts.

        Args:
            other: The G-Counter to compare against.

        Returns:
            ``True`` when every agent slot has the same value in both counters
            (treating absent agents as count 0).
        """
        mine, theirs = self._counts, other._counts
        # Compare over the union of agents so that an explicit zero slot on
        # one side matches a missing slot on the other.
        return all(
            mine.get(agent, 0) == theirs.get(agent, 0)
            for agent in mine.keys() | theirs.keys()
        )

    def __repr__(self) -> str:
        return f"GCounter(value={self.value()}, slots={self._counts!r})"