From a4fe3a2b80dff43bef7a73012b7b6fc1a0065a48 Mon Sep 17 00:00:00 2001 From: Jan Lerking Date: Fri, 3 Jul 2026 08:51:39 +0200 Subject: [PATCH] Gated signatures. /JL --- README.md | 1 + requirements.txt | 1 + setup.py | 2 +- xtendr/signing.py | 259 +++++++++++++++++++++++++++++++++++++++++ xtendr/xtendrsystem.py | 109 ++++++++++++++++- 5 files changed, 367 insertions(+), 5 deletions(-) create mode 100644 xtendr/signing.py diff --git a/README.md b/README.md index fd31ebe..fe1cf0b 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,5 @@ # XtendR +![Latest Version](https://gitpot.lerk.ing/badges/badge/static?label=Latest+version&message=0.5.0&color=blue) A very basic Python 3.12 friendly plugin system based on the K.I.S.S principle. diff --git a/requirements.txt b/requirements.txt index 2d14540..4e4f65b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ setuptools==68.2.2 +cryptography>=42 diff --git a/setup.py b/setup.py index f02a5b7..9c5519a 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ if __name__ == "__main__": setup( name="XtendR", - version="0.4.0", + version="0.5.0", packages=find_packages(), install_requires=[], author="Jan Lerking", diff --git a/xtendr/signing.py b/xtendr/signing.py new file mode 100644 index 0000000..16962e4 --- /dev/null +++ b/xtendr/signing.py @@ -0,0 +1,259 @@ +""" +xtendr_signing.py + +Core logic for signing XtendR plugins and maintaining a whitelist. +Deliberately kept free of any GTK/Adw imports so it can be unit tested +and reused by both the signer GUI and (later) XtendRSystem's verification +hook. + +Design: +- Ed25519 keypair: private key stays with the signer, public key ships + inside XtendR / PyPac to verify at attach() time. +- Each whitelist entry covers the plugin's manifest (.json) and its + module file (.py), hashed together with SHA-256. The signature + covers the whole entry (hash + name + module + class), so an attacker + can't splice a valid hash onto a different plugin identity. +- The whitelist file itself can optionally be encrypted at rest with a + passphrase (Fernet / AES-128-CBC+HMAC via PBKDF2-derived key). This is + for confidentiality only -- the signature is what provides integrity, + and still verifies correctly whether or not the file on disk is + encrypted. +""" +from __future__ import annotations + +import base64 +import hashlib +import json +import os +from dataclasses import dataclass, asdict +from datetime import datetime, timezone +from pathlib import Path +from typing import Optional + +from cryptography.hazmat.primitives.asymmetric.ed25519 import ( + Ed25519PrivateKey, + Ed25519PublicKey, +) +from cryptography.hazmat.primitives import serialization +from cryptography.exceptions import InvalidSignature +from cryptography.fernet import Fernet, InvalidToken +from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC +from cryptography.hazmat.primitives import hashes + +PBKDF2_ITERATIONS = 600_000 + + +# -------------------------------------------------------------------------- +# Key management +# -------------------------------------------------------------------------- + +def generate_keypair(private_path: Path, public_path: Path) -> None: + """Generate a new Ed25519 keypair and write it to disk (unencrypted PEM). + + Caller is responsible for keeping private_path safe (e.g. 0600 perms, + offline machine, backups). This is deliberately not done automatically + since the right answer depends on the user's environment. + """ + private_path.parent.mkdir(parents=True, exist_ok=True) + key = Ed25519PrivateKey.generate() + + priv_bytes = key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ) + pub_bytes = key.public_key().public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ) + + private_path.write_bytes(priv_bytes) + try: + os.chmod(private_path, 0o600) + except OSError: + pass + public_path.write_bytes(pub_bytes) + + +def load_private_key(path: Path) -> Ed25519PrivateKey: + key = serialization.load_pem_private_key(path.read_bytes(), password=None) + if not isinstance(key, Ed25519PrivateKey): + raise ValueError(f"'{path}' is not an Ed25519 private key.") + return key + + +def load_public_key(path: Path) -> Ed25519PublicKey: + key = serialization.load_pem_public_key(path.read_bytes()) + if not isinstance(key, Ed25519PublicKey): + raise ValueError(f"'{path}' is not an Ed25519 public key.") + return key + + +# -------------------------------------------------------------------------- +# Plugin hashing + entry signing +# -------------------------------------------------------------------------- + +@dataclass +class PluginEntry: + name: str # plugin folder name + module: str # module name, from the manifest + cls: str # class name, from the manifest + sha256: str # hash of manifest + module file contents + signed_at: str # ISO-8601 UTC timestamp + signature: str = "" # base64 Ed25519 signature, filled in after signing + + def canonical_bytes(self) -> bytes: + """Bytes that get signed / verified -- everything except the + signature itself, in a stable, sorted-key JSON encoding.""" + payload = { + "name": self.name, + "module": self.module, + "cls": self.cls, + "sha256": self.sha256, + "signed_at": self.signed_at, + } + return json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8") + + +def hash_plugin_files(plugin_dir: Path, name: str, module: str) -> str: + """Hash the manifest + module file together, in a fixed order, so the + hash changes if either file is tampered with.""" + manifest_path = plugin_dir / f"{name}.json" + module_path = plugin_dir / f"{module}.py" + if not manifest_path.is_file(): + raise FileNotFoundError(f"Manifest not found: {manifest_path}") + if not module_path.is_file(): + raise FileNotFoundError(f"Module file not found: {module_path}") + + h = hashlib.sha256() + for p in (manifest_path, module_path): + h.update(p.name.encode("utf-8")) + h.update(b"\x00") + h.update(p.read_bytes()) + h.update(b"\x00") + return h.hexdigest() + + +def sign_plugin(private_key: Ed25519PrivateKey, plugin_dir: Path) -> PluginEntry: + manifest_path_glob = list(plugin_dir.glob("*.json")) + if len(manifest_path_glob) != 1: + raise ValueError( + f"Expected exactly one manifest .json in '{plugin_dir}', found {len(manifest_path_glob)}." + ) + manifest = json.loads(manifest_path_glob[0].read_text(encoding="utf-8")) + module = manifest.get("module") + cls = manifest.get("class") + if not module or not cls: + raise ValueError("Manifest is missing 'module' or 'class'.") + + name = plugin_dir.name + digest = hash_plugin_files(plugin_dir, name, module) + entry = PluginEntry( + name=name, + module=module, + cls=cls, + sha256=digest, + signed_at=datetime.now(timezone.utc).isoformat(timespec="seconds"), + ) + signature = private_key.sign(entry.canonical_bytes()) + entry.signature = base64.b64encode(signature).decode("ascii") + return entry + + +def verify_entry(public_key: Ed25519PublicKey, entry: PluginEntry) -> bool: + try: + public_key.verify(base64.b64decode(entry.signature), entry.canonical_bytes()) + return True + except (InvalidSignature, ValueError): + return False + + +def verify_plugin_on_disk(public_key: Ed25519PublicKey, plugin_dir: Path, entry: PluginEntry) -> bool: + """Full verification: signature is valid AND the files on disk still + match the hash that was signed.""" + if not verify_entry(public_key, entry): + return False + try: + current_hash = hash_plugin_files(plugin_dir, entry.name, entry.module) + except FileNotFoundError: + return False + return current_hash == entry.sha256 + + +# -------------------------------------------------------------------------- +# Whitelist storage (optionally encrypted at rest) +# -------------------------------------------------------------------------- + +class Whitelist: + def __init__(self): + self.entries: dict[str, PluginEntry] = {} + + def add(self, entry: PluginEntry) -> None: + self.entries[entry.name] = entry + + def remove(self, name: str) -> bool: + return self.entries.pop(name, None) is not None + + def to_json(self) -> str: + payload = {name: asdict(e) for name, e in self.entries.items()} + return json.dumps(payload, indent=2, sort_keys=True) + + @classmethod + def from_json(cls, data: str) -> "Whitelist": + wl = cls() + raw = json.loads(data) + for name, fields in raw.items(): + wl.entries[name] = PluginEntry(**fields) + return wl + + # -- disk I/O ----------------------------------------------------- + + def save(self, path: Path, passphrase: Optional[str] = None) -> None: + plaintext = self.to_json().encode("utf-8") + if passphrase is None: + path.write_bytes(plaintext) + return + + salt = os.urandom(16) + key = _derive_key(passphrase, salt) + token = Fernet(key).encrypt(plaintext) + container = { + "encrypted": True, + "kdf": "pbkdf2-sha256", + "iterations": PBKDF2_ITERATIONS, + "salt": base64.b64encode(salt).decode("ascii"), + "data": base64.b64encode(token).decode("ascii"), + } + path.write_text(json.dumps(container, indent=2), encoding="utf-8") + + @classmethod + def load(cls, path: Path, passphrase: Optional[str] = None) -> "Whitelist": + raw = path.read_text(encoding="utf-8") + try: + container = json.loads(raw) + except json.JSONDecodeError: + container = None + + if isinstance(container, dict) and container.get("encrypted"): + if passphrase is None: + raise ValueError("Whitelist is encrypted; a passphrase is required.") + salt = base64.b64decode(container["salt"]) + key = _derive_key(passphrase, salt, container.get("iterations", PBKDF2_ITERATIONS)) + token = base64.b64decode(container["data"]) + try: + plaintext = Fernet(key).decrypt(token).decode("utf-8") + except InvalidToken as e: + raise ValueError("Wrong passphrase or corrupted whitelist file.") from e + return cls.from_json(plaintext) + + return cls.from_json(raw) + + +def _derive_key(passphrase: str, salt: bytes, iterations: int = PBKDF2_ITERATIONS) -> bytes: + kdf = PBKDF2HMAC( + algorithm=hashes.SHA256(), + length=32, + salt=salt, + iterations=iterations, + ) + return base64.urlsafe_b64encode(kdf.derive(passphrase.encode("utf-8"))) diff --git a/xtendr/xtendrsystem.py b/xtendr/xtendrsystem.py index 99cdc20..62c19d5 100644 --- a/xtendr/xtendrsystem.py +++ b/xtendr/xtendrsystem.py @@ -6,9 +6,11 @@ import json import stat import logging import threading +from pathlib import Path from xtendr.xtendrbase import XtendRBase +from xtendr import signing as xsign -__version__ = "0.4.0" +__version__ = "0.5.0" logger = logging.getLogger("xtendr") @@ -18,6 +20,11 @@ logger = logging.getLogger("xtendr") _NAME_RE = re.compile(r"^[A-Za-z0-9_-]+$") _MODULE_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$") +# Signature status values stored in plugins[name]["signature"]["status"]. +SIG_VERIFIED = "verified" # whitelist entry present, signature and hash both check out +SIG_UNSIGNED = "unsigned" # no whitelist entry for this plugin at all +SIG_INVALID = "invalid" # whitelist entry present but signature/hash mismatch (tampered) + class XtendRSystem: """Plugin system to manage plugins. @@ -30,7 +37,7 @@ class XtendRSystem: Example: >>> system = XtendRSystem() >>> system.version() - XtendR v0.4.0 + XtendR v0.5.0 >>> system.attach("example_plugin", lambda: None) >>> system.run("example_plugin") ExamplePlugin is running! @@ -40,20 +47,81 @@ class XtendRSystem: Detached plugin 'example_plugin'. """ - def __init__(self, pluginpath="plugins"): + def __init__(self, pluginpath="plugins", public_key_path=None, whitelist_path=None, whitelist_passphrase=None): self.pluginspath = pluginpath self.plugins = {} self._lock = threading.RLock() + # -- signature verification setup --------------------------------- + # If either the public key or the whitelist can't be loaded, we + # fail closed: self._public_key / self._whitelist stay None, and + # every plugin will come back as SIG_UNSIGNED (disabled) rather + # than silently skipping verification. This is deliberate -- an + # admin who wants unsigned plugins to run should not be able to + # get there by accident (e.g. a missing/misspelled key path). + self._public_key = None + self._whitelist = None + + if public_key_path is not None: + try: + self._public_key = xsign.load_public_key(Path(public_key_path)) + except (OSError, ValueError) as e: + logger.error("Could not load XtendR public key from '%s': %s", public_key_path, e) + + if whitelist_path is not None: + try: + self._whitelist = xsign.Whitelist.load(Path(whitelist_path), whitelist_passphrase) + except (OSError, ValueError) as e: + logger.error("Could not load XtendR plugin whitelist from '%s': %s", whitelist_path, e) + + if self._public_key is None or self._whitelist is None: + logger.warning( + "Signature verification is not fully configured for pluginpath '%s'; " + "all plugins will be treated as unsigned and permanently disabled.", + pluginpath, + ) + def version(self) -> str: return "XtendR v" + __version__ + @property + def verification_configured(self) -> bool: + """True if a public key and whitelist both loaded successfully.""" + return self._public_key is not None and self._whitelist is not None + def _validate_name(self, name: str) -> bool: if not isinstance(name, str) or not _NAME_RE.match(name): logger.error("Rejected plugin name %r: must match %s", name, _NAME_RE.pattern) return False return True + def _verify_signature(self, name: str, plugin_path: str, module_name: str) -> dict: + """Check a plugin's signature against the loaded whitelist. + + Returns a dict with at least a "status" key (SIG_VERIFIED / + SIG_UNSIGNED / SIG_INVALID) plus whatever whitelist metadata is + available, for display in the UI. Never raises. + """ + result = {"status": SIG_UNSIGNED, "sha256": None, "signature": None, "signed_at": None} + + if self._public_key is None or self._whitelist is None: + return result + + entry = self._whitelist.entries.get(name) + if entry is None: + return result + + result.update(sha256=entry.sha256, signature=entry.signature, signed_at=entry.signed_at) + + try: + ok = xsign.verify_plugin_on_disk(self._public_key, Path(plugin_path), entry) + except Exception: # noqa: BLE001 - never let a verification bug crash attach() + logger.error("Signature verification raised for plugin '%s'.", name, exc_info=True) + ok = False + + result["status"] = SIG_VERIFIED if ok else SIG_INVALID + return result + def _check_permissions(self, path: str) -> None: """Warn (don't block) if a plugin file is group/world-writable.""" try: @@ -116,6 +184,33 @@ class XtendRSystem: self._check_permissions(module_file) + # Verify the plugin's signature BEFORE we ever execute its code. + # Unsigned/tampered plugins still get a listing entry (built + # from the manifest alone, which is inert JSON) but their .py + # file is never imported, and they can never be run. + sig = self._verify_signature(name, plugin_path, module_name) + if sig["status"] != SIG_VERIFIED: + if sig["status"] == SIG_INVALID: + logger.error( + "Plugin '%s' failed signature verification (tampered or bad " + "signature); attaching as permanently disabled.", name, + ) + else: + logger.warning( + "Plugin '%s' has no valid whitelist entry; attaching as " + "permanently disabled.", name, + ) + self.plugins[name] = { + "instance": None, + "running": False, + "info": plugin_info, + "autorun": False, + "module_key": None, + "disabled": True, + "signature": sig, + } + return + # Load the module directly from its file path instead of # mutating sys.path. This prevents a plugin from shadowing # stdlib or third-party modules for the rest of the process. @@ -147,6 +242,8 @@ class XtendRSystem: "info": plugin_info, "autorun": False, "module_key": qualified_name, + "disabled": False, + "signature": sig, } logger.info("Attached plugin '%s'.", name) logger.info("Running pre-load on '%s'.", name) @@ -167,6 +264,9 @@ class XtendRSystem: if entry is None: logger.error("Plugin '%s' not found or has no 'run' method.", name) return + if entry.get("disabled"): + logger.error("Plugin '%s' is disabled (failed signature verification) and cannot run.", name) + return entry["running"] = True try: return entry["instance"].run(*args, **kwargs) @@ -194,5 +294,6 @@ class XtendRSystem: if entry is None: logger.info("Plugin '%s' is not attached.", name) return - sys.modules.pop(entry["module_key"], None) + if entry.get("module_key"): + sys.modules.pop(entry["module_key"], None) logger.info("Detached plugin '%s'.", name) -- 2.39.5