Files
XtendR/xtendr/xtendrsystem.py
T
2026-07-03 08:51:39 +02:00

300 lines
12 KiB
Python

import importlib.util
import sys
import os
import re
import json
import stat
import logging
import threading
from pathlib import Path
from xtendr.xtendrbase import XtendRBase
from xtendr import signing as xsign
# Version string of this module; XtendRSystem.version() appends it to "XtendR v".
__version__ = "0.5.0"
# Single shared logger: every message emitted by this module goes to "xtendr".
logger = logging.getLogger("xtendr")
# Plugin (folder) names and module names are restricted to a safe identifier
# pattern. This blocks path traversal (e.g. "../../etc") and stray characters
# that have no business in a plugin name.
#
# _NAME_RE is applied to the plugin/folder name; _MODULE_RE is applied to both
# the "module" and the "class" value read from the plugin's JSON manifest.
#
# NOTE: in Python's `re`, `$` also matches just before a single trailing
# newline, so `pattern.match("name\n")` succeeds with these patterns. Use
# `pattern.fullmatch(...)` when a trailing newline must be rejected.
_NAME_RE = re.compile(r"^[A-Za-z0-9_-]+$")
_MODULE_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
# Signature status values stored in plugins[name]["signature"]["status"].
# Only SIG_VERIFIED plugins are imported and allowed to run; the other two
# states are registered as permanently disabled entries.
SIG_VERIFIED = "verified" # whitelist entry present, signature and hash both check out
SIG_UNSIGNED = "unsigned" # no whitelist entry for this plugin at all
SIG_INVALID = "invalid" # whitelist entry present but signature/hash mismatch (tampered)
class XtendRSystem:
    """Plugin system that attaches, runs, stops and detaches plugins.

    Each plugin lives in its own folder ``<pluginpath>/<name>/`` containing a
    JSON manifest ``<name>.json`` (with at least the keys ``"module"`` and
    ``"class"``) and the Python file ``<module>.py`` that defines the class.

    SECURITY NOTE: plugins are arbitrary Python code that runs with the full
    privileges of the host process. Only attach plugins from sources you
    trust. This class validates names/paths and isolates module loading, but
    it cannot make untrusted plugin code safe to run.

    Example:
        >>> system = XtendRSystem()
        >>> system.version()
        'XtendR v0.5.0'
        >>> system.attach("example_plugin", lambda: None)
        >>> system.run("example_plugin")
        ExamplePlugin is running!
        >>> system.stop("example_plugin")
        ExamplePlugin has stopped!
        >>> system.detach("example_plugin")
        Detached plugin 'example_plugin'.
    """

    def __init__(self, pluginpath="plugins", public_key_path=None, whitelist_path=None, whitelist_passphrase=None):
        """Create an empty plugin registry and set up signature verification.

        Args:
            pluginpath: Directory holding the plugin folders. It is joined
                onto the current working directory each time ``attach`` runs.
            public_key_path: Path of the public key used to verify plugin
                signatures, or ``None`` to leave verification unconfigured.
            whitelist_path: Path of the plugin whitelist, or ``None`` to
                leave verification unconfigured.
            whitelist_passphrase: Passed through to the whitelist loader.

        Load failures are logged, never raised.
        """
        self.pluginspath = pluginpath
        # name -> registry entry dict (see attach() for the keys).
        self.plugins = {}
        # Re-entrant so that code running under the lock on the same thread
        # can call back into this object without deadlocking.
        self._lock = threading.RLock()

        # -- signature verification setup ---------------------------------
        # If either the public key or the whitelist can't be loaded, we
        # fail closed: self._public_key / self._whitelist stay None, and
        # every plugin will come back as SIG_UNSIGNED (disabled) rather
        # than silently skipping verification. This is deliberate -- an
        # admin who wants unsigned plugins to run should not be able to
        # get there by accident (e.g. a missing/misspelled key path).
        self._public_key = None
        self._whitelist = None

        if public_key_path is not None:
            try:
                self._public_key = xsign.load_public_key(Path(public_key_path))
            except (OSError, ValueError) as e:
                logger.error("Could not load XtendR public key from '%s': %s", public_key_path, e)

        if whitelist_path is not None:
            try:
                self._whitelist = xsign.Whitelist.load(Path(whitelist_path), whitelist_passphrase)
            except (OSError, ValueError) as e:
                logger.error("Could not load XtendR plugin whitelist from '%s': %s", whitelist_path, e)

        if self._public_key is None or self._whitelist is None:
            logger.warning(
                "Signature verification is not fully configured for pluginpath '%s'; "
                "all plugins will be treated as unsigned and permanently disabled.",
                pluginpath,
            )

    def version(self) -> str:
        """Return the human-readable version string, e.g. ``"XtendR v0.5.0"``."""
        return f"XtendR v{__version__}"

    @property
    def verification_configured(self) -> bool:
        """True if a public key and whitelist both loaded successfully."""
        return self._public_key is not None and self._whitelist is not None

    def _validate_name(self, name: str) -> bool:
        """Return True if *name* is a string made only of safe characters.

        ``fullmatch`` is used on purpose: with ``match`` the pattern's ``$``
        would also accept a name that ends in a newline.
        """
        if not isinstance(name, str) or not _NAME_RE.fullmatch(name):
            logger.error("Rejected plugin name %r: must match %s", name, _NAME_RE.pattern)
            return False
        return True

    def _verify_signature(self, name: str, plugin_path: str, module_name: str) -> dict:
        """Check a plugin's signature against the loaded whitelist.

        Returns a dict with at least a "status" key (SIG_VERIFIED /
        SIG_UNSIGNED / SIG_INVALID) plus whatever whitelist metadata is
        available, for display in the UI. Never raises.

        ``module_name`` is accepted for interface stability but is not used
        by the check itself.
        """
        result = {"status": SIG_UNSIGNED, "sha256": None, "signature": None, "signed_at": None}

        # Fail closed: without key and whitelist nothing can be verified.
        if self._public_key is None or self._whitelist is None:
            return result

        entry = self._whitelist.entries.get(name)
        if entry is None:
            return result

        result.update(sha256=entry.sha256, signature=entry.signature, signed_at=entry.signed_at)
        try:
            ok = xsign.verify_plugin_on_disk(self._public_key, Path(plugin_path), entry)
        except Exception:  # noqa: BLE001 - never let a verification bug crash attach()
            logger.error("Signature verification raised for plugin '%s'.", name, exc_info=True)
            ok = False

        result["status"] = SIG_VERIFIED if ok else SIG_INVALID
        return result

    def _check_permissions(self, path: str) -> None:
        """Warn (don't block) if a plugin file is group/world-writable."""
        try:
            mode = os.stat(path).st_mode
        except OSError:
            # Best effort only: this check is advisory, so a file that cannot
            # be stat'ed is simply not reported here.
            return
        if mode & (stat.S_IWGRP | stat.S_IWOTH):
            logger.warning(
                "Plugin file '%s' is group/world-writable; this is a "
                "security risk on shared systems.",
                path,
            )

    def _read_manifest(self, name: str, info_path: str):
        """Load and validate a plugin's JSON manifest.

        Returns ``(plugin_info, module_name, class_name)`` on success, or
        ``None`` after logging the reason when the manifest is unreadable,
        is not a JSON object, or carries missing/invalid identifiers.
        """
        try:
            with open(info_path, "r", encoding="utf-8") as f:
                plugin_info = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers both json.JSONDecodeError and the
            # UnicodeDecodeError raised for a file that is not valid UTF-8.
            logger.error("Failed to read info file for plugin '%s': %s", name, e)
            return None

        # Valid JSON may still be a list, string or number; only an object
        # supports the key lookups below.
        if not isinstance(plugin_info, dict):
            logger.error("Plugin '%s' info file must contain a JSON object.", name)
            return None

        module_name = plugin_info.get("module")
        class_name = plugin_info.get("class")
        if not module_name or not class_name:
            logger.error("Plugin '%s' info file is missing 'module' or 'class' key.", name)
            return None

        # Non-string values (e.g. a number) must be rejected before they
        # reach the regex, which would otherwise raise TypeError.
        identifiers_ok = all(
            isinstance(value, str) and _MODULE_RE.fullmatch(value)
            for value in (module_name, class_name)
        )
        if not identifiers_ok:
            logger.error("Plugin '%s' has an invalid module/class identifier.", name)
            return None

        return plugin_info, module_name, class_name

    def _load_instance(self, name: str, module_file: str, class_name: str, qualified_name: str):
        """Import the plugin module from its file and instantiate its class.

        Returns the plugin instance, or ``None`` after logging when the
        import, the instantiation or the XtendRBase check fails. On failure
        the module is removed from ``sys.modules`` again.
        """
        # Load the module directly from its file path instead of
        # mutating sys.path. This prevents a plugin from shadowing
        # stdlib or third-party modules for the rest of the process.
        try:
            spec = importlib.util.spec_from_file_location(qualified_name, module_file)
            if spec is None or spec.loader is None:
                raise ImportError(f"Could not create import spec for '{module_file}'")
            module = importlib.util.module_from_spec(spec)
            sys.modules[qualified_name] = module
            spec.loader.exec_module(module)
            instance = getattr(module, class_name)()
        except Exception as e:  # noqa: BLE001 - plugin code is untrusted, isolate all failures
            logger.error("Failed to attach plugin '%s': %s", name, e, exc_info=True)
            sys.modules.pop(qualified_name, None)
            return None

        if not isinstance(instance, XtendRBase):
            logger.error("Plugin '%s' does not inherit from XtendRBase.", name)
            sys.modules.pop(qualified_name, None)
            return None
        return instance

    def _start_pre_load(self, name: str, instance, callback) -> None:
        """Run the plugin's ``pre_load(callback)`` on a daemon thread."""

        def _pre_load_worker():
            try:
                instance.pre_load(callback)
            except Exception:  # noqa: BLE001
                logger.error("Plugin '%s' raised during pre_load.", name, exc_info=True)

        threading.Thread(target=_pre_load_worker, daemon=True).start()

    def attach(self, name: str, callback=None) -> None:
        """Dynamically load a plugin from its folder.

        Args:
            name: Plugin (folder) name; must satisfy the safe-name pattern.
            callback: Passed to the plugin's ``pre_load`` method, which runs
                on a background daemon thread after a successful attach.

        Every failure is logged and results in an early return; nothing is
        raised. A plugin whose signature is not verified is still registered,
        but as a disabled entry whose code is never imported.
        """
        with self._lock:
            # Validate before touching the registry: a non-string name may be
            # unhashable, and the membership test below would then raise.
            if not self._validate_name(name):
                return
            if name in self.plugins:
                logger.info("Plugin '%s' is already attached.", name)
                return

            plugin_path = os.path.join(os.getcwd(), self.pluginspath, name)
            info_path = os.path.join(plugin_path, name + ".json")

            # Defense in depth: even with a validated name, make sure the
            # resolved path is actually inside the plugins directory.
            plugins_root = os.path.realpath(os.path.join(os.getcwd(), self.pluginspath))
            try:
                inside_root = (
                    os.path.commonpath([plugins_root, os.path.realpath(plugin_path)]) == plugins_root
                )
            except ValueError:
                # commonpath() raises when the two paths cannot share a
                # prefix (e.g. different drives); that is "outside" too.
                inside_root = False
            if not inside_root:
                logger.error("Refusing to attach '%s': resolves outside plugins directory.", name)
                return

            if not os.path.isdir(plugin_path) or not os.path.isfile(info_path):
                logger.error("Failed to attach plugin '%s': folder or info file not found.", name)
                return

            self._check_permissions(info_path)
            manifest = self._read_manifest(name, info_path)
            if manifest is None:
                return
            plugin_info, module_name, class_name = manifest

            module_file = os.path.join(plugin_path, module_name + ".py")
            if not os.path.isfile(module_file):
                logger.error("Plugin '%s' module file '%s' not found.", name, module_file)
                return
            self._check_permissions(module_file)

            # Verify the plugin's signature BEFORE we ever execute its code.
            # Unsigned/tampered plugins still get a listing entry (built
            # from the manifest alone, which is inert JSON) but their .py
            # file is never imported, and they can never be run.
            sig = self._verify_signature(name, plugin_path, module_name)
            if sig["status"] != SIG_VERIFIED:
                if sig["status"] == SIG_INVALID:
                    logger.error(
                        "Plugin '%s' failed signature verification (tampered or bad "
                        "signature); attaching as permanently disabled.", name,
                    )
                else:
                    logger.warning(
                        "Plugin '%s' has no valid whitelist entry; attaching as "
                        "permanently disabled.", name,
                    )
                self.plugins[name] = {
                    "instance": None,
                    "running": False,
                    "info": plugin_info,
                    "autorun": False,
                    "module_key": None,
                    "disabled": True,
                    "signature": sig,
                }
                return

            qualified_name = f"xtendr_plugin_{name}_{module_name}"
            instance = self._load_instance(name, module_file, class_name, qualified_name)
            if instance is None:
                return

            self.plugins[name] = {
                "instance": instance,
                "running": False,
                "info": plugin_info,
                "autorun": False,
                "module_key": qualified_name,
                "disabled": False,
                "signature": sig,
            }
            logger.info("Attached plugin '%s'.", name)
            logger.info("Running pre-load on '%s'.", name)
            self._start_pre_load(name, instance, callback)

    def run(self, name: str, *args, **kwargs):
        """Run the plugin's 'run' method if available.

        Extra positional and keyword arguments are forwarded to the plugin.
        Returns whatever the plugin's ``run`` returns, or ``None`` when the
        plugin is unknown, disabled, or raised (the error is logged).

        NOTE(review): the registry lock is held while the plugin's ``run``
        executes, so a ``run`` that blocks also blocks calls made to this
        object from other threads until it returns.
        """
        with self._lock:
            entry = self.plugins.get(name)
            if entry is None:
                logger.error("Plugin '%s' not found or has no 'run' method.", name)
                return None
            if entry.get("disabled"):
                logger.error("Plugin '%s' is disabled (failed signature verification) and cannot run.", name)
                return None

            entry["running"] = True
            try:
                return entry["instance"].run(*args, **kwargs)
            except Exception:  # noqa: BLE001
                logger.error("Plugin '%s' raised during run.", name, exc_info=True)
                entry["running"] = False
                return None

    def stop(self, name: str) -> None:
        """Stop the plugin if it's running.

        The entry is marked as not running before the plugin's ``stop`` is
        called, so it stays stopped even if ``stop`` raises (logged only).
        """
        with self._lock:
            entry = self.plugins.get(name)
            if entry is None or not entry["running"]:
                logger.info("Plugin '%s' is not running.", name)
                return

            entry["running"] = False
            try:
                entry["instance"].stop()
            except Exception:  # noqa: BLE001
                logger.error("Plugin '%s' raised during stop.", name, exc_info=True)

    def detach(self, name: str) -> None:
        """Unload a plugin.

        A plugin that is still marked as running is stopped first, so it is
        not left running without a registry entry to stop it through. Its
        module is then removed from ``sys.modules``.
        """
        with self._lock:
            entry = self.plugins.pop(name, None)
            if entry is None:
                logger.info("Plugin '%s' is not attached.", name)
                return

            if entry.get("running") and entry.get("instance") is not None:
                entry["running"] = False
                try:
                    entry["instance"].stop()
                except Exception:  # noqa: BLE001
                    logger.error("Plugin '%s' raised during stop.", name, exc_info=True)

            if entry.get("module_key"):
                sys.modules.pop(entry["module_key"], None)
            logger.info("Detached plugin '%s'.", name)