cgcardona / muse public
identity.py python
407 lines 14.5 KB
80353726 feat: muse auth + hub + config — paradigm-level identity architecture w… Gabriel Cardona <cgcardona@gmail.com> 9h ago
1 """Global identity store — ``~/.muse/identity.toml``.
2
3 Credentials (bearer tokens) are kept here, separate from per-repository
4 configuration. This means tokens are never accidentally committed to
5 version control, and a single identity can authenticate across all
6 repositories on the same hub.
7
8 Why global, not per-repo
9 -------------------------
10 Git hides tokens in ``~/.netrc`` or the credential helper chain — an
11 afterthought. Muse makes identity a first-class, machine-scoped concept.
12 The repository knows *where* the hub is (``[hub] url`` in config.toml).
13 The machine knows *who you are* (this file). The two concerns are
14 deliberately separated.
15
16 Identity types
17 --------------
18 ``type = "human"``
19 A person. Authenticated via OAuth or a personal access token. No
20 explicit capability list — the hub governs what humans can do via roles.
21
22 ``type = "agent"``
23 An autonomous process. Authenticated via a scoped capability token.
24 The ``capabilities`` field in this file reflects what the token allows,
25 enabling agents to self-inspect before attempting an operation.
26
27 File format
28 -----------
29 TOML with one section per hub hostname::
30
31 ["musehub.ai"]
32 type = "human"
33 name = "Alice"
34 id = "usr_abc123"
35 token = "eyJ..." # bearer token — NEVER logged
36
37 ["staging.musehub.ai"]
38 type = "agent"
39 name = "composer-v2"
40 id = "agt_def456"
41 token = "eyJ..."
42 capabilities = ["read:*", "write:midi", "commit"]
43
44 Security model
45 --------------
46 - ``~/.muse/`` is created with mode 0o700 (user-only directory).
- ``~/.muse/identity.toml`` is written with mode 0o600 **from the first
  byte** — the temp file comes from ``tempfile.mkstemp()`` and is
  ``os.fchmod()``-ed before any data is written, eliminating the TOCTOU
  window that ``write_text()`` + ``chmod()`` creates.
50 - Writes are atomic: data goes to a temp file in the same directory, then
51 ``os.replace()`` renames it over the target. A kill signal during write
52 leaves the old file intact, never a partial file.
53 - Symlink guard: if the target path is already a symlink, write is refused.
54 This blocks symlink-based credential-overwrite attacks.
55 - All log calls that reference a token mask it as ``"Bearer ***"``.
56 - The file is never read or written as part of a repository snapshot.
57 """
58
59 from __future__ import annotations
60
61 import contextlib
62 import fcntl
63 import logging
64 import os
65 import pathlib
66 import stat
67 import tempfile
68 import tomllib
69 from collections.abc import Generator
70 from typing import TypedDict
71
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Machine-scoped credential store: the ``.muse`` directory under the user's
# home, and the identity file inside it.
_IDENTITY_DIR = pathlib.Path.home().joinpath(".muse")
_IDENTITY_FILE = _IDENTITY_DIR.joinpath("identity.toml")
76
77
78 # ---------------------------------------------------------------------------
79 # Types
80 # ---------------------------------------------------------------------------
81
82
class IdentityEntry(TypedDict, total=False):
    """Stored credentials and metadata for a single hub identity.

    Each entry corresponds to one table in ``identity.toml``, keyed by the
    hub hostname. ``total=False`` makes every key optional, so a partially
    populated entry is still a valid ``IdentityEntry``.
    """

    # Identity kind: "human" or "agent".
    type: str
    # Display name.
    name: str
    # Identity ID assigned by the hub.
    id: str
    # Bearer token; must never be written to logs.
    token: str
    # Capability strings for agents (empty for humans).
    capabilities: list[str]
91
92
93 # ---------------------------------------------------------------------------
94 # Path helper
95 # ---------------------------------------------------------------------------
96
97
def get_identity_path() -> pathlib.Path:
    """Return the location of the global identity file.

    Returns:
        The module-level path constant for ``~/.muse/identity.toml``.
    """
    identity_file = _IDENTITY_FILE
    return identity_file
101
102
103 # ---------------------------------------------------------------------------
104 # URL → hostname normalisation
105 # ---------------------------------------------------------------------------
106
107
def _hostname_from_url(url: str) -> str:
    """Normalise *url* to a lowercase ``host[:port]`` usable as a dict key.

    Security properties
    -------------------
    - Strips the scheme (``https://``), so different scheme representations of
      the same host resolve to the same key.
    - Isolates the authority component *before* looking for userinfo, so an
      ``@`` that appears in the path, query or fragment can never redirect
      the lookup to a different key.
    - Strips userinfo (``user:password@``) — embedded credentials in a URL are
      never stored as part of the hostname key.
    - Normalises to lowercase — DNS is case-insensitive, so ``MUSEHUB.AI``
      and ``musehub.ai`` are the same host and must resolve to the same entry.

    Examples::

        "https://musehub.ai/repos/x"         → "musehub.ai"
        "https://admin:s3cr3t@musehub.ai"    → "musehub.ai"
        "MUSEHUB.AI"                         → "musehub.ai"
        "https://musehub.ai"                 → "musehub.ai"
        "musehub.ai:8443"                    → "musehub.ai:8443"
        "https://musehub.ai/users/@alice"    → "musehub.ai"
        "https://musehub.ai?next=/x"         → "musehub.ai"

    Args:
        url: Hub URL or bare hostname (optionally with a port).

    Returns:
        The lowercase ``host[:port]`` portion of *url*.
    """
    remainder = url.strip()

    # Remove the scheme. "://" only counts as a scheme separator when nothing
    # that ends an authority ("/", "?", "#") precedes it; otherwise it belongs
    # to the path or query of a scheme-less input.
    scheme, separator, after_scheme = remainder.partition("://")
    if separator and not any(ch in scheme for ch in "/?#"):
        remainder = after_scheme

    # Keep only the authority: it ends at the first "/", "?" or "#".
    for delimiter in "/?#":
        remainder = remainder.split(delimiter, 1)[0]

    # Remove userinfo (user:password@) — never embed credentials in the key.
    # rsplit keeps the part after the *last* "@", so an "@" inside the
    # password cannot leak into the key.
    host = remainder.rsplit("@", 1)[-1]

    # Normalise to lowercase — DNS is case-insensitive.
    return host.lower()
139
140
141 # ---------------------------------------------------------------------------
142 # TOML serialiser (write-side — stdlib tomllib is read-only)
143 # ---------------------------------------------------------------------------
144
145
146 def _toml_escape(value: str) -> str:
147 """Escape a string value for embedding in a TOML double-quoted string."""
148 return value.replace("\\", "\\\\").replace('"', '\\"')
149
150
def _dump_identity(identities: dict[str, IdentityEntry]) -> str:
    """Render a hostname → entry mapping as TOML text.

    Hosts are emitted in sorted order, one table per host. The table header
    is always a quoted key so that a dotted hostname such as ``musehub.ai``
    is one literal key rather than a chain of nested tables. Every string
    value goes through ``_toml_escape`` before it is embedded.

    Empty or missing fields are omitted. Each table is followed by a blank
    line.

    Args:
        identities: Mapping of hub hostname to its identity entry.

    Returns:
        The TOML document as a single string.
    """
    out: list[str] = []
    for host in sorted(identities):
        record = identities[host]
        # Quoted header: the dots in a hostname must not create nested tables.
        out.append(f'["{_toml_escape(host)}"]')

        # Scalar string fields, in the order they appear in the file.
        scalars = (
            ("type", record.get("type", "")),
            ("name", record.get("name", "")),
            ("id", record.get("id", "")),
            ("token", record.get("token", "")),
        )
        out.extend(
            f'{key} = "{_toml_escape(text)}"' for key, text in scalars if text
        )

        capabilities = record.get("capabilities") or []
        if capabilities:
            # Escape each capability on its own, then join into an array.
            quoted = [f'"{_toml_escape(cap)}"' for cap in capabilities]
            out.append("capabilities = [" + ", ".join(quoted) + "]")

        # Blank separator line after every table.
        out.append("")
    return "\n".join(out)
182
183
184 # ---------------------------------------------------------------------------
185 # Load / save
186 # ---------------------------------------------------------------------------
187
188
189 def _load_all(path: pathlib.Path) -> dict[str, IdentityEntry]:
190 """Load all identity entries from *path*. Returns empty dict if absent."""
191 if not path.is_file():
192 return {}
193 try:
194 with path.open("rb") as fh:
195 raw = tomllib.load(fh)
196 except Exception as exc: # noqa: BLE001
197 # Log only the exception *type*, never its message — a TOML parse
198 # error surfaced by tomllib includes the offending line, which can
199 # contain a fragment of the token being written when the file is corrupt.
200 logger.warning(
201 "⚠️ Failed to parse identity file %s (%s — run `muse auth login` to re-authenticate)",
202 path,
203 type(exc).__name__,
204 )
205 return {}
206
207 result: dict[str, IdentityEntry] = {}
208 for hostname, raw_entry in raw.items():
209 if not isinstance(raw_entry, dict):
210 continue
211 entry: IdentityEntry = {}
212 t = raw_entry.get("type")
213 if isinstance(t, str):
214 entry["type"] = t
215 n = raw_entry.get("name")
216 if isinstance(n, str):
217 entry["name"] = n
218 i = raw_entry.get("id")
219 if isinstance(i, str):
220 entry["id"] = i
221 tok = raw_entry.get("token")
222 if isinstance(tok, str):
223 entry["token"] = tok
224 caps = raw_entry.get("capabilities")
225 if isinstance(caps, list):
226 entry["capabilities"] = [str(c) for c in caps if isinstance(c, str)]
227 result[hostname] = entry
228
229 return result
230
231
@contextlib.contextmanager
def _identity_write_lock() -> Generator[None, None, None]:
    """Acquire an exclusive advisory write-lock on the identity store.

    Uses a dedicated lock file (``~/.muse/.identity.lock``) so that the lock
    survives the atomic rename of ``identity.toml`` itself.

    Advisory (cooperative) locking protects all Muse processes that use this
    lock against concurrent read-modify-write races. Direct file edits by
    external tools bypass the lock — that is acceptable; the user is then
    responsible for data consistency.

    POSIX-only (``fcntl.flock``). The lock is blocking with no timeout;
    CLI commands are short-lived and lock contention is expected to be brief.

    Yields:
        ``None`` while the exclusive lock is held. The lock is released and
        the descriptor closed when the ``with`` block exits, whether normally
        or through an exception.
    """
    lock_path = _IDENTITY_DIR / ".identity.lock"
    # This is normally the first code to touch ~/.muse/, so create the
    # directory owner-only (0o700) right away instead of with umask-default
    # permissions. ``mode`` applies to the leaf directory only and has no
    # effect when the directory already exists.
    _IDENTITY_DIR.mkdir(mode=stat.S_IRWXU, parents=True, exist_ok=True)
    # Create the lock file with owner-only permissions; O_CLOEXEC prevents
    # child processes from inheriting the file descriptor.
    lock_fd = os.open(
        str(lock_path),
        os.O_CREAT | os.O_WRONLY | os.O_CLOEXEC,
        stat.S_IRUSR | stat.S_IWUSR,
    )
    try:
        # Blocks until no other process holds the lock.
        fcntl.flock(lock_fd, fcntl.LOCK_EX)
        try:
            yield
        finally:
            fcntl.flock(lock_fd, fcntl.LOCK_UN)
    finally:
        os.close(lock_fd)
264
265
def _save_all(identities: dict[str, IdentityEntry], path: pathlib.Path) -> None:
    """Write *identities* to *path* securely.

    Security guarantees
    -------------------
    1. **Symlink guard** — refuses to write if *path* is already a symlink,
       preventing an attacker from pre-placing a symlink to a file they want
       overwritten.
    2. **0o700 directory** — the parent directory is created owner-only and
       an existing directory is tightened to 0o700 (best effort; a failure
       is logged, not raised).
    3. **0o600 from byte zero** — the temp file is ``fchmod``-ed to 0o600
       *before* any data is written, eliminating the TOCTOU window that
       ``write_text()`` + ``chmod()`` creates.
    4. **Atomic, durable rename** — the data is flushed and ``fsync``-ed
       before ``os.replace()`` swaps the temp file over the target, so the
       target is either the old file or the complete new one.
    5. **No stray temp files** — the temp file is removed on *any* failure,
       including ``KeyboardInterrupt``, so credentials are not left behind
       under a temporary name.

    Args:
        identities: Mapping of hub hostname to identity entry to persist.
        path: Destination file.

    Raises:
        OSError: If *path* is a symlink, or if creating, writing, syncing or
            renaming the temp file fails.
    """
    dir_path = path.parent

    # 1. Create the directory owner-only (0o700); chmod covers the case where
    #    it already existed with wider permissions.
    dir_path.mkdir(mode=stat.S_IRWXU, parents=True, exist_ok=True)
    try:
        os.chmod(dir_path, stat.S_IRWXU)  # 0o700
    except OSError as exc:
        logger.warning("⚠️ Could not set permissions on %s: %s", dir_path, exc)

    # 2. Symlink guard — never follow a symlink placed at the target path.
    if path.is_symlink():
        raise OSError(
            f"Security: {path} is a symlink. "
            "Refusing to write credentials to a symlink target."
        )

    text = _dump_identity(identities)

    # 3. Write to a temp file in the same directory (same fs → atomic rename).
    fd, tmp_path_str = tempfile.mkstemp(dir=dir_path, prefix=".identity-tmp-")
    tmp_path = pathlib.Path(tmp_path_str)
    try:
        try:
            os.fchmod(fd, stat.S_IRUSR | stat.S_IWUSR)  # 0o600 before any data
        except BaseException:
            # Nothing owns the raw descriptor yet — close it here or it leaks.
            os.close(fd)
            raise
        # From here on the file object owns (and closes) the descriptor.
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            fh.write(text)
            # Push the data to disk before the rename makes it visible.
            fh.flush()
            os.fsync(fh.fileno())
        # 4. Atomic rename — old file stays intact if we crash before this.
        os.replace(tmp_path, path)
    except BaseException:
        # Also covers KeyboardInterrupt/SystemExit: never leave a temp file
        # containing tokens behind.
        try:
            tmp_path.unlink(missing_ok=True)
        except OSError:
            pass
        raise
316
317
318 # ---------------------------------------------------------------------------
319 # Public API
320 # ---------------------------------------------------------------------------
321
322
def load_identity(hub_url: str) -> IdentityEntry | None:
    """Look up the stored identity for *hub_url*.

    *hub_url* is first reduced to its hostname key, so
    ``https://musehub.ai/repos/x`` and ``musehub.ai`` address the same
    entry.

    Args:
        hub_url: Hub URL or bare hostname.

    Returns:
        The matching :class:`IdentityEntry`, or ``None`` when nothing is
        stored for that host.
    """
    key = _hostname_from_url(hub_url)
    stored = _load_all(_IDENTITY_FILE)
    return stored.get(key)
338
339
def save_identity(hub_url: str, entry: IdentityEntry) -> None:
    """Persist *entry* as the identity for *hub_url*.

    Load, update and write all happen while the exclusive advisory lock is
    held, so concurrent ``muse auth login`` calls (for example from parallel
    agents) cannot overwrite each other's entries.

    ``~/.muse/identity.toml`` is created with mode 0o600 if it does not
    exist yet.

    Args:
        hub_url: Hub URL or bare hostname.
        entry: Identity data to store.
    """
    host_key = _hostname_from_url(hub_url)
    with _identity_write_lock():
        store = _load_all(_IDENTITY_FILE)
        store.update({host_key: entry})
        _save_all(store, _IDENTITY_FILE)
    # The token itself is never part of the log record.
    logger.info("✅ Identity for %s saved (Bearer ***)", host_key)
359
360
def clear_identity(hub_url: str) -> bool:
    """Delete the stored identity for *hub_url*, if there is one.

    Load, delete and write all happen while the exclusive advisory lock is
    held (see :func:`save_identity`). The file is not rewritten when no
    entry exists for the host.

    Args:
        hub_url: Hub URL or bare hostname.

    Returns:
        ``True`` if an entry was removed, ``False`` if no entry existed.
    """
    host_key = _hostname_from_url(hub_url)
    with _identity_write_lock():
        store = _load_all(_IDENTITY_FILE)
        try:
            del store[host_key]
        except KeyError:
            # Nothing stored for this host — leave the file untouched.
            return False
        _save_all(store, _IDENTITY_FILE)
    logger.info("✅ Identity for %s cleared", host_key)
    return True
382
383
def resolve_token(hub_url: str) -> str | None:
    """Fetch the bearer token stored for *hub_url*.

    This function never logs the token.

    Args:
        hub_url: Hub URL or bare hostname.

    Returns:
        The token with surrounding whitespace removed, or ``None`` when no
        identity is stored or the token is missing or blank.
    """
    entry = load_identity(hub_url)
    if entry is None:
        return None
    cleaned = entry.get("token", "").strip()
    # A blank token counts as "no token".
    return cleaned or None
400
401
def list_all_identities() -> dict[str, IdentityEntry]:
    """Return every stored identity, keyed by hub hostname.

    Returns:
        Mapping of hostname to entry; empty when the identity file does not
        exist.
    """
    all_entries = _load_all(_IDENTITY_FILE)
    return all_entries