Hardened XtendR. /JL #1

Merged
Lerking merged 1 commits from 0.4.0 into main 2026-07-03 08:44:35 +02:00
2 changed files with 165 additions and 66 deletions
+1 -1
View File
@@ -6,7 +6,7 @@ if __name__ == "__main__":
setup(
name="XtendR",
version="0.3.3.1",
version="0.4.0",
packages=find_packages(),
install_requires=[],
author="Jan Lerking",
+164 -65
View File
@@ -1,20 +1,37 @@
import importlib
import importlib.util
import sys
import os
import re
import json
import stat
import logging
import threading
from xtendr.xtendrbase import XtendRBase
__version__ = "0.3.3.1"
__version__ = "0.4.0"
logger = logging.getLogger("xtendr")
# Plugin (folder) names and module names are restricted to a safe identifier
# pattern. This blocks path traversal (e.g. "../../etc") and stray characters
# that have no business in a plugin name.
_NAME_RE = re.compile(r"^[A-Za-z0-9_-]+$")
_MODULE_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
class XtendRSystem:
"""Plugin system to manage plugins.
SECURITY NOTE: plugins are arbitrary Python code that runs with the full
privileges of the host process. Only attach plugins from sources you
trust. This class validates names/paths and isolates module loading, but
it cannot make untrusted plugin code safe to run.
Example:
>>> system = XtendRSystem()
>>> system.version()
XtendR v0.1.3
>>> system.attach("example_plugin") # Assuming 'example_plugin/example_plugin.json' exists
XtendR v0.4.0
>>> system.attach("example_plugin", lambda: None)
>>> system.run("example_plugin")
ExamplePlugin is running!
>>> system.stop("example_plugin")
@@ -22,78 +39,160 @@ class XtendRSystem:
>>> system.detach("example_plugin")
Detached plugin 'example_plugin'.
"""
def __init__(self, pluginpath = "plugins"):
def __init__(self, pluginpath="plugins"):
self.pluginspath = pluginpath
self.plugins = {}
self._lock = threading.RLock()
def version(self) -> str:
return "XtendR v" + __version__
def attach(self, name: str, callback) -> None:
"""Dynamically load a plugin from its folder."""
if name in self.plugins:
print(f"Plugin '{name}' is already attached.")
return
plugin_path = os.path.join(os.getcwd(), self.pluginspath, name)
info_path = os.path.join(plugin_path, name + ".json")
print(plugin_path + "\n" + info_path)
if not os.path.isdir(plugin_path) or not os.path.isfile(info_path):
print(f"Failed to attach plugin '{name}', folder or info file not found.")
return
def _validate_name(self, name: str) -> bool:
if not isinstance(name, str) or not _NAME_RE.match(name):
logger.error("Rejected plugin name %r: must match %s", name, _NAME_RE.pattern)
return False
return True
def _check_permissions(self, path: str) -> None:
"""Warn (don't block) if a plugin file is group/world-writable."""
try:
with open(info_path, "r", encoding="utf-8") as f:
plugin_info = json.load(f)
module_name = plugin_info.get("module")
class_name = plugin_info.get("class")
if not module_name or not class_name:
print(f"Plugin '{name}' info file is missing 'module' or 'class' key.")
return
sys.path.insert(0, plugin_path)
module = importlib.import_module(module_name)
st = os.stat(path)
if st.st_mode & (stat.S_IWGRP | stat.S_IWOTH):
logger.warning(
"Plugin file '%s' is group/world-writable; this is a "
"security risk on shared systems.",
path,
)
except OSError:
pass
def attach(self, name: str, callback=None) -> None:
"""Dynamically load a plugin from its folder."""
with self._lock:
if name in self.plugins:
logger.info("Plugin '%s' is already attached.", name)
return
if not self._validate_name(name):
return
plugin_path = os.path.join(os.getcwd(), self.pluginspath, name)
info_path = os.path.join(plugin_path, name + ".json")
# Defense in depth: even with a validated name, make sure the
# resolved path is actually inside the plugins directory.
plugins_root = os.path.realpath(os.path.join(os.getcwd(), self.pluginspath))
if os.path.commonpath([plugins_root, os.path.realpath(plugin_path)]) != plugins_root:
logger.error("Refusing to attach '%s': resolves outside plugins directory.", name)
return
if not os.path.isdir(plugin_path) or not os.path.isfile(info_path):
logger.error("Failed to attach plugin '%s': folder or info file not found.", name)
return
self._check_permissions(info_path)
try:
with open(info_path, "r", encoding="utf-8") as f:
plugin_info = json.load(f)
except (OSError, json.JSONDecodeError) as e:
logger.error("Failed to read info file for plugin '%s': %s", name, e)
return
module_name = plugin_info.get("module")
class_name = plugin_info.get("class")
if not module_name or not class_name:
logger.error("Plugin '%s' info file is missing 'module' or 'class' key.", name)
return
if not _MODULE_RE.match(module_name) or not _MODULE_RE.match(class_name):
logger.error("Plugin '%s' has an invalid module/class identifier.", name)
return
module_file = os.path.join(plugin_path, module_name + ".py")
if not os.path.isfile(module_file):
logger.error("Plugin '%s' module file '%s' not found.", name, module_file)
return
self._check_permissions(module_file)
# Load the module directly from its file path instead of
# mutating sys.path. This prevents a plugin from shadowing
# stdlib or third-party modules for the rest of the process.
qualified_name = f"xtendr_plugin_{name}_{module_name}"
try:
spec = importlib.util.spec_from_file_location(qualified_name, module_file)
if spec is None or spec.loader is None:
raise ImportError(f"Could not create import spec for '{module_file}'")
module = importlib.util.module_from_spec(spec)
sys.modules[qualified_name] = module
spec.loader.exec_module(module)
plugin_class = getattr(module, class_name)
instance = plugin_class()
if not isinstance(instance, XtendRBase):
print(f"Plugin '{name}' does not inherit from PluginBase.")
logger.error("Plugin '%s' does not inherit from XtendRBase.", name)
sys.modules.pop(qualified_name, None)
return
self.plugins[name] = {
'instance': instance,
'running': False,
'info': plugin_info,
'autorun': False
}
print(f"Attached plugin '{name}'.")
print(f"Running pre-load on '{name}'.")
thread = threading.Thread(target=self.plugins[name]['instance'].pre_load, args=(callback,))
thread.start()
except (ModuleNotFoundError, json.JSONDecodeError, AttributeError) as e:
print(f"Failed to attach plugin '{name}': {e}")
except Exception as e: # noqa: BLE001 - plugin code is untrusted, isolate all failures
logger.error("Failed to attach plugin '%s': %s", name, e, exc_info=True)
sys.modules.pop(qualified_name, None)
return
self.plugins[name] = {
"instance": instance,
"running": False,
"info": plugin_info,
"autorun": False,
"module_key": qualified_name,
}
logger.info("Attached plugin '%s'.", name)
logger.info("Running pre-load on '%s'.", name)
def _pre_load_worker():
try:
instance.pre_load(callback)
except Exception: # noqa: BLE001
logger.error("Plugin '%s' raised during pre_load.", name, exc_info=True)
thread = threading.Thread(target=_pre_load_worker, daemon=True)
thread.start()
def run(self, name: str, *args, **kwargs):
"""Run the plugin's 'run' method if available."""
if name in self.plugins:
self.plugins[name]['running'] = True
return self.plugins[name]['instance'].run(*args, **kwargs)
print(f"Plugin '{name}' not found or has no 'run' method.")
with self._lock:
entry = self.plugins.get(name)
if entry is None:
logger.error("Plugin '%s' not found or has no 'run' method.", name)
return
entry["running"] = True
try:
return entry["instance"].run(*args, **kwargs)
except Exception: # noqa: BLE001
logger.error("Plugin '%s' raised during run.", name, exc_info=True)
entry["running"] = False
def stop(self, name: str) -> None:
"""Stop the plugin if it's running."""
if name in self.plugins and self.plugins[name]['running']:
self.plugins[name]['running'] = False
self.plugins[name]['instance'].stop()
else:
print(f"Plugin '{name}' is not running.")
with self._lock:
entry = self.plugins.get(name)
if entry is None or not entry["running"]:
logger.info("Plugin '%s' is not running.", name)
return
entry["running"] = False
try:
entry["instance"].stop()
except Exception: # noqa: BLE001
logger.error("Plugin '%s' raised during stop.", name, exc_info=True)
def detach(self, name: str) -> None:
"""Unload a plugin."""
if name in self.plugins:
del self.plugins[name]
sys.modules.pop(name, None)
print(f"Detached plugin '{name}'.")
else:
print(f"Plugin '{name}' is not attached.")
with self._lock:
entry = self.plugins.pop(name, None)
if entry is None:
logger.info("Plugin '%s' is not attached.", name)
return
sys.modules.pop(entry["module_key"], None)
logger.info("Detached plugin '%s'.", name)