From 984458d8f358c6bcb8dbcf40ddd84ea7a432a035 Mon Sep 17 00:00:00 2001 From: Jan Lerking Date: Fri, 3 Jul 2026 08:39:45 +0200 Subject: [PATCH] Hardened XtendR. /JL --- setup.py | 2 +- xtendr/xtendrsystem.py | 229 +++++++++++++++++++++++++++++------------ 2 files changed, 165 insertions(+), 66 deletions(-) diff --git a/setup.py b/setup.py index 35b566a..f02a5b7 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ if __name__ == "__main__": setup( name="XtendR", - version="0.3.3.1", + version="0.4.0", packages=find_packages(), install_requires=[], author="Jan Lerking", diff --git a/xtendr/xtendrsystem.py b/xtendr/xtendrsystem.py index 5db9627..99cdc20 100644 --- a/xtendr/xtendrsystem.py +++ b/xtendr/xtendrsystem.py @@ -1,20 +1,37 @@ -import importlib +import importlib.util import sys import os +import re import json +import stat +import logging import threading from xtendr.xtendrbase import XtendRBase -__version__ = "0.3.3.1" +__version__ = "0.4.0" + +logger = logging.getLogger("xtendr") + +# Plugin (folder) names and module names are restricted to a safe identifier +# pattern. This blocks path traversal (e.g. "../../etc") and stray characters +# that have no business in a plugin name. +_NAME_RE = re.compile(r"^[A-Za-z0-9_-]+$") +_MODULE_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$") + class XtendRSystem: """Plugin system to manage plugins. - + + SECURITY NOTE: plugins are arbitrary Python code that runs with the full + privileges of the host process. Only attach plugins from sources you + trust. This class validates names/paths and isolates module loading, but + it cannot make untrusted plugin code safe to run. + Example: >>> system = XtendRSystem() >>> system.version() - XtendR v0.1.3 - >>> system.attach("example_plugin") # Assuming 'example_plugin/example_plugin.json' exists + XtendR v0.4.0 + >>> system.attach("example_plugin", lambda: None) >>> system.run("example_plugin") ExamplePlugin is running! >>> system.stop("example_plugin") @@ -22,78 +39,160 @@ class XtendRSystem: >>> system.detach("example_plugin") Detached plugin 'example_plugin'. """ - def __init__(self, pluginpath = "plugins"): + + def __init__(self, pluginpath="plugins"): self.pluginspath = pluginpath self.plugins = {} - + self._lock = threading.RLock() + def version(self) -> str: return "XtendR v" + __version__ - - def attach(self, name: str, callback) -> None: - """Dynamically load a plugin from its folder.""" - if name in self.plugins: - print(f"Plugin '{name}' is already attached.") - return - - plugin_path = os.path.join(os.getcwd(), self.pluginspath, name) - info_path = os.path.join(plugin_path, name + ".json") - print(plugin_path + "\n" + info_path) - - if not os.path.isdir(plugin_path) or not os.path.isfile(info_path): - print(f"Failed to attach plugin '{name}', folder or info file not found.") - return - + + def _validate_name(self, name: str) -> bool: + if not isinstance(name, str) or not _NAME_RE.match(name): + logger.error("Rejected plugin name %r: must match %s", name, _NAME_RE.pattern) + return False + return True + + def _check_permissions(self, path: str) -> None: + """Warn (don't block) if a plugin file is group/world-writable.""" try: - with open(info_path, "r", encoding="utf-8") as f: - plugin_info = json.load(f) - module_name = plugin_info.get("module") - class_name = plugin_info.get("class") - if not module_name or not class_name: - print(f"Plugin '{name}' info file is missing 'module' or 'class' key.") - return - - sys.path.insert(0, plugin_path) - module = importlib.import_module(module_name) + st = os.stat(path) + if st.st_mode & (stat.S_IWGRP | stat.S_IWOTH): + logger.warning( + "Plugin file '%s' is group/world-writable; this is a " + "security risk on shared systems.", + path, + ) + except OSError: + pass + + def attach(self, name: str, callback=None) -> None: + """Dynamically load a plugin from its folder.""" + with self._lock: + if name in self.plugins: + logger.info("Plugin '%s' is already attached.", name) + return + + if not self._validate_name(name): + return + + plugin_path = os.path.join(os.getcwd(), self.pluginspath, name) + info_path = os.path.join(plugin_path, name + ".json") + + # Defense in depth: even with a validated name, make sure the + # resolved path is actually inside the plugins directory. + plugins_root = os.path.realpath(os.path.join(os.getcwd(), self.pluginspath)) + if os.path.commonpath([plugins_root, os.path.realpath(plugin_path)]) != plugins_root: + logger.error("Refusing to attach '%s': resolves outside plugins directory.", name) + return + + if not os.path.isdir(plugin_path) or not os.path.isfile(info_path): + logger.error("Failed to attach plugin '%s': folder or info file not found.", name) + return + + self._check_permissions(info_path) + + try: + with open(info_path, "r", encoding="utf-8") as f: + plugin_info = json.load(f) + except (OSError, json.JSONDecodeError) as e: + logger.error("Failed to read info file for plugin '%s': %s", name, e) + return + + module_name = plugin_info.get("module") + class_name = plugin_info.get("class") + if not module_name or not class_name: + logger.error("Plugin '%s' info file is missing 'module' or 'class' key.", name) + return + if not _MODULE_RE.match(module_name) or not _MODULE_RE.match(class_name): + logger.error("Plugin '%s' has an invalid module/class identifier.", name) + return + + module_file = os.path.join(plugin_path, module_name + ".py") + if not os.path.isfile(module_file): + logger.error("Plugin '%s' module file '%s' not found.", name, module_file) + return + + self._check_permissions(module_file) + + # Load the module directly from its file path instead of + # mutating sys.path. This prevents a plugin from shadowing + # stdlib or third-party modules for the rest of the process. + qualified_name = f"xtendr_plugin_{name}_{module_name}" + try: + spec = importlib.util.spec_from_file_location(qualified_name, module_file) + if spec is None or spec.loader is None: + raise ImportError(f"Could not create import spec for '{module_file}'") + module = importlib.util.module_from_spec(spec) + sys.modules[qualified_name] = module + spec.loader.exec_module(module) + plugin_class = getattr(module, class_name) instance = plugin_class() - + if not isinstance(instance, XtendRBase): - print(f"Plugin '{name}' does not inherit from PluginBase.") + logger.error("Plugin '%s' does not inherit from XtendRBase.", name) + sys.modules.pop(qualified_name, None) return - - self.plugins[name] = { - 'instance': instance, - 'running': False, - 'info': plugin_info, - 'autorun': False - } - print(f"Attached plugin '{name}'.") - print(f"Running pre-load on '{name}'.") - thread = threading.Thread(target=self.plugins[name]['instance'].pre_load, args=(callback,)) - thread.start() - except (ModuleNotFoundError, json.JSONDecodeError, AttributeError) as e: - print(f"Failed to attach plugin '{name}': {e}") - + + except Exception as e: # noqa: BLE001 - plugin code is untrusted, isolate all failures + logger.error("Failed to attach plugin '%s': %s", name, e, exc_info=True) + sys.modules.pop(qualified_name, None) + return + + self.plugins[name] = { + "instance": instance, + "running": False, + "info": plugin_info, + "autorun": False, + "module_key": qualified_name, + } + logger.info("Attached plugin '%s'.", name) + logger.info("Running pre-load on '%s'.", name) + + def _pre_load_worker(): + try: + instance.pre_load(callback) + except Exception: # noqa: BLE001 + logger.error("Plugin '%s' raised during pre_load.", name, exc_info=True) + + thread = threading.Thread(target=_pre_load_worker, daemon=True) + thread.start() + def run(self, name: str, *args, **kwargs): """Run the plugin's 'run' method if available.""" - if name in self.plugins: - self.plugins[name]['running'] = True - return self.plugins[name]['instance'].run(*args, **kwargs) - print(f"Plugin '{name}' not found or has no 'run' method.") - + with self._lock: + entry = self.plugins.get(name) + if entry is None: + logger.error("Plugin '%s' not found or has no 'run' method.", name) + return + entry["running"] = True + try: + return entry["instance"].run(*args, **kwargs) + except Exception: # noqa: BLE001 + logger.error("Plugin '%s' raised during run.", name, exc_info=True) + entry["running"] = False + def stop(self, name: str) -> None: """Stop the plugin if it's running.""" - if name in self.plugins and self.plugins[name]['running']: - self.plugins[name]['running'] = False - self.plugins[name]['instance'].stop() - else: - print(f"Plugin '{name}' is not running.") - + with self._lock: + entry = self.plugins.get(name) + if entry is None or not entry["running"]: + logger.info("Plugin '%s' is not running.", name) + return + entry["running"] = False + try: + entry["instance"].stop() + except Exception: # noqa: BLE001 + logger.error("Plugin '%s' raised during stop.", name, exc_info=True) + def detach(self, name: str) -> None: """Unload a plugin.""" - if name in self.plugins: - del self.plugins[name] - sys.modules.pop(name, None) - print(f"Detached plugin '{name}'.") - else: - print(f"Plugin '{name}' is not attached.") + with self._lock: + entry = self.plugins.pop(name, None) + if entry is None: + logger.info("Plugin '%s' is not attached.", name) + return + sys.modules.pop(entry["module_key"], None) + logger.info("Detached plugin '%s'.", name)