Gated signatures. /JL
This commit was merged in pull request #2.
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
# XtendR
|
||||

|
||||
|
||||
A very basic Python 3.12 friendly plugin system based on the K.I.S.S principle.
|
||||
|
||||
|
||||
@@ -1 +1,2 @@
|
||||
setuptools==68.2.2
|
||||
cryptography>=42
|
||||
|
||||
@@ -6,7 +6,7 @@ if __name__ == "__main__":
|
||||
|
||||
setup(
|
||||
name="XtendR",
|
||||
version="0.4.0",
|
||||
version="0.5.0",
|
||||
packages=find_packages(),
|
||||
install_requires=[],
|
||||
author="Jan Lerking",
|
||||
|
||||
@@ -0,0 +1,259 @@
|
||||
"""
|
||||
xtendr_signing.py
|
||||
|
||||
Core logic for signing XtendR plugins and maintaining a whitelist.
|
||||
Deliberately kept free of any GTK/Adw imports so it can be unit tested
|
||||
and reused by both the signer GUI and (later) XtendRSystem's verification
|
||||
hook.
|
||||
|
||||
Design:
|
||||
- Ed25519 keypair: private key stays with the signer, public key ships
|
||||
inside XtendR / PyPac to verify at attach() time.
|
||||
- Each whitelist entry covers the plugin's manifest (<name>.json) and its
|
||||
module file (<module>.py), hashed together with SHA-256. The signature
|
||||
covers the whole entry (hash + name + module + class), so an attacker
|
||||
can't splice a valid hash onto a different plugin identity.
|
||||
- The whitelist file itself can optionally be encrypted at rest with a
|
||||
passphrase (Fernet / AES-128-CBC+HMAC via PBKDF2-derived key). This is
|
||||
for confidentiality only -- the signature is what provides integrity,
|
||||
and still verifies correctly whether or not the file on disk is
|
||||
encrypted.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
from dataclasses import dataclass, asdict
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
|
||||
Ed25519PrivateKey,
|
||||
Ed25519PublicKey,
|
||||
)
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from cryptography.exceptions import InvalidSignature
|
||||
from cryptography.fernet import Fernet, InvalidToken
|
||||
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
|
||||
PBKDF2_ITERATIONS = 600_000
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# Key management
|
||||
# --------------------------------------------------------------------------
|
||||
|
||||
def generate_keypair(private_path: Path, public_path: Path) -> None:
|
||||
"""Generate a new Ed25519 keypair and write it to disk (unencrypted PEM).
|
||||
|
||||
Caller is responsible for keeping private_path safe (e.g. 0600 perms,
|
||||
offline machine, backups). This is deliberately not done automatically
|
||||
since the right answer depends on the user's environment.
|
||||
"""
|
||||
private_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
key = Ed25519PrivateKey.generate()
|
||||
|
||||
priv_bytes = key.private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.PKCS8,
|
||||
encryption_algorithm=serialization.NoEncryption(),
|
||||
)
|
||||
pub_bytes = key.public_key().public_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PublicFormat.SubjectPublicKeyInfo,
|
||||
)
|
||||
|
||||
private_path.write_bytes(priv_bytes)
|
||||
try:
|
||||
os.chmod(private_path, 0o600)
|
||||
except OSError:
|
||||
pass
|
||||
public_path.write_bytes(pub_bytes)
|
||||
|
||||
|
||||
def load_private_key(path: Path) -> Ed25519PrivateKey:
|
||||
key = serialization.load_pem_private_key(path.read_bytes(), password=None)
|
||||
if not isinstance(key, Ed25519PrivateKey):
|
||||
raise ValueError(f"'{path}' is not an Ed25519 private key.")
|
||||
return key
|
||||
|
||||
|
||||
def load_public_key(path: Path) -> Ed25519PublicKey:
|
||||
key = serialization.load_pem_public_key(path.read_bytes())
|
||||
if not isinstance(key, Ed25519PublicKey):
|
||||
raise ValueError(f"'{path}' is not an Ed25519 public key.")
|
||||
return key
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# Plugin hashing + entry signing
|
||||
# --------------------------------------------------------------------------
|
||||
|
||||
@dataclass
|
||||
class PluginEntry:
|
||||
name: str # plugin folder name
|
||||
module: str # module name, from the manifest
|
||||
cls: str # class name, from the manifest
|
||||
sha256: str # hash of manifest + module file contents
|
||||
signed_at: str # ISO-8601 UTC timestamp
|
||||
signature: str = "" # base64 Ed25519 signature, filled in after signing
|
||||
|
||||
def canonical_bytes(self) -> bytes:
|
||||
"""Bytes that get signed / verified -- everything except the
|
||||
signature itself, in a stable, sorted-key JSON encoding."""
|
||||
payload = {
|
||||
"name": self.name,
|
||||
"module": self.module,
|
||||
"cls": self.cls,
|
||||
"sha256": self.sha256,
|
||||
"signed_at": self.signed_at,
|
||||
}
|
||||
return json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8")
|
||||
|
||||
|
||||
def hash_plugin_files(plugin_dir: Path, name: str, module: str) -> str:
|
||||
"""Hash the manifest + module file together, in a fixed order, so the
|
||||
hash changes if either file is tampered with."""
|
||||
manifest_path = plugin_dir / f"{name}.json"
|
||||
module_path = plugin_dir / f"{module}.py"
|
||||
if not manifest_path.is_file():
|
||||
raise FileNotFoundError(f"Manifest not found: {manifest_path}")
|
||||
if not module_path.is_file():
|
||||
raise FileNotFoundError(f"Module file not found: {module_path}")
|
||||
|
||||
h = hashlib.sha256()
|
||||
for p in (manifest_path, module_path):
|
||||
h.update(p.name.encode("utf-8"))
|
||||
h.update(b"\x00")
|
||||
h.update(p.read_bytes())
|
||||
h.update(b"\x00")
|
||||
return h.hexdigest()
|
||||
|
||||
|
||||
def sign_plugin(private_key: Ed25519PrivateKey, plugin_dir: Path) -> PluginEntry:
|
||||
manifest_path_glob = list(plugin_dir.glob("*.json"))
|
||||
if len(manifest_path_glob) != 1:
|
||||
raise ValueError(
|
||||
f"Expected exactly one manifest .json in '{plugin_dir}', found {len(manifest_path_glob)}."
|
||||
)
|
||||
manifest = json.loads(manifest_path_glob[0].read_text(encoding="utf-8"))
|
||||
module = manifest.get("module")
|
||||
cls = manifest.get("class")
|
||||
if not module or not cls:
|
||||
raise ValueError("Manifest is missing 'module' or 'class'.")
|
||||
|
||||
name = plugin_dir.name
|
||||
digest = hash_plugin_files(plugin_dir, name, module)
|
||||
entry = PluginEntry(
|
||||
name=name,
|
||||
module=module,
|
||||
cls=cls,
|
||||
sha256=digest,
|
||||
signed_at=datetime.now(timezone.utc).isoformat(timespec="seconds"),
|
||||
)
|
||||
signature = private_key.sign(entry.canonical_bytes())
|
||||
entry.signature = base64.b64encode(signature).decode("ascii")
|
||||
return entry
|
||||
|
||||
|
||||
def verify_entry(public_key: Ed25519PublicKey, entry: PluginEntry) -> bool:
|
||||
try:
|
||||
public_key.verify(base64.b64decode(entry.signature), entry.canonical_bytes())
|
||||
return True
|
||||
except (InvalidSignature, ValueError):
|
||||
return False
|
||||
|
||||
|
||||
def verify_plugin_on_disk(public_key: Ed25519PublicKey, plugin_dir: Path, entry: PluginEntry) -> bool:
|
||||
"""Full verification: signature is valid AND the files on disk still
|
||||
match the hash that was signed."""
|
||||
if not verify_entry(public_key, entry):
|
||||
return False
|
||||
try:
|
||||
current_hash = hash_plugin_files(plugin_dir, entry.name, entry.module)
|
||||
except FileNotFoundError:
|
||||
return False
|
||||
return current_hash == entry.sha256
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# Whitelist storage (optionally encrypted at rest)
|
||||
# --------------------------------------------------------------------------
|
||||
|
||||
class Whitelist:
|
||||
def __init__(self):
|
||||
self.entries: dict[str, PluginEntry] = {}
|
||||
|
||||
def add(self, entry: PluginEntry) -> None:
|
||||
self.entries[entry.name] = entry
|
||||
|
||||
def remove(self, name: str) -> bool:
|
||||
return self.entries.pop(name, None) is not None
|
||||
|
||||
def to_json(self) -> str:
|
||||
payload = {name: asdict(e) for name, e in self.entries.items()}
|
||||
return json.dumps(payload, indent=2, sort_keys=True)
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, data: str) -> "Whitelist":
|
||||
wl = cls()
|
||||
raw = json.loads(data)
|
||||
for name, fields in raw.items():
|
||||
wl.entries[name] = PluginEntry(**fields)
|
||||
return wl
|
||||
|
||||
# -- disk I/O -----------------------------------------------------
|
||||
|
||||
def save(self, path: Path, passphrase: Optional[str] = None) -> None:
|
||||
plaintext = self.to_json().encode("utf-8")
|
||||
if passphrase is None:
|
||||
path.write_bytes(plaintext)
|
||||
return
|
||||
|
||||
salt = os.urandom(16)
|
||||
key = _derive_key(passphrase, salt)
|
||||
token = Fernet(key).encrypt(plaintext)
|
||||
container = {
|
||||
"encrypted": True,
|
||||
"kdf": "pbkdf2-sha256",
|
||||
"iterations": PBKDF2_ITERATIONS,
|
||||
"salt": base64.b64encode(salt).decode("ascii"),
|
||||
"data": base64.b64encode(token).decode("ascii"),
|
||||
}
|
||||
path.write_text(json.dumps(container, indent=2), encoding="utf-8")
|
||||
|
||||
@classmethod
|
||||
def load(cls, path: Path, passphrase: Optional[str] = None) -> "Whitelist":
|
||||
raw = path.read_text(encoding="utf-8")
|
||||
try:
|
||||
container = json.loads(raw)
|
||||
except json.JSONDecodeError:
|
||||
container = None
|
||||
|
||||
if isinstance(container, dict) and container.get("encrypted"):
|
||||
if passphrase is None:
|
||||
raise ValueError("Whitelist is encrypted; a passphrase is required.")
|
||||
salt = base64.b64decode(container["salt"])
|
||||
key = _derive_key(passphrase, salt, container.get("iterations", PBKDF2_ITERATIONS))
|
||||
token = base64.b64decode(container["data"])
|
||||
try:
|
||||
plaintext = Fernet(key).decrypt(token).decode("utf-8")
|
||||
except InvalidToken as e:
|
||||
raise ValueError("Wrong passphrase or corrupted whitelist file.") from e
|
||||
return cls.from_json(plaintext)
|
||||
|
||||
return cls.from_json(raw)
|
||||
|
||||
|
||||
def _derive_key(passphrase: str, salt: bytes, iterations: int = PBKDF2_ITERATIONS) -> bytes:
|
||||
kdf = PBKDF2HMAC(
|
||||
algorithm=hashes.SHA256(),
|
||||
length=32,
|
||||
salt=salt,
|
||||
iterations=iterations,
|
||||
)
|
||||
return base64.urlsafe_b64encode(kdf.derive(passphrase.encode("utf-8")))
|
||||
+105
-4
@@ -6,9 +6,11 @@ import json
|
||||
import stat
|
||||
import logging
|
||||
import threading
|
||||
from pathlib import Path
|
||||
from xtendr.xtendrbase import XtendRBase
|
||||
from xtendr import signing as xsign
|
||||
|
||||
__version__ = "0.4.0"
|
||||
__version__ = "0.5.0"
|
||||
|
||||
logger = logging.getLogger("xtendr")
|
||||
|
||||
@@ -18,6 +20,11 @@ logger = logging.getLogger("xtendr")
|
||||
_NAME_RE = re.compile(r"^[A-Za-z0-9_-]+$")
|
||||
_MODULE_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
|
||||
|
||||
# Signature status values stored in plugins[name]["signature"]["status"].
|
||||
SIG_VERIFIED = "verified" # whitelist entry present, signature and hash both check out
|
||||
SIG_UNSIGNED = "unsigned" # no whitelist entry for this plugin at all
|
||||
SIG_INVALID = "invalid" # whitelist entry present but signature/hash mismatch (tampered)
|
||||
|
||||
|
||||
class XtendRSystem:
|
||||
"""Plugin system to manage plugins.
|
||||
@@ -30,7 +37,7 @@ class XtendRSystem:
|
||||
Example:
|
||||
>>> system = XtendRSystem()
|
||||
>>> system.version()
|
||||
XtendR v0.4.0
|
||||
XtendR v0.5.0
|
||||
>>> system.attach("example_plugin", lambda: None)
|
||||
>>> system.run("example_plugin")
|
||||
ExamplePlugin is running!
|
||||
@@ -40,20 +47,81 @@ class XtendRSystem:
|
||||
Detached plugin 'example_plugin'.
|
||||
"""
|
||||
|
||||
def __init__(self, pluginpath="plugins"):
|
||||
def __init__(self, pluginpath="plugins", public_key_path=None, whitelist_path=None, whitelist_passphrase=None):
|
||||
self.pluginspath = pluginpath
|
||||
self.plugins = {}
|
||||
self._lock = threading.RLock()
|
||||
|
||||
# -- signature verification setup ---------------------------------
|
||||
# If either the public key or the whitelist can't be loaded, we
|
||||
# fail closed: self._public_key / self._whitelist stay None, and
|
||||
# every plugin will come back as SIG_UNSIGNED (disabled) rather
|
||||
# than silently skipping verification. This is deliberate -- an
|
||||
# admin who wants unsigned plugins to run should not be able to
|
||||
# get there by accident (e.g. a missing/misspelled key path).
|
||||
self._public_key = None
|
||||
self._whitelist = None
|
||||
|
||||
if public_key_path is not None:
|
||||
try:
|
||||
self._public_key = xsign.load_public_key(Path(public_key_path))
|
||||
except (OSError, ValueError) as e:
|
||||
logger.error("Could not load XtendR public key from '%s': %s", public_key_path, e)
|
||||
|
||||
if whitelist_path is not None:
|
||||
try:
|
||||
self._whitelist = xsign.Whitelist.load(Path(whitelist_path), whitelist_passphrase)
|
||||
except (OSError, ValueError) as e:
|
||||
logger.error("Could not load XtendR plugin whitelist from '%s': %s", whitelist_path, e)
|
||||
|
||||
if self._public_key is None or self._whitelist is None:
|
||||
logger.warning(
|
||||
"Signature verification is not fully configured for pluginpath '%s'; "
|
||||
"all plugins will be treated as unsigned and permanently disabled.",
|
||||
pluginpath,
|
||||
)
|
||||
|
||||
def version(self) -> str:
|
||||
return "XtendR v" + __version__
|
||||
|
||||
@property
|
||||
def verification_configured(self) -> bool:
|
||||
"""True if a public key and whitelist both loaded successfully."""
|
||||
return self._public_key is not None and self._whitelist is not None
|
||||
|
||||
def _validate_name(self, name: str) -> bool:
|
||||
if not isinstance(name, str) or not _NAME_RE.match(name):
|
||||
logger.error("Rejected plugin name %r: must match %s", name, _NAME_RE.pattern)
|
||||
return False
|
||||
return True
|
||||
|
||||
def _verify_signature(self, name: str, plugin_path: str, module_name: str) -> dict:
|
||||
"""Check a plugin's signature against the loaded whitelist.
|
||||
|
||||
Returns a dict with at least a "status" key (SIG_VERIFIED /
|
||||
SIG_UNSIGNED / SIG_INVALID) plus whatever whitelist metadata is
|
||||
available, for display in the UI. Never raises.
|
||||
"""
|
||||
result = {"status": SIG_UNSIGNED, "sha256": None, "signature": None, "signed_at": None}
|
||||
|
||||
if self._public_key is None or self._whitelist is None:
|
||||
return result
|
||||
|
||||
entry = self._whitelist.entries.get(name)
|
||||
if entry is None:
|
||||
return result
|
||||
|
||||
result.update(sha256=entry.sha256, signature=entry.signature, signed_at=entry.signed_at)
|
||||
|
||||
try:
|
||||
ok = xsign.verify_plugin_on_disk(self._public_key, Path(plugin_path), entry)
|
||||
except Exception: # noqa: BLE001 - never let a verification bug crash attach()
|
||||
logger.error("Signature verification raised for plugin '%s'.", name, exc_info=True)
|
||||
ok = False
|
||||
|
||||
result["status"] = SIG_VERIFIED if ok else SIG_INVALID
|
||||
return result
|
||||
|
||||
def _check_permissions(self, path: str) -> None:
|
||||
"""Warn (don't block) if a plugin file is group/world-writable."""
|
||||
try:
|
||||
@@ -116,6 +184,33 @@ class XtendRSystem:
|
||||
|
||||
self._check_permissions(module_file)
|
||||
|
||||
# Verify the plugin's signature BEFORE we ever execute its code.
|
||||
# Unsigned/tampered plugins still get a listing entry (built
|
||||
# from the manifest alone, which is inert JSON) but their .py
|
||||
# file is never imported, and they can never be run.
|
||||
sig = self._verify_signature(name, plugin_path, module_name)
|
||||
if sig["status"] != SIG_VERIFIED:
|
||||
if sig["status"] == SIG_INVALID:
|
||||
logger.error(
|
||||
"Plugin '%s' failed signature verification (tampered or bad "
|
||||
"signature); attaching as permanently disabled.", name,
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
"Plugin '%s' has no valid whitelist entry; attaching as "
|
||||
"permanently disabled.", name,
|
||||
)
|
||||
self.plugins[name] = {
|
||||
"instance": None,
|
||||
"running": False,
|
||||
"info": plugin_info,
|
||||
"autorun": False,
|
||||
"module_key": None,
|
||||
"disabled": True,
|
||||
"signature": sig,
|
||||
}
|
||||
return
|
||||
|
||||
# Load the module directly from its file path instead of
|
||||
# mutating sys.path. This prevents a plugin from shadowing
|
||||
# stdlib or third-party modules for the rest of the process.
|
||||
@@ -147,6 +242,8 @@ class XtendRSystem:
|
||||
"info": plugin_info,
|
||||
"autorun": False,
|
||||
"module_key": qualified_name,
|
||||
"disabled": False,
|
||||
"signature": sig,
|
||||
}
|
||||
logger.info("Attached plugin '%s'.", name)
|
||||
logger.info("Running pre-load on '%s'.", name)
|
||||
@@ -167,6 +264,9 @@ class XtendRSystem:
|
||||
if entry is None:
|
||||
logger.error("Plugin '%s' not found or has no 'run' method.", name)
|
||||
return
|
||||
if entry.get("disabled"):
|
||||
logger.error("Plugin '%s' is disabled (failed signature verification) and cannot run.", name)
|
||||
return
|
||||
entry["running"] = True
|
||||
try:
|
||||
return entry["instance"].run(*args, **kwargs)
|
||||
@@ -194,5 +294,6 @@ class XtendRSystem:
|
||||
if entry is None:
|
||||
logger.info("Plugin '%s' is not attached.", name)
|
||||
return
|
||||
sys.modules.pop(entry["module_key"], None)
|
||||
if entry.get("module_key"):
|
||||
sys.modules.pop(entry["module_key"], None)
|
||||
logger.info("Detached plugin '%s'.", name)
|
||||
|
||||
Reference in New Issue
Block a user