# Source: XtendR/xtendr/xtendrsystem.py
# Snapshot: 2026-07-03 08:39:45 +02:00 (199 lines, 7.7 KiB, Python)

import importlib.util
import sys
import os
import re
import json
import stat
import logging
import threading
from xtendr.xtendrbase import XtendRBase
# Version string of this module; XtendRSystem.version() appends it to "XtendR v".
__version__ = "0.4.0"
# Shared logger (named "xtendr") that the plugin system reports through.
logger = logging.getLogger("xtendr")
# Plugin (folder) names and module names are restricted to a safe identifier
# pattern. This blocks path traversal (e.g. "../../etc") and stray characters
# that have no business in a plugin name.
# NOTE(review): when these patterns are applied with `.match()`, `$` also
# matches just before a single trailing newline, so "name\n" is accepted;
# `.fullmatch()` (or a `\Z` anchor) would be stricter.
# Plugin folder names: ASCII letters, digits, underscore and hyphen only.
_NAME_RE = re.compile(r"^[A-Za-z0-9_-]+$")
# Module/class names: an ASCII identifier (letter or underscore first, then
# letters, digits or underscores).
_MODULE_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
class XtendRSystem:
    """Plugin system that attaches, runs, stops and detaches plugins.

    A plugin named ``name`` lives in ``<cwd>/<pluginpath>/<name>/`` and is
    described by ``<name>.json`` in that folder: a JSON object with a
    ``"module"`` key (the plugin's ``.py`` file name without extension) and a
    ``"class"`` key (the attribute of that module that is called to create
    the plugin instance, which must be an ``XtendRBase``).

    Problems are reported through the module logger; the public methods do
    not raise for missing/invalid plugins or for exceptions raised by plugin
    code.

    SECURITY NOTE: plugins are arbitrary Python code that runs with the full
    privileges of the host process. Only attach plugins from sources you
    trust. This class validates names/paths and isolates module loading, but
    it cannot make untrusted plugin code safe to run.

    Example (illustrative; progress messages go to the logger and any other
    output comes from the plugin itself)::

        system = XtendRSystem()
        system.version()                          # returns "XtendR v0.4.0"
        system.attach("example_plugin", lambda: None)
        system.run("example_plugin")
        system.stop("example_plugin")
        system.detach("example_plugin")
    """

    def __init__(self, pluginpath="plugins"):
        """Create an empty plugin registry.

        Args:
            pluginpath: Directory that holds one sub-folder per plugin. It is
                joined onto the current working directory each time
                ``attach`` is called.
        """
        self.pluginspath = pluginpath
        # name -> {"instance", "running", "info", "autorun", "module_key"}
        self.plugins = {}
        # Re-entrant so code already holding the lock can call back into the
        # system from the same thread without deadlocking.
        self._lock = threading.RLock()

    def version(self) -> str:
        """Return the version banner, ``"XtendR v<version>"``."""
        return "XtendR v" + __version__

    def _validate_name(self, name: str) -> bool:
        """Return True if ``name`` is a string matching the plugin-name pattern.

        Rejections are logged at error level.
        """
        # fullmatch(): with match(), the pattern's "$" would also accept a
        # name that ends in a newline.
        if not isinstance(name, str) or not _NAME_RE.fullmatch(name):
            logger.error("Rejected plugin name %r: must match %s", name, _NAME_RE.pattern)
            return False
        return True

    def _check_permissions(self, path: str) -> None:
        """Warn (don't block) if a plugin file is group/world-writable."""
        try:
            mode = os.stat(path).st_mode
        except OSError as e:
            # Best effort only: the caller opens the file right afterwards and
            # reports a real failure there.
            logger.debug("Could not stat plugin file '%s': %s", path, e)
            return
        if mode & (stat.S_IWGRP | stat.S_IWOTH):
            logger.warning(
                "Plugin file '%s' is group/world-writable; this is a "
                "security risk on shared systems.",
                path,
            )

    def _read_plugin_info(self, name: str, info_path: str):
        """Load and validate a plugin's info file.

        Returns:
            The parsed JSON object (a dict) whose ``"module"`` and ``"class"``
            values are valid identifiers, or None after logging the problem.
        """
        try:
            with open(info_path, "r", encoding="utf-8") as f:
                plugin_info = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            logger.error("Failed to read info file for plugin '%s': %s", name, e)
            return None
        if not isinstance(plugin_info, dict):
            logger.error("Plugin '%s' info file must contain a JSON object.", name)
            return None
        module_name = plugin_info.get("module")
        class_name = plugin_info.get("class")
        if not module_name or not class_name:
            logger.error("Plugin '%s' info file is missing 'module' or 'class' key.", name)
            return None
        identifiers_ok = (
            isinstance(module_name, str)
            and isinstance(class_name, str)
            and _MODULE_RE.fullmatch(module_name) is not None
            and _MODULE_RE.fullmatch(class_name) is not None
        )
        if not identifiers_ok:
            logger.error("Plugin '%s' has an invalid module/class identifier.", name)
            return None
        return plugin_info

    def _load_instance(self, name: str, module_file: str, qualified_name: str, class_name: str):
        """Import ``module_file`` as ``qualified_name`` and build the plugin.

        Returns:
            The plugin instance, or None after logging the failure. On failure
            the module is removed from ``sys.modules`` again.
        """
        # Load the module directly from its file path instead of
        # mutating sys.path. This prevents a plugin from shadowing
        # stdlib or third-party modules for the rest of the process.
        try:
            spec = importlib.util.spec_from_file_location(qualified_name, module_file)
            if spec is None or spec.loader is None:
                raise ImportError(f"Could not create import spec for '{module_file}'")
            module = importlib.util.module_from_spec(spec)
            # Registered before execution so the module can be found under
            # its qualified name while its own top-level code runs.
            sys.modules[qualified_name] = module
            spec.loader.exec_module(module)
            plugin_class = getattr(module, class_name)
            instance = plugin_class()
        except Exception as e:  # noqa: BLE001 - plugin code is untrusted, isolate all failures
            logger.error("Failed to attach plugin '%s': %s", name, e, exc_info=True)
            sys.modules.pop(qualified_name, None)
            return None
        if not isinstance(instance, XtendRBase):
            logger.error("Plugin '%s' does not inherit from XtendRBase.", name)
            sys.modules.pop(qualified_name, None)
            return None
        return instance

    def attach(self, name: str, callback=None) -> None:
        """Dynamically load a plugin from its folder.

        On success the plugin is registered (not running) and its
        ``pre_load(callback)`` is invoked on a daemon thread. Every failure is
        logged and leaves the registry unchanged.

        Args:
            name: Plugin folder name; must match the plugin-name pattern.
            callback: Passed through unchanged to the plugin's ``pre_load``.
        """
        # Validate before the registry lookup so a non-string (possibly
        # unhashable) name is rejected cleanly instead of raising TypeError.
        if not self._validate_name(name):
            return
        with self._lock:
            if name in self.plugins:
                logger.info("Plugin '%s' is already attached.", name)
                return
            plugin_path = os.path.join(os.getcwd(), self.pluginspath, name)
            info_path = os.path.join(plugin_path, name + ".json")
            # Defense in depth: even with a validated name, make sure the
            # resolved path is actually inside the plugins directory.
            plugins_root = os.path.realpath(os.path.join(os.getcwd(), self.pluginspath))
            try:
                inside_root = (
                    os.path.commonpath([plugins_root, os.path.realpath(plugin_path)])
                    == plugins_root
                )
            except ValueError:
                # commonpath() raises when the paths cannot share a root
                # (e.g. different drives): that is "outside" by definition.
                inside_root = False
            if not inside_root:
                logger.error("Refusing to attach '%s': resolves outside plugins directory.", name)
                return
            if not os.path.isdir(plugin_path) or not os.path.isfile(info_path):
                logger.error("Failed to attach plugin '%s': folder or info file not found.", name)
                return
            self._check_permissions(info_path)
            plugin_info = self._read_plugin_info(name, info_path)
            if plugin_info is None:
                return
            module_name = plugin_info["module"]
            class_name = plugin_info["class"]
            module_file = os.path.join(plugin_path, module_name + ".py")
            if not os.path.isfile(module_file):
                logger.error("Plugin '%s' module file '%s' not found.", name, module_file)
                return
            self._check_permissions(module_file)
            qualified_name = f"xtendr_plugin_{name}_{module_name}"
            instance = self._load_instance(name, module_file, qualified_name, class_name)
            if instance is None:
                return
            self.plugins[name] = {
                "instance": instance,
                "running": False,
                "info": plugin_info,
                "autorun": False,
                "module_key": qualified_name,
            }
            logger.info("Attached plugin '%s'.", name)

        logger.info("Running pre-load on '%s'.", name)

        def _pre_load_worker():
            try:
                instance.pre_load(callback)
            except Exception:  # noqa: BLE001
                logger.error("Plugin '%s' raised during pre_load.", name, exc_info=True)

        # Started outside the lock; daemon so it never keeps the process alive.
        thread = threading.Thread(target=_pre_load_worker, daemon=True)
        thread.start()

    def run(self, name: str, *args, **kwargs):
        """Run the plugin's 'run' method if available.

        Args:
            name: Name of an attached plugin.
            *args, **kwargs: Forwarded to the plugin's ``run``.

        Returns:
            Whatever the plugin's ``run`` returns, or None if the plugin is
            not attached or its ``run`` raised (both cases are logged).
        """
        with self._lock:
            entry = self.plugins.get(name)
            if entry is None:
                logger.error("Plugin '%s' not found or has no 'run' method.", name)
                return None
            entry["running"] = True
            instance = entry["instance"]
        # Plugin code runs without the registry lock so a long-running run()
        # cannot block stop()/detach()/attach() issued from other threads.
        try:
            return instance.run(*args, **kwargs)
        except Exception:  # noqa: BLE001
            logger.error("Plugin '%s' raised during run.", name, exc_info=True)
            with self._lock:
                entry["running"] = False
            return None

    def stop(self, name: str) -> None:
        """Stop the plugin if it's running.

        The plugin is marked as not running before its ``stop`` is called; an
        exception raised by ``stop`` is logged, not propagated.
        """
        with self._lock:
            entry = self.plugins.get(name)
            if entry is None or not entry["running"]:
                logger.info("Plugin '%s' is not running.", name)
                return
            entry["running"] = False
            instance = entry["instance"]
        # Called outside the lock for the same reason as in run().
        try:
            instance.stop()
        except Exception:  # noqa: BLE001
            logger.error("Plugin '%s' raised during stop.", name, exc_info=True)

    def detach(self, name: str) -> None:
        """Unload a plugin.

        Removes the plugin from the registry, stops it first if it is still
        marked as running, and drops its module from ``sys.modules``.
        """
        with self._lock:
            entry = self.plugins.pop(name, None)
            if entry is None:
                logger.info("Plugin '%s' is not attached.", name)
                return
            was_running = entry["running"]
            entry["running"] = False
        if was_running:
            # After detaching there is no handle left to stop the plugin
            # with, so do it now rather than leave it running unmanaged.
            try:
                entry["instance"].stop()
            except Exception:  # noqa: BLE001
                logger.error("Plugin '%s' raised during stop.", name, exc_info=True)
        sys.modules.pop(entry["module_key"], None)
        logger.info("Detached plugin '%s'.", name)