Compare commits
29 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| ad67becd91 | |||
| a4fe3a2b80 | |||
| 984458d8f3 | |||
| be1af3ad05 | |||
| 0c8108d4d8 | |||
| bcc9b864d1 | |||
| a4c552d6f5 | |||
| 34c7927601 | |||
| ebf31756b5 | |||
| 406c1b77ed | |||
| 1be429c5cd | |||
| a093b3f56c | |||
| 1dc0c81d84 | |||
| 3e38cabf81 | |||
| 63f649af25 | |||
| b6efe1b1df | |||
| c5ee073d65 | |||
| 3bcf01dc26 | |||
| dd5904c273 | |||
| b763308aa9 | |||
| 34ba757bc6 | |||
| b5d2e1b0be | |||
| d3031952bf | |||
| 3312f2da28 | |||
| a21122671d | |||
| 9effa4be3e | |||
| 217038b863 | |||
| 8c19c4a97b | |||
| c7662a5d47 |
@@ -1,6 +1,7 @@
|
||||
# XtendR
|
||||

|
||||
|
||||
A very basic Python 3.12 plugin system based on the K.I.S.S principle.
|
||||
A very basic Python 3.12 friendly plugin system based on the K.I.S.S principle.
|
||||
|
||||
I was in need of a new plugin system, which should meet these requirements:
|
||||
:heavy_plus_sign: Simple to use
|
||||
@@ -16,7 +17,8 @@ I didn't find anything that suited my needs, so I decided to make my own plugin
|
||||
It simply contains 2 classes, one for the plugin system and one abstraction base class for the plugins themselves.
|
||||
|
||||
At the moment only 4 functions are available:
|
||||
- Attach
|
||||
|
||||
- Attach - including call to pre-load data in plugin.
|
||||
- Run
|
||||
- Stop
|
||||
- Detach
|
||||
@@ -27,4 +29,4 @@ The Run and Stop functions are mandatory in the plugin modules.
|
||||
The system expects a folder called 'plugins', placed at the root, along side your main python file.
|
||||
Each plugin should be placed in subfolders, named as the plugin, inside the 'plugins' folder.
|
||||
|
||||
The example.py along with the plugins/example_plugin/example_plugin.py and plugins/example_plugin/example_plugin.json shows the workings of this plugin system.
|
||||
The example.py along with the plugins/example_plugin/example_plugin.py and plugins/example_plugin/example_plugin.json shows the workings of this plugin system.
|
||||
|
||||
+13
-2
@@ -1,5 +1,9 @@
|
||||
import time
|
||||
from xtendr.xtendrsystem import XtendRSystem
|
||||
|
||||
def my_callback():
|
||||
print("'example_plugin is finished pre-loading")
|
||||
|
||||
if __name__ == "__main__":
|
||||
"""Example usage of the PluginSystem.
|
||||
|
||||
@@ -15,7 +19,14 @@ if __name__ == "__main__":
|
||||
Detached plugin 'example_plugin'.
|
||||
"""
|
||||
system = XtendRSystem()
|
||||
system.attach("example_plugin") # Assuming 'example_plugin/plugin_info.json' exists
|
||||
system.run("example_plugin")
|
||||
system.attach("example_plugin", my_callback) # Assuming 'example_plugin/plugin_info.json' exists
|
||||
for i in range(3):
|
||||
print(f"Main program is running iteration {i+1}...")
|
||||
time.sleep(2)
|
||||
system.run("example_plugin", test="Hello!")
|
||||
system.stop("example_plugin")
|
||||
system.run("example_plugin", 25)
|
||||
system.stop("example_plugin")
|
||||
system.run("example_plugin", "Hello!", 25)
|
||||
system.stop("example_plugin")
|
||||
system.detach("example_plugin")
|
||||
@@ -1,3 +1,5 @@
|
||||
import threading
|
||||
import time
|
||||
from xtendr.xtendrbase import XtendRBase
|
||||
|
||||
class ExamplePlugin(XtendRBase):
|
||||
@@ -5,13 +7,33 @@ class ExamplePlugin(XtendRBase):
|
||||
|
||||
Example:
|
||||
>>> plugin = ExamplePlugin()
|
||||
>>> plugin.run()
|
||||
>>> plugin.run("Hello!", 25)
|
||||
Passed arguments 2:
|
||||
Argument 0: Hello!
|
||||
Argument 1: 25
|
||||
ExamplePlugin is running!
|
||||
>>> plugin.stop()
|
||||
ExamplePlugin has stopped!
|
||||
"""
|
||||
def run(self):
|
||||
def run(self, *args, **kwargs):
|
||||
arglen = len(args)
|
||||
keylen = len(kwargs)
|
||||
if arglen > 0:
|
||||
print(f"Passed arguments {arglen}:")
|
||||
for idx, a in enumerate(args):
|
||||
print(f"Argument {idx}: {a}")
|
||||
if keylen > 0:
|
||||
print(f"Keyword arguments passed {keylen}")
|
||||
if not "test" in kwargs:
|
||||
raise ValueError("Didn't get expected 'test' keyword!")
|
||||
for kw in kwargs:
|
||||
print(f"Argument {kw}: {kwargs[kw]}")
|
||||
|
||||
print("ExamplePlugin is running!")
|
||||
|
||||
def stop(self):
|
||||
print("ExamplePlugin has stopped!")
|
||||
|
||||
def pre_load(self, callback):
|
||||
time.sleep(5) # Indicate long running pre-load.
|
||||
callback()
|
||||
|
||||
@@ -0,0 +1,2 @@
|
||||
setuptools==68.2.2
|
||||
cryptography>=42
|
||||
@@ -1,9 +1,12 @@
|
||||
if __name__ == "__main__":
|
||||
from setuptools import setup, find_packages
|
||||
|
||||
from pathlib import Path
|
||||
this_directory = Path(__file__).parent
|
||||
long_description = (this_directory / "README.md").read_text()
|
||||
|
||||
setup(
|
||||
name="XtendR",
|
||||
version="0.0.9",
|
||||
version="0.5.1",
|
||||
packages=find_packages(),
|
||||
install_requires=[],
|
||||
author="Jan Lerking",
|
||||
@@ -16,4 +19,6 @@ if __name__ == "__main__":
|
||||
"Operating System :: OS Independent",
|
||||
],
|
||||
python_requires='>=3.11',
|
||||
long_description=long_description,
|
||||
long_description_content_type='text/markdown'
|
||||
)
|
||||
@@ -0,0 +1,259 @@
|
||||
"""
|
||||
xtendr_signing.py
|
||||
|
||||
Core logic for signing XtendR plugins and maintaining a whitelist.
|
||||
Deliberately kept free of any GTK/Adw imports so it can be unit tested
|
||||
and reused by both the signer GUI and (later) XtendRSystem's verification
|
||||
hook.
|
||||
|
||||
Design:
|
||||
- Ed25519 keypair: private key stays with the signer, public key ships
|
||||
inside XtendR / PyPac to verify at attach() time.
|
||||
- Each whitelist entry covers the plugin's manifest (<name>.json) and its
|
||||
module file (<module>.py), hashed together with SHA-256. The signature
|
||||
covers the whole entry (hash + name + module + class), so an attacker
|
||||
can't splice a valid hash onto a different plugin identity.
|
||||
- The whitelist file itself can optionally be encrypted at rest with a
|
||||
passphrase (Fernet / AES-128-CBC+HMAC via PBKDF2-derived key). This is
|
||||
for confidentiality only -- the signature is what provides integrity,
|
||||
and still verifies correctly whether or not the file on disk is
|
||||
encrypted.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
from dataclasses import dataclass, asdict
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
|
||||
Ed25519PrivateKey,
|
||||
Ed25519PublicKey,
|
||||
)
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from cryptography.exceptions import InvalidSignature
|
||||
from cryptography.fernet import Fernet, InvalidToken
|
||||
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
|
||||
PBKDF2_ITERATIONS = 600_000
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# Key management
|
||||
# --------------------------------------------------------------------------
|
||||
|
||||
def generate_keypair(private_path: Path, public_path: Path) -> None:
|
||||
"""Generate a new Ed25519 keypair and write it to disk (unencrypted PEM).
|
||||
|
||||
Caller is responsible for keeping private_path safe (e.g. 0600 perms,
|
||||
offline machine, backups). This is deliberately not done automatically
|
||||
since the right answer depends on the user's environment.
|
||||
"""
|
||||
private_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
key = Ed25519PrivateKey.generate()
|
||||
|
||||
priv_bytes = key.private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.PKCS8,
|
||||
encryption_algorithm=serialization.NoEncryption(),
|
||||
)
|
||||
pub_bytes = key.public_key().public_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PublicFormat.SubjectPublicKeyInfo,
|
||||
)
|
||||
|
||||
private_path.write_bytes(priv_bytes)
|
||||
try:
|
||||
os.chmod(private_path, 0o600)
|
||||
except OSError:
|
||||
pass
|
||||
public_path.write_bytes(pub_bytes)
|
||||
|
||||
|
||||
def load_private_key(path: Path) -> Ed25519PrivateKey:
|
||||
key = serialization.load_pem_private_key(path.read_bytes(), password=None)
|
||||
if not isinstance(key, Ed25519PrivateKey):
|
||||
raise ValueError(f"'{path}' is not an Ed25519 private key.")
|
||||
return key
|
||||
|
||||
|
||||
def load_public_key(path: Path) -> Ed25519PublicKey:
|
||||
key = serialization.load_pem_public_key(path.read_bytes())
|
||||
if not isinstance(key, Ed25519PublicKey):
|
||||
raise ValueError(f"'{path}' is not an Ed25519 public key.")
|
||||
return key
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# Plugin hashing + entry signing
|
||||
# --------------------------------------------------------------------------
|
||||
|
||||
@dataclass
|
||||
class PluginEntry:
|
||||
name: str # plugin folder name
|
||||
module: str # module name, from the manifest
|
||||
cls: str # class name, from the manifest
|
||||
sha256: str # hash of manifest + module file contents
|
||||
signed_at: str # ISO-8601 UTC timestamp
|
||||
signature: str = "" # base64 Ed25519 signature, filled in after signing
|
||||
|
||||
def canonical_bytes(self) -> bytes:
|
||||
"""Bytes that get signed / verified -- everything except the
|
||||
signature itself, in a stable, sorted-key JSON encoding."""
|
||||
payload = {
|
||||
"name": self.name,
|
||||
"module": self.module,
|
||||
"cls": self.cls,
|
||||
"sha256": self.sha256,
|
||||
"signed_at": self.signed_at,
|
||||
}
|
||||
return json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8")
|
||||
|
||||
|
||||
def hash_plugin_files(plugin_dir: Path, name: str, module: str) -> str:
|
||||
"""Hash the manifest + module file together, in a fixed order, so the
|
||||
hash changes if either file is tampered with."""
|
||||
manifest_path = plugin_dir / f"{name}.json"
|
||||
module_path = plugin_dir / f"{module}.py"
|
||||
if not manifest_path.is_file():
|
||||
raise FileNotFoundError(f"Manifest not found: {manifest_path}")
|
||||
if not module_path.is_file():
|
||||
raise FileNotFoundError(f"Module file not found: {module_path}")
|
||||
|
||||
h = hashlib.sha256()
|
||||
for p in (manifest_path, module_path):
|
||||
h.update(p.name.encode("utf-8"))
|
||||
h.update(b"\x00")
|
||||
h.update(p.read_bytes())
|
||||
h.update(b"\x00")
|
||||
return h.hexdigest()
|
||||
|
||||
|
||||
def sign_plugin(private_key: Ed25519PrivateKey, plugin_dir: Path) -> PluginEntry:
|
||||
manifest_path_glob = list(plugin_dir.glob("*.json"))
|
||||
if len(manifest_path_glob) != 1:
|
||||
raise ValueError(
|
||||
f"Expected exactly one manifest .json in '{plugin_dir}', found {len(manifest_path_glob)}."
|
||||
)
|
||||
manifest = json.loads(manifest_path_glob[0].read_text(encoding="utf-8"))
|
||||
module = manifest.get("module")
|
||||
cls = manifest.get("class")
|
||||
if not module or not cls:
|
||||
raise ValueError("Manifest is missing 'module' or 'class'.")
|
||||
|
||||
name = plugin_dir.name
|
||||
digest = hash_plugin_files(plugin_dir, name, module)
|
||||
entry = PluginEntry(
|
||||
name=name,
|
||||
module=module,
|
||||
cls=cls,
|
||||
sha256=digest,
|
||||
signed_at=datetime.now(timezone.utc).isoformat(timespec="seconds"),
|
||||
)
|
||||
signature = private_key.sign(entry.canonical_bytes())
|
||||
entry.signature = base64.b64encode(signature).decode("ascii")
|
||||
return entry
|
||||
|
||||
|
||||
def verify_entry(public_key: Ed25519PublicKey, entry: PluginEntry) -> bool:
|
||||
try:
|
||||
public_key.verify(base64.b64decode(entry.signature), entry.canonical_bytes())
|
||||
return True
|
||||
except (InvalidSignature, ValueError):
|
||||
return False
|
||||
|
||||
|
||||
def verify_plugin_on_disk(public_key: Ed25519PublicKey, plugin_dir: Path, entry: PluginEntry) -> bool:
|
||||
"""Full verification: signature is valid AND the files on disk still
|
||||
match the hash that was signed."""
|
||||
if not verify_entry(public_key, entry):
|
||||
return False
|
||||
try:
|
||||
current_hash = hash_plugin_files(plugin_dir, entry.name, entry.module)
|
||||
except FileNotFoundError:
|
||||
return False
|
||||
return current_hash == entry.sha256
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# Whitelist storage (optionally encrypted at rest)
|
||||
# --------------------------------------------------------------------------
|
||||
|
||||
class Whitelist:
|
||||
def __init__(self):
|
||||
self.entries: dict[str, PluginEntry] = {}
|
||||
|
||||
def add(self, entry: PluginEntry) -> None:
|
||||
self.entries[entry.name] = entry
|
||||
|
||||
def remove(self, name: str) -> bool:
|
||||
return self.entries.pop(name, None) is not None
|
||||
|
||||
def to_json(self) -> str:
|
||||
payload = {name: asdict(e) for name, e in self.entries.items()}
|
||||
return json.dumps(payload, indent=2, sort_keys=True)
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, data: str) -> "Whitelist":
|
||||
wl = cls()
|
||||
raw = json.loads(data)
|
||||
for name, fields in raw.items():
|
||||
wl.entries[name] = PluginEntry(**fields)
|
||||
return wl
|
||||
|
||||
# -- disk I/O -----------------------------------------------------
|
||||
|
||||
def save(self, path: Path, passphrase: Optional[str] = None) -> None:
|
||||
plaintext = self.to_json().encode("utf-8")
|
||||
if passphrase is None:
|
||||
path.write_bytes(plaintext)
|
||||
return
|
||||
|
||||
salt = os.urandom(16)
|
||||
key = _derive_key(passphrase, salt)
|
||||
token = Fernet(key).encrypt(plaintext)
|
||||
container = {
|
||||
"encrypted": True,
|
||||
"kdf": "pbkdf2-sha256",
|
||||
"iterations": PBKDF2_ITERATIONS,
|
||||
"salt": base64.b64encode(salt).decode("ascii"),
|
||||
"data": base64.b64encode(token).decode("ascii"),
|
||||
}
|
||||
path.write_text(json.dumps(container, indent=2), encoding="utf-8")
|
||||
|
||||
@classmethod
|
||||
def load(cls, path: Path, passphrase: Optional[str] = None) -> "Whitelist":
|
||||
raw = path.read_text(encoding="utf-8")
|
||||
try:
|
||||
container = json.loads(raw)
|
||||
except json.JSONDecodeError:
|
||||
container = None
|
||||
|
||||
if isinstance(container, dict) and container.get("encrypted"):
|
||||
if passphrase is None:
|
||||
raise ValueError("Whitelist is encrypted; a passphrase is required.")
|
||||
salt = base64.b64decode(container["salt"])
|
||||
key = _derive_key(passphrase, salt, container.get("iterations", PBKDF2_ITERATIONS))
|
||||
token = base64.b64decode(container["data"])
|
||||
try:
|
||||
plaintext = Fernet(key).decrypt(token).decode("utf-8")
|
||||
except InvalidToken as e:
|
||||
raise ValueError("Wrong passphrase or corrupted whitelist file.") from e
|
||||
return cls.from_json(plaintext)
|
||||
|
||||
return cls.from_json(raw)
|
||||
|
||||
|
||||
def _derive_key(passphrase: str, salt: bytes, iterations: int = PBKDF2_ITERATIONS) -> bytes:
|
||||
kdf = PBKDF2HMAC(
|
||||
algorithm=hashes.SHA256(),
|
||||
length=32,
|
||||
salt=salt,
|
||||
iterations=iterations,
|
||||
)
|
||||
return base64.urlsafe_b64encode(kdf.derive(passphrase.encode("utf-8")))
|
||||
@@ -17,9 +17,14 @@ class XtendRBase(ABC):
|
||||
Stopping TestPlugin
|
||||
"""
|
||||
@abstractmethod
|
||||
def run(self):
|
||||
def run(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def stop(self):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def pre_load(self, *args):
|
||||
pass
|
||||
|
||||
+271
-58
@@ -1,15 +1,44 @@
|
||||
import importlib
|
||||
import importlib.util
|
||||
import sys
|
||||
import os
|
||||
import re
|
||||
import json
|
||||
import stat
|
||||
import logging
|
||||
import threading
|
||||
from pathlib import Path
|
||||
from xtendr.xtendrbase import XtendRBase
|
||||
from xtendr import signing as xsign
|
||||
|
||||
__version__ = "0.5.0"
|
||||
|
||||
logger = logging.getLogger("xtendr")
|
||||
|
||||
# Plugin (folder) names and module names are restricted to a safe identifier
|
||||
# pattern. This blocks path traversal (e.g. "../../etc") and stray characters
|
||||
# that have no business in a plugin name.
|
||||
_NAME_RE = re.compile(r"^[A-Za-z0-9_-]+$")
|
||||
_MODULE_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
|
||||
|
||||
# Signature status values stored in plugins[name]["signature"]["status"].
|
||||
SIG_VERIFIED = "verified" # whitelist entry present, signature and hash both check out
|
||||
SIG_UNSIGNED = "unsigned" # no whitelist entry for this plugin at all
|
||||
SIG_INVALID = "invalid" # whitelist entry present but signature/hash mismatch (tampered)
|
||||
|
||||
|
||||
class XtendRSystem:
|
||||
"""Plugin system to manage plugins.
|
||||
|
||||
|
||||
SECURITY NOTE: plugins are arbitrary Python code that runs with the full
|
||||
privileges of the host process. Only attach plugins from sources you
|
||||
trust. This class validates names/paths and isolates module loading, but
|
||||
it cannot make untrusted plugin code safe to run.
|
||||
|
||||
Example:
|
||||
>>> system = XtendRSystem()
|
||||
>>> system.attach("example_plugin") # Assuming 'example_plugin/example_plugin.json' exists
|
||||
>>> system.version()
|
||||
XtendR v0.5.0
|
||||
>>> system.attach("example_plugin", lambda: None)
|
||||
>>> system.run("example_plugin")
|
||||
ExamplePlugin is running!
|
||||
>>> system.stop("example_plugin")
|
||||
@@ -17,70 +46,254 @@ class XtendRSystem:
|
||||
>>> system.detach("example_plugin")
|
||||
Detached plugin 'example_plugin'.
|
||||
"""
|
||||
def __init__(self):
|
||||
|
||||
def __init__(self, pluginpath="plugins", public_key_path=None, whitelist_path=None, whitelist_passphrase=None):
|
||||
self.pluginspath = pluginpath
|
||||
self.plugins = {}
|
||||
|
||||
def attach(self, name: str) -> None:
|
||||
"""Dynamically load a plugin from its folder."""
|
||||
if name in self.plugins:
|
||||
print(f"Plugin '{name}' is already attached.")
|
||||
return
|
||||
|
||||
plugin_path = os.path.join(os.getcwd(), "plugins", name)
|
||||
info_path = os.path.join(plugin_path, name + ".json")
|
||||
print(plugin_path + "\n" + info_path)
|
||||
|
||||
if not os.path.isdir(plugin_path) or not os.path.isfile(info_path):
|
||||
print(f"Failed to attach plugin '{name}', folder or info file not found.")
|
||||
return
|
||||
|
||||
self._lock = threading.RLock()
|
||||
|
||||
# -- signature verification setup ---------------------------------
|
||||
# If either the public key or the whitelist can't be loaded, we
|
||||
# fail closed: self._public_key / self._whitelist stay None, and
|
||||
# every plugin will come back as SIG_UNSIGNED (disabled) rather
|
||||
# than silently skipping verification. This is deliberate -- an
|
||||
# admin who wants unsigned plugins to run should not be able to
|
||||
# get there by accident (e.g. a missing/misspelled key path).
|
||||
self._public_key = None
|
||||
self._whitelist = None
|
||||
|
||||
if public_key_path is not None:
|
||||
try:
|
||||
self._public_key = xsign.load_public_key(Path(public_key_path))
|
||||
except (OSError, ValueError) as e:
|
||||
logger.error("Could not load XtendR public key from '%s': %s", public_key_path, e)
|
||||
|
||||
if whitelist_path is not None:
|
||||
try:
|
||||
self._whitelist = xsign.Whitelist.load(Path(whitelist_path), whitelist_passphrase)
|
||||
except (OSError, ValueError) as e:
|
||||
logger.error("Could not load XtendR plugin whitelist from '%s': %s", whitelist_path, e)
|
||||
|
||||
if self._public_key is None or self._whitelist is None:
|
||||
logger.warning(
|
||||
"Signature verification is not fully configured for pluginpath '%s'; "
|
||||
"all plugins will be treated as unsigned and permanently disabled.",
|
||||
pluginpath,
|
||||
)
|
||||
|
||||
def version(self) -> str:
|
||||
return "XtendR v" + __version__
|
||||
|
||||
@property
|
||||
def verification_configured(self) -> bool:
|
||||
"""True if a public key and whitelist both loaded successfully."""
|
||||
return self._public_key is not None and self._whitelist is not None
|
||||
|
||||
def _validate_name(self, name: str) -> bool:
|
||||
if not isinstance(name, str) or not _NAME_RE.match(name):
|
||||
logger.error("Rejected plugin name %r: must match %s", name, _NAME_RE.pattern)
|
||||
return False
|
||||
return True
|
||||
|
||||
def _verify_signature(self, name: str, plugin_path: str, module_name: str) -> dict:
|
||||
"""Check a plugin's signature against the loaded whitelist.
|
||||
|
||||
Returns a dict with at least a "status" key (SIG_VERIFIED /
|
||||
SIG_UNSIGNED / SIG_INVALID) plus whatever whitelist metadata is
|
||||
available, for display in the UI. Never raises.
|
||||
"""
|
||||
result = {"status": SIG_UNSIGNED, "sha256": None, "signature": None, "signed_at": None}
|
||||
|
||||
if self._public_key is None or self._whitelist is None:
|
||||
return result
|
||||
|
||||
entry = self._whitelist.entries.get(name)
|
||||
if entry is None:
|
||||
return result
|
||||
|
||||
result.update(sha256=entry.sha256, signature=entry.signature, signed_at=entry.signed_at)
|
||||
|
||||
try:
|
||||
with open(info_path, "r", encoding="utf-8") as f:
|
||||
plugin_info = json.load(f)
|
||||
module_name = plugin_info.get("module")
|
||||
class_name = plugin_info.get("class")
|
||||
if not module_name or not class_name:
|
||||
print(f"Plugin '{name}' info file is missing 'module' or 'class' key.")
|
||||
return
|
||||
|
||||
sys.path.insert(0, plugin_path)
|
||||
module = importlib.import_module(module_name)
|
||||
ok = xsign.verify_plugin_on_disk(self._public_key, Path(plugin_path), entry)
|
||||
except Exception: # noqa: BLE001 - never let a verification bug crash attach()
|
||||
logger.error("Signature verification raised for plugin '%s'.", name, exc_info=True)
|
||||
ok = False
|
||||
|
||||
result["status"] = SIG_VERIFIED if ok else SIG_INVALID
|
||||
return result
|
||||
|
||||
def _check_permissions(self, path: str) -> None:
|
||||
"""Warn (don't block) if a plugin file is group/world-writable."""
|
||||
try:
|
||||
st = os.stat(path)
|
||||
if st.st_mode & (stat.S_IWGRP | stat.S_IWOTH):
|
||||
logger.warning(
|
||||
"Plugin file '%s' is group/world-writable; this is a "
|
||||
"security risk on shared systems.",
|
||||
path,
|
||||
)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
def attach(self, name: str, callback=None) -> None:
|
||||
"""Dynamically load a plugin from its folder."""
|
||||
with self._lock:
|
||||
if name in self.plugins:
|
||||
logger.info("Plugin '%s' is already attached.", name)
|
||||
return
|
||||
|
||||
if not self._validate_name(name):
|
||||
return
|
||||
|
||||
plugin_path = os.path.join(os.getcwd(), self.pluginspath, name)
|
||||
info_path = os.path.join(plugin_path, name + ".json")
|
||||
|
||||
# Defense in depth: even with a validated name, make sure the
|
||||
# resolved path is actually inside the plugins directory.
|
||||
plugins_root = os.path.realpath(os.path.join(os.getcwd(), self.pluginspath))
|
||||
if os.path.commonpath([plugins_root, os.path.realpath(plugin_path)]) != plugins_root:
|
||||
logger.error("Refusing to attach '%s': resolves outside plugins directory.", name)
|
||||
return
|
||||
|
||||
if not os.path.isdir(plugin_path) or not os.path.isfile(info_path):
|
||||
logger.error("Failed to attach plugin '%s': folder or info file not found.", name)
|
||||
return
|
||||
|
||||
self._check_permissions(info_path)
|
||||
|
||||
try:
|
||||
with open(info_path, "r", encoding="utf-8") as f:
|
||||
plugin_info = json.load(f)
|
||||
except (OSError, json.JSONDecodeError) as e:
|
||||
logger.error("Failed to read info file for plugin '%s': %s", name, e)
|
||||
return
|
||||
|
||||
module_name = plugin_info.get("module")
|
||||
class_name = plugin_info.get("class")
|
||||
if not module_name or not class_name:
|
||||
logger.error("Plugin '%s' info file is missing 'module' or 'class' key.", name)
|
||||
return
|
||||
if not _MODULE_RE.match(module_name) or not _MODULE_RE.match(class_name):
|
||||
logger.error("Plugin '%s' has an invalid module/class identifier.", name)
|
||||
return
|
||||
|
||||
module_file = os.path.join(plugin_path, module_name + ".py")
|
||||
if not os.path.isfile(module_file):
|
||||
logger.error("Plugin '%s' module file '%s' not found.", name, module_file)
|
||||
return
|
||||
|
||||
self._check_permissions(module_file)
|
||||
|
||||
# Verify the plugin's signature BEFORE we ever execute its code.
|
||||
# Unsigned/tampered plugins still get a listing entry (built
|
||||
# from the manifest alone, which is inert JSON) but their .py
|
||||
# file is never imported, and they can never be run.
|
||||
sig = self._verify_signature(name, plugin_path, module_name)
|
||||
if sig["status"] != SIG_VERIFIED:
|
||||
if sig["status"] == SIG_INVALID:
|
||||
logger.error(
|
||||
"Plugin '%s' failed signature verification (tampered or bad "
|
||||
"signature); attaching as permanently disabled.", name,
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
"Plugin '%s' has no valid whitelist entry; attaching as "
|
||||
"permanently disabled.", name,
|
||||
)
|
||||
self.plugins[name] = {
|
||||
"instance": None,
|
||||
"running": False,
|
||||
"info": plugin_info,
|
||||
"autorun": False,
|
||||
"module_key": None,
|
||||
"disabled": True,
|
||||
"signature": sig,
|
||||
}
|
||||
return
|
||||
|
||||
# Load the module directly from its file path instead of
|
||||
# mutating sys.path. This prevents a plugin from shadowing
|
||||
# stdlib or third-party modules for the rest of the process.
|
||||
qualified_name = f"xtendr_plugin_{name}_{module_name}"
|
||||
try:
|
||||
spec = importlib.util.spec_from_file_location(qualified_name, module_file)
|
||||
if spec is None or spec.loader is None:
|
||||
raise ImportError(f"Could not create import spec for '{module_file}'")
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
sys.modules[qualified_name] = module
|
||||
spec.loader.exec_module(module)
|
||||
|
||||
plugin_class = getattr(module, class_name)
|
||||
instance = plugin_class()
|
||||
|
||||
|
||||
if not isinstance(instance, XtendRBase):
|
||||
print(f"Plugin '{name}' does not inherit from PluginBase.")
|
||||
logger.error("Plugin '%s' does not inherit from XtendRBase.", name)
|
||||
sys.modules.pop(qualified_name, None)
|
||||
return
|
||||
|
||||
self.plugins[name] = {
|
||||
'instance': instance,
|
||||
'running': False,
|
||||
'info': plugin_info
|
||||
}
|
||||
print(f"Attached plugin '{name}'.")
|
||||
except (ModuleNotFoundError, json.JSONDecodeError, AttributeError) as e:
|
||||
print(f"Failed to attach plugin '{name}': {e}")
|
||||
|
||||
|
||||
except Exception as e: # noqa: BLE001 - plugin code is untrusted, isolate all failures
|
||||
logger.error("Failed to attach plugin '%s': %s", name, e, exc_info=True)
|
||||
sys.modules.pop(qualified_name, None)
|
||||
return
|
||||
|
||||
self.plugins[name] = {
|
||||
"instance": instance,
|
||||
"running": False,
|
||||
"info": plugin_info,
|
||||
"autorun": False,
|
||||
"module_key": qualified_name,
|
||||
"disabled": False,
|
||||
"signature": sig,
|
||||
}
|
||||
logger.info("Attached plugin '%s'.", name)
|
||||
logger.info("Running pre-load on '%s'.", name)
|
||||
|
||||
def _pre_load_worker():
|
||||
try:
|
||||
instance.pre_load(callback)
|
||||
except Exception: # noqa: BLE001
|
||||
logger.error("Plugin '%s' raised during pre_load.", name, exc_info=True)
|
||||
|
||||
thread = threading.Thread(target=_pre_load_worker, daemon=True)
|
||||
thread.start()
|
||||
|
||||
def run(self, name: str, *args, **kwargs):
|
||||
"""Run the plugin's 'run' method if available."""
|
||||
if name in self.plugins:
|
||||
self.plugins[name]['running'] = True
|
||||
return self.plugins[name]['instance'].run(*args, **kwargs)
|
||||
print(f"Plugin '{name}' not found or has no 'run' method.")
|
||||
|
||||
with self._lock:
|
||||
entry = self.plugins.get(name)
|
||||
if entry is None:
|
||||
logger.error("Plugin '%s' not found or has no 'run' method.", name)
|
||||
return
|
||||
if entry.get("disabled"):
|
||||
logger.error("Plugin '%s' is disabled (failed signature verification) and cannot run.", name)
|
||||
return
|
||||
entry["running"] = True
|
||||
try:
|
||||
return entry["instance"].run(*args, **kwargs)
|
||||
except Exception: # noqa: BLE001
|
||||
logger.error("Plugin '%s' raised during run.", name, exc_info=True)
|
||||
entry["running"] = False
|
||||
|
||||
def stop(self, name: str) -> None:
|
||||
"""Stop the plugin if it's running."""
|
||||
if name in self.plugins and self.plugins[name]['running']:
|
||||
self.plugins[name]['running'] = False
|
||||
self.plugins[name]['instance'].stop()
|
||||
else:
|
||||
print(f"Plugin '{name}' is not running.")
|
||||
|
||||
with self._lock:
|
||||
entry = self.plugins.get(name)
|
||||
if entry is None or not entry["running"]:
|
||||
logger.info("Plugin '%s' is not running.", name)
|
||||
return
|
||||
entry["running"] = False
|
||||
try:
|
||||
entry["instance"].stop()
|
||||
except Exception: # noqa: BLE001
|
||||
logger.error("Plugin '%s' raised during stop.", name, exc_info=True)
|
||||
|
||||
def detach(self, name: str) -> None:
|
||||
"""Unload a plugin."""
|
||||
if name in self.plugins:
|
||||
del self.plugins[name]
|
||||
sys.modules.pop(name, None)
|
||||
print(f"Detached plugin '{name}'.")
|
||||
else:
|
||||
print(f"Plugin '{name}' is not attached.")
|
||||
with self._lock:
|
||||
entry = self.plugins.pop(name, None)
|
||||
if entry is None:
|
||||
logger.info("Plugin '%s' is not attached.", name)
|
||||
return
|
||||
if entry.get("module_key"):
|
||||
sys.modules.pop(entry["module_key"], None)
|
||||
logger.info("Detached plugin '%s'.", name)
|
||||
|
||||
Reference in New Issue
Block a user