2 Commits

Author SHA1 Message Date
Lerking ad67becd91 Verification configured. /JL 2026-07-03 09:00:13 +02:00
Lerking a4fe3a2b80 Gated signatures. /JL 2026-07-03 08:51:39 +02:00
5 changed files with 367 additions and 5 deletions
+1
View File
@@ -1,4 +1,5 @@
# XtendR
![Latest Version](https://gitpot.lerk.ing/badges/badge/static?label=Latest+version&message=0.5.1&color=blue)
A very basic Python 3.12 friendly plugin system based on the K.I.S.S principle.
+1
View File
@@ -1 +1,2 @@
setuptools==68.2.2
cryptography>=42
+1 -1
View File
@@ -6,7 +6,7 @@ if __name__ == "__main__":
setup(
name="XtendR",
version="0.4.0",
version="0.5.1",
packages=find_packages(),
install_requires=[],
author="Jan Lerking",
+259
View File
@@ -0,0 +1,259 @@
"""
xtendr_signing.py
Core logic for signing XtendR plugins and maintaining a whitelist.
Deliberately kept free of any GTK/Adw imports so it can be unit tested
and reused by both the signer GUI and (later) XtendRSystem's verification
hook.
Design:
- Ed25519 keypair: private key stays with the signer, public key ships
inside XtendR / PyPac to verify at attach() time.
- Each whitelist entry covers the plugin's manifest (<name>.json) and its
module file (<module>.py), hashed together with SHA-256. The signature
covers the whole entry (hash + name + module + class), so an attacker
can't splice a valid hash onto a different plugin identity.
- The whitelist file itself can optionally be encrypted at rest with a
passphrase (Fernet / AES-128-CBC+HMAC via PBKDF2-derived key). This is
for confidentiality only -- the signature is what provides integrity,
and still verifies correctly whether or not the file on disk is
encrypted.
"""
from __future__ import annotations
import base64
import hashlib
import json
import os
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
Ed25519PrivateKey,
Ed25519PublicKey,
)
from cryptography.hazmat.primitives import serialization
from cryptography.exceptions import InvalidSignature
from cryptography.fernet import Fernet, InvalidToken
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives import hashes
PBKDF2_ITERATIONS = 600_000
# --------------------------------------------------------------------------
# Key management
# --------------------------------------------------------------------------
def generate_keypair(private_path: Path, public_path: Path) -> None:
"""Generate a new Ed25519 keypair and write it to disk (unencrypted PEM).
Caller is responsible for keeping private_path safe (e.g. 0600 perms,
offline machine, backups). This is deliberately not done automatically
since the right answer depends on the user's environment.
"""
private_path.parent.mkdir(parents=True, exist_ok=True)
key = Ed25519PrivateKey.generate()
priv_bytes = key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
)
pub_bytes = key.public_key().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
private_path.write_bytes(priv_bytes)
try:
os.chmod(private_path, 0o600)
except OSError:
pass
public_path.write_bytes(pub_bytes)
def load_private_key(path: Path) -> Ed25519PrivateKey:
key = serialization.load_pem_private_key(path.read_bytes(), password=None)
if not isinstance(key, Ed25519PrivateKey):
raise ValueError(f"'{path}' is not an Ed25519 private key.")
return key
def load_public_key(path: Path) -> Ed25519PublicKey:
key = serialization.load_pem_public_key(path.read_bytes())
if not isinstance(key, Ed25519PublicKey):
raise ValueError(f"'{path}' is not an Ed25519 public key.")
return key
# --------------------------------------------------------------------------
# Plugin hashing + entry signing
# --------------------------------------------------------------------------
@dataclass
class PluginEntry:
name: str # plugin folder name
module: str # module name, from the manifest
cls: str # class name, from the manifest
sha256: str # hash of manifest + module file contents
signed_at: str # ISO-8601 UTC timestamp
signature: str = "" # base64 Ed25519 signature, filled in after signing
def canonical_bytes(self) -> bytes:
"""Bytes that get signed / verified -- everything except the
signature itself, in a stable, sorted-key JSON encoding."""
payload = {
"name": self.name,
"module": self.module,
"cls": self.cls,
"sha256": self.sha256,
"signed_at": self.signed_at,
}
return json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8")
def hash_plugin_files(plugin_dir: Path, name: str, module: str) -> str:
"""Hash the manifest + module file together, in a fixed order, so the
hash changes if either file is tampered with."""
manifest_path = plugin_dir / f"{name}.json"
module_path = plugin_dir / f"{module}.py"
if not manifest_path.is_file():
raise FileNotFoundError(f"Manifest not found: {manifest_path}")
if not module_path.is_file():
raise FileNotFoundError(f"Module file not found: {module_path}")
h = hashlib.sha256()
for p in (manifest_path, module_path):
h.update(p.name.encode("utf-8"))
h.update(b"\x00")
h.update(p.read_bytes())
h.update(b"\x00")
return h.hexdigest()
def sign_plugin(private_key: Ed25519PrivateKey, plugin_dir: Path) -> PluginEntry:
manifest_path_glob = list(plugin_dir.glob("*.json"))
if len(manifest_path_glob) != 1:
raise ValueError(
f"Expected exactly one manifest .json in '{plugin_dir}', found {len(manifest_path_glob)}."
)
manifest = json.loads(manifest_path_glob[0].read_text(encoding="utf-8"))
module = manifest.get("module")
cls = manifest.get("class")
if not module or not cls:
raise ValueError("Manifest is missing 'module' or 'class'.")
name = plugin_dir.name
digest = hash_plugin_files(plugin_dir, name, module)
entry = PluginEntry(
name=name,
module=module,
cls=cls,
sha256=digest,
signed_at=datetime.now(timezone.utc).isoformat(timespec="seconds"),
)
signature = private_key.sign(entry.canonical_bytes())
entry.signature = base64.b64encode(signature).decode("ascii")
return entry
def verify_entry(public_key: Ed25519PublicKey, entry: PluginEntry) -> bool:
try:
public_key.verify(base64.b64decode(entry.signature), entry.canonical_bytes())
return True
except (InvalidSignature, ValueError):
return False
def verify_plugin_on_disk(public_key: Ed25519PublicKey, plugin_dir: Path, entry: PluginEntry) -> bool:
"""Full verification: signature is valid AND the files on disk still
match the hash that was signed."""
if not verify_entry(public_key, entry):
return False
try:
current_hash = hash_plugin_files(plugin_dir, entry.name, entry.module)
except FileNotFoundError:
return False
return current_hash == entry.sha256
# --------------------------------------------------------------------------
# Whitelist storage (optionally encrypted at rest)
# --------------------------------------------------------------------------
class Whitelist:
def __init__(self):
self.entries: dict[str, PluginEntry] = {}
def add(self, entry: PluginEntry) -> None:
self.entries[entry.name] = entry
def remove(self, name: str) -> bool:
return self.entries.pop(name, None) is not None
def to_json(self) -> str:
payload = {name: asdict(e) for name, e in self.entries.items()}
return json.dumps(payload, indent=2, sort_keys=True)
@classmethod
def from_json(cls, data: str) -> "Whitelist":
wl = cls()
raw = json.loads(data)
for name, fields in raw.items():
wl.entries[name] = PluginEntry(**fields)
return wl
# -- disk I/O -----------------------------------------------------
def save(self, path: Path, passphrase: Optional[str] = None) -> None:
plaintext = self.to_json().encode("utf-8")
if passphrase is None:
path.write_bytes(plaintext)
return
salt = os.urandom(16)
key = _derive_key(passphrase, salt)
token = Fernet(key).encrypt(plaintext)
container = {
"encrypted": True,
"kdf": "pbkdf2-sha256",
"iterations": PBKDF2_ITERATIONS,
"salt": base64.b64encode(salt).decode("ascii"),
"data": base64.b64encode(token).decode("ascii"),
}
path.write_text(json.dumps(container, indent=2), encoding="utf-8")
@classmethod
def load(cls, path: Path, passphrase: Optional[str] = None) -> "Whitelist":
raw = path.read_text(encoding="utf-8")
try:
container = json.loads(raw)
except json.JSONDecodeError:
container = None
if isinstance(container, dict) and container.get("encrypted"):
if passphrase is None:
raise ValueError("Whitelist is encrypted; a passphrase is required.")
salt = base64.b64decode(container["salt"])
key = _derive_key(passphrase, salt, container.get("iterations", PBKDF2_ITERATIONS))
token = base64.b64decode(container["data"])
try:
plaintext = Fernet(key).decrypt(token).decode("utf-8")
except InvalidToken as e:
raise ValueError("Wrong passphrase or corrupted whitelist file.") from e
return cls.from_json(plaintext)
return cls.from_json(raw)
def _derive_key(passphrase: str, salt: bytes, iterations: int = PBKDF2_ITERATIONS) -> bytes:
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=iterations,
)
return base64.urlsafe_b64encode(kdf.derive(passphrase.encode("utf-8")))
+105 -4
View File
@@ -6,9 +6,11 @@ import json
import stat
import logging
import threading
from pathlib import Path
from xtendr.xtendrbase import XtendRBase
from xtendr import signing as xsign
__version__ = "0.4.0"
__version__ = "0.5.0"
logger = logging.getLogger("xtendr")
@@ -18,6 +20,11 @@ logger = logging.getLogger("xtendr")
_NAME_RE = re.compile(r"^[A-Za-z0-9_-]+$")
_MODULE_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
# Signature status values stored in plugins[name]["signature"]["status"].
SIG_VERIFIED = "verified" # whitelist entry present, signature and hash both check out
SIG_UNSIGNED = "unsigned" # no whitelist entry for this plugin at all
SIG_INVALID = "invalid" # whitelist entry present but signature/hash mismatch (tampered)
class XtendRSystem:
"""Plugin system to manage plugins.
@@ -30,7 +37,7 @@ class XtendRSystem:
Example:
>>> system = XtendRSystem()
>>> system.version()
XtendR v0.4.0
XtendR v0.5.0
>>> system.attach("example_plugin", lambda: None)
>>> system.run("example_plugin")
ExamplePlugin is running!
@@ -40,20 +47,81 @@ class XtendRSystem:
Detached plugin 'example_plugin'.
"""
def __init__(self, pluginpath="plugins"):
def __init__(self, pluginpath="plugins", public_key_path=None, whitelist_path=None, whitelist_passphrase=None):
self.pluginspath = pluginpath
self.plugins = {}
self._lock = threading.RLock()
# -- signature verification setup ---------------------------------
# If either the public key or the whitelist can't be loaded, we
# fail closed: self._public_key / self._whitelist stay None, and
# every plugin will come back as SIG_UNSIGNED (disabled) rather
# than silently skipping verification. This is deliberate -- an
# admin who wants unsigned plugins to run should not be able to
# get there by accident (e.g. a missing/misspelled key path).
self._public_key = None
self._whitelist = None
if public_key_path is not None:
try:
self._public_key = xsign.load_public_key(Path(public_key_path))
except (OSError, ValueError) as e:
logger.error("Could not load XtendR public key from '%s': %s", public_key_path, e)
if whitelist_path is not None:
try:
self._whitelist = xsign.Whitelist.load(Path(whitelist_path), whitelist_passphrase)
except (OSError, ValueError) as e:
logger.error("Could not load XtendR plugin whitelist from '%s': %s", whitelist_path, e)
if self._public_key is None or self._whitelist is None:
logger.warning(
"Signature verification is not fully configured for pluginpath '%s'; "
"all plugins will be treated as unsigned and permanently disabled.",
pluginpath,
)
def version(self) -> str:
return "XtendR v" + __version__
@property
def verification_configured(self) -> bool:
"""True if a public key and whitelist both loaded successfully."""
return self._public_key is not None and self._whitelist is not None
def _validate_name(self, name: str) -> bool:
if not isinstance(name, str) or not _NAME_RE.match(name):
logger.error("Rejected plugin name %r: must match %s", name, _NAME_RE.pattern)
return False
return True
def _verify_signature(self, name: str, plugin_path: str, module_name: str) -> dict:
"""Check a plugin's signature against the loaded whitelist.
Returns a dict with at least a "status" key (SIG_VERIFIED /
SIG_UNSIGNED / SIG_INVALID) plus whatever whitelist metadata is
available, for display in the UI. Never raises.
"""
result = {"status": SIG_UNSIGNED, "sha256": None, "signature": None, "signed_at": None}
if self._public_key is None or self._whitelist is None:
return result
entry = self._whitelist.entries.get(name)
if entry is None:
return result
result.update(sha256=entry.sha256, signature=entry.signature, signed_at=entry.signed_at)
try:
ok = xsign.verify_plugin_on_disk(self._public_key, Path(plugin_path), entry)
except Exception: # noqa: BLE001 - never let a verification bug crash attach()
logger.error("Signature verification raised for plugin '%s'.", name, exc_info=True)
ok = False
result["status"] = SIG_VERIFIED if ok else SIG_INVALID
return result
def _check_permissions(self, path: str) -> None:
"""Warn (don't block) if a plugin file is group/world-writable."""
try:
@@ -116,6 +184,33 @@ class XtendRSystem:
self._check_permissions(module_file)
# Verify the plugin's signature BEFORE we ever execute its code.
# Unsigned/tampered plugins still get a listing entry (built
# from the manifest alone, which is inert JSON) but their .py
# file is never imported, and they can never be run.
sig = self._verify_signature(name, plugin_path, module_name)
if sig["status"] != SIG_VERIFIED:
if sig["status"] == SIG_INVALID:
logger.error(
"Plugin '%s' failed signature verification (tampered or bad "
"signature); attaching as permanently disabled.", name,
)
else:
logger.warning(
"Plugin '%s' has no valid whitelist entry; attaching as "
"permanently disabled.", name,
)
self.plugins[name] = {
"instance": None,
"running": False,
"info": plugin_info,
"autorun": False,
"module_key": None,
"disabled": True,
"signature": sig,
}
return
# Load the module directly from its file path instead of
# mutating sys.path. This prevents a plugin from shadowing
# stdlib or third-party modules for the rest of the process.
@@ -147,6 +242,8 @@ class XtendRSystem:
"info": plugin_info,
"autorun": False,
"module_key": qualified_name,
"disabled": False,
"signature": sig,
}
logger.info("Attached plugin '%s'.", name)
logger.info("Running pre-load on '%s'.", name)
@@ -167,6 +264,9 @@ class XtendRSystem:
if entry is None:
logger.error("Plugin '%s' not found or has no 'run' method.", name)
return
if entry.get("disabled"):
logger.error("Plugin '%s' is disabled (failed signature verification) and cannot run.", name)
return
entry["running"] = True
try:
return entry["instance"].run(*args, **kwargs)
@@ -194,5 +294,6 @@ class XtendRSystem:
if entry is None:
logger.info("Plugin '%s' is not attached.", name)
return
sys.modules.pop(entry["module_key"], None)
if entry.get("module_key"):
sys.modules.pop(entry["module_key"], None)
logger.info("Detached plugin '%s'.", name)