Compare commits
27 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| ad67becd91 | |||
| a4fe3a2b80 | |||
| 984458d8f3 | |||
| be1af3ad05 | |||
| 0c8108d4d8 | |||
| bcc9b864d1 | |||
| a4c552d6f5 | |||
| 34c7927601 | |||
| ebf31756b5 | |||
| 406c1b77ed | |||
| 1be429c5cd | |||
| a093b3f56c | |||
| 1dc0c81d84 | |||
| 3e38cabf81 | |||
| 63f649af25 | |||
| b6efe1b1df | |||
| c5ee073d65 | |||
| 3bcf01dc26 | |||
| dd5904c273 | |||
| b763308aa9 | |||
| 34ba757bc6 | |||
| b5d2e1b0be | |||
| d3031952bf | |||
| 3312f2da28 | |||
| a21122671d | |||
| 9effa4be3e | |||
| 217038b863 |
@@ -1,6 +1,7 @@
|
|||||||
# XtendR
|
# XtendR
|
||||||
|

|
||||||
|
|
||||||
A very basic Python 3.12 plugin system based on the K.I.S.S principle.
|
A very basic Python 3.12 friendly plugin system based on the K.I.S.S principle.
|
||||||
|
|
||||||
I was in need of a new plugin system, which should meet these requirements:
|
I was in need of a new plugin system, which should meet these requirements:
|
||||||
:heavy_plus_sign: Simple to use
|
:heavy_plus_sign: Simple to use
|
||||||
@@ -16,7 +17,8 @@ I didn't find anything that suited my needs, so I decided to make my own plugin
|
|||||||
It simply contains 2 classes, one for the plugin system and one abstraction base class for the plugins themselves.
|
It simply contains 2 classes, one for the plugin system and one abstraction base class for the plugins themselves.
|
||||||
|
|
||||||
At the moment only 4 functions are available:
|
At the moment only 4 functions are available:
|
||||||
- Attach
|
|
||||||
|
- Attach - including call to pre-load data in plugin.
|
||||||
- Run
|
- Run
|
||||||
- Stop
|
- Stop
|
||||||
- Detach
|
- Detach
|
||||||
@@ -27,4 +29,4 @@ The Run and Stop functions are mandatory in the plugin modules.
|
|||||||
The system expects a folder called 'plugins', placed at the root, along side your main python file.
|
The system expects a folder called 'plugins', placed at the root, along side your main python file.
|
||||||
Each plugin should be placed in subfolders, named as the plugin, inside the 'plugins' folder.
|
Each plugin should be placed in subfolders, named as the plugin, inside the 'plugins' folder.
|
||||||
|
|
||||||
The example.py along with the plugins/example_plugin/example_plugin.py and plugins/example_plugin/example_plugin.json shows the workings of this plugin system.
|
The example.py along with the plugins/example_plugin/example_plugin.py and plugins/example_plugin/example_plugin.json shows the workings of this plugin system.
|
||||||
|
|||||||
+13
-2
@@ -1,5 +1,9 @@
|
|||||||
|
import time
|
||||||
from xtendr.xtendrsystem import XtendRSystem
|
from xtendr.xtendrsystem import XtendRSystem
|
||||||
|
|
||||||
|
def my_callback():
|
||||||
|
print("'example_plugin is finished pre-loading")
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
"""Example usage of the PluginSystem.
|
"""Example usage of the PluginSystem.
|
||||||
|
|
||||||
@@ -15,7 +19,14 @@ if __name__ == "__main__":
|
|||||||
Detached plugin 'example_plugin'.
|
Detached plugin 'example_plugin'.
|
||||||
"""
|
"""
|
||||||
system = XtendRSystem()
|
system = XtendRSystem()
|
||||||
system.attach("example_plugin") # Assuming 'example_plugin/plugin_info.json' exists
|
system.attach("example_plugin", my_callback) # Assuming 'example_plugin/plugin_info.json' exists
|
||||||
system.run("example_plugin")
|
for i in range(3):
|
||||||
|
print(f"Main program is running iteration {i+1}...")
|
||||||
|
time.sleep(2)
|
||||||
|
system.run("example_plugin", test="Hello!")
|
||||||
|
system.stop("example_plugin")
|
||||||
|
system.run("example_plugin", 25)
|
||||||
|
system.stop("example_plugin")
|
||||||
|
system.run("example_plugin", "Hello!", 25)
|
||||||
system.stop("example_plugin")
|
system.stop("example_plugin")
|
||||||
system.detach("example_plugin")
|
system.detach("example_plugin")
|
||||||
@@ -1,3 +1,5 @@
|
|||||||
|
import threading
|
||||||
|
import time
|
||||||
from xtendr.xtendrbase import XtendRBase
|
from xtendr.xtendrbase import XtendRBase
|
||||||
|
|
||||||
class ExamplePlugin(XtendRBase):
|
class ExamplePlugin(XtendRBase):
|
||||||
@@ -5,13 +7,33 @@ class ExamplePlugin(XtendRBase):
|
|||||||
|
|
||||||
Example:
|
Example:
|
||||||
>>> plugin = ExamplePlugin()
|
>>> plugin = ExamplePlugin()
|
||||||
>>> plugin.run()
|
>>> plugin.run("Hello!", 25)
|
||||||
|
Passed arguments 2:
|
||||||
|
Argument 0: Hello!
|
||||||
|
Argument 1: 25
|
||||||
ExamplePlugin is running!
|
ExamplePlugin is running!
|
||||||
>>> plugin.stop()
|
>>> plugin.stop()
|
||||||
ExamplePlugin has stopped!
|
ExamplePlugin has stopped!
|
||||||
"""
|
"""
|
||||||
def run(self):
|
def run(self, *args, **kwargs):
|
||||||
|
arglen = len(args)
|
||||||
|
keylen = len(kwargs)
|
||||||
|
if arglen > 0:
|
||||||
|
print(f"Passed arguments {arglen}:")
|
||||||
|
for idx, a in enumerate(args):
|
||||||
|
print(f"Argument {idx}: {a}")
|
||||||
|
if keylen > 0:
|
||||||
|
print(f"Keyword arguments passed {keylen}")
|
||||||
|
if not "test" in kwargs:
|
||||||
|
raise ValueError("Didn't get expected 'test' keyword!")
|
||||||
|
for kw in kwargs:
|
||||||
|
print(f"Argument {kw}: {kwargs[kw]}")
|
||||||
|
|
||||||
print("ExamplePlugin is running!")
|
print("ExamplePlugin is running!")
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
print("ExamplePlugin has stopped!")
|
print("ExamplePlugin has stopped!")
|
||||||
|
|
||||||
|
def pre_load(self, callback):
|
||||||
|
time.sleep(5) # Indicate long running pre-load.
|
||||||
|
callback()
|
||||||
|
|||||||
@@ -0,0 +1,2 @@
|
|||||||
|
setuptools==68.2.2
|
||||||
|
cryptography>=42
|
||||||
@@ -1,9 +1,12 @@
|
|||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
from setuptools import setup, find_packages
|
from setuptools import setup, find_packages
|
||||||
|
from pathlib import Path
|
||||||
|
this_directory = Path(__file__).parent
|
||||||
|
long_description = (this_directory / "README.md").read_text()
|
||||||
|
|
||||||
setup(
|
setup(
|
||||||
name="XtendR",
|
name="XtendR",
|
||||||
version="0.0.9",
|
version="0.5.1",
|
||||||
packages=find_packages(),
|
packages=find_packages(),
|
||||||
install_requires=[],
|
install_requires=[],
|
||||||
author="Jan Lerking",
|
author="Jan Lerking",
|
||||||
@@ -16,4 +19,6 @@ if __name__ == "__main__":
|
|||||||
"Operating System :: OS Independent",
|
"Operating System :: OS Independent",
|
||||||
],
|
],
|
||||||
python_requires='>=3.11',
|
python_requires='>=3.11',
|
||||||
|
long_description=long_description,
|
||||||
|
long_description_content_type='text/markdown'
|
||||||
)
|
)
|
||||||
@@ -0,0 +1,259 @@
|
|||||||
|
"""
|
||||||
|
xtendr_signing.py
|
||||||
|
|
||||||
|
Core logic for signing XtendR plugins and maintaining a whitelist.
|
||||||
|
Deliberately kept free of any GTK/Adw imports so it can be unit tested
|
||||||
|
and reused by both the signer GUI and (later) XtendRSystem's verification
|
||||||
|
hook.
|
||||||
|
|
||||||
|
Design:
|
||||||
|
- Ed25519 keypair: private key stays with the signer, public key ships
|
||||||
|
inside XtendR / PyPac to verify at attach() time.
|
||||||
|
- Each whitelist entry covers the plugin's manifest (<name>.json) and its
|
||||||
|
module file (<module>.py), hashed together with SHA-256. The signature
|
||||||
|
covers the whole entry (hash + name + module + class), so an attacker
|
||||||
|
can't splice a valid hash onto a different plugin identity.
|
||||||
|
- The whitelist file itself can optionally be encrypted at rest with a
|
||||||
|
passphrase (Fernet / AES-128-CBC+HMAC via PBKDF2-derived key). This is
|
||||||
|
for confidentiality only -- the signature is what provides integrity,
|
||||||
|
and still verifies correctly whether or not the file on disk is
|
||||||
|
encrypted.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import base64
|
||||||
|
import hashlib
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
from dataclasses import dataclass, asdict
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
|
||||||
|
Ed25519PrivateKey,
|
||||||
|
Ed25519PublicKey,
|
||||||
|
)
|
||||||
|
from cryptography.hazmat.primitives import serialization
|
||||||
|
from cryptography.exceptions import InvalidSignature
|
||||||
|
from cryptography.fernet import Fernet, InvalidToken
|
||||||
|
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
||||||
|
from cryptography.hazmat.primitives import hashes
|
||||||
|
|
||||||
|
PBKDF2_ITERATIONS = 600_000
|
||||||
|
|
||||||
|
|
||||||
|
# --------------------------------------------------------------------------
|
||||||
|
# Key management
|
||||||
|
# --------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def generate_keypair(private_path: Path, public_path: Path) -> None:
|
||||||
|
"""Generate a new Ed25519 keypair and write it to disk (unencrypted PEM).
|
||||||
|
|
||||||
|
Caller is responsible for keeping private_path safe (e.g. 0600 perms,
|
||||||
|
offline machine, backups). This is deliberately not done automatically
|
||||||
|
since the right answer depends on the user's environment.
|
||||||
|
"""
|
||||||
|
private_path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
key = Ed25519PrivateKey.generate()
|
||||||
|
|
||||||
|
priv_bytes = key.private_bytes(
|
||||||
|
encoding=serialization.Encoding.PEM,
|
||||||
|
format=serialization.PrivateFormat.PKCS8,
|
||||||
|
encryption_algorithm=serialization.NoEncryption(),
|
||||||
|
)
|
||||||
|
pub_bytes = key.public_key().public_bytes(
|
||||||
|
encoding=serialization.Encoding.PEM,
|
||||||
|
format=serialization.PublicFormat.SubjectPublicKeyInfo,
|
||||||
|
)
|
||||||
|
|
||||||
|
private_path.write_bytes(priv_bytes)
|
||||||
|
try:
|
||||||
|
os.chmod(private_path, 0o600)
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
public_path.write_bytes(pub_bytes)
|
||||||
|
|
||||||
|
|
||||||
|
def load_private_key(path: Path) -> Ed25519PrivateKey:
|
||||||
|
key = serialization.load_pem_private_key(path.read_bytes(), password=None)
|
||||||
|
if not isinstance(key, Ed25519PrivateKey):
|
||||||
|
raise ValueError(f"'{path}' is not an Ed25519 private key.")
|
||||||
|
return key
|
||||||
|
|
||||||
|
|
||||||
|
def load_public_key(path: Path) -> Ed25519PublicKey:
|
||||||
|
key = serialization.load_pem_public_key(path.read_bytes())
|
||||||
|
if not isinstance(key, Ed25519PublicKey):
|
||||||
|
raise ValueError(f"'{path}' is not an Ed25519 public key.")
|
||||||
|
return key
|
||||||
|
|
||||||
|
|
||||||
|
# --------------------------------------------------------------------------
|
||||||
|
# Plugin hashing + entry signing
|
||||||
|
# --------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class PluginEntry:
|
||||||
|
name: str # plugin folder name
|
||||||
|
module: str # module name, from the manifest
|
||||||
|
cls: str # class name, from the manifest
|
||||||
|
sha256: str # hash of manifest + module file contents
|
||||||
|
signed_at: str # ISO-8601 UTC timestamp
|
||||||
|
signature: str = "" # base64 Ed25519 signature, filled in after signing
|
||||||
|
|
||||||
|
def canonical_bytes(self) -> bytes:
|
||||||
|
"""Bytes that get signed / verified -- everything except the
|
||||||
|
signature itself, in a stable, sorted-key JSON encoding."""
|
||||||
|
payload = {
|
||||||
|
"name": self.name,
|
||||||
|
"module": self.module,
|
||||||
|
"cls": self.cls,
|
||||||
|
"sha256": self.sha256,
|
||||||
|
"signed_at": self.signed_at,
|
||||||
|
}
|
||||||
|
return json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8")
|
||||||
|
|
||||||
|
|
||||||
|
def hash_plugin_files(plugin_dir: Path, name: str, module: str) -> str:
|
||||||
|
"""Hash the manifest + module file together, in a fixed order, so the
|
||||||
|
hash changes if either file is tampered with."""
|
||||||
|
manifest_path = plugin_dir / f"{name}.json"
|
||||||
|
module_path = plugin_dir / f"{module}.py"
|
||||||
|
if not manifest_path.is_file():
|
||||||
|
raise FileNotFoundError(f"Manifest not found: {manifest_path}")
|
||||||
|
if not module_path.is_file():
|
||||||
|
raise FileNotFoundError(f"Module file not found: {module_path}")
|
||||||
|
|
||||||
|
h = hashlib.sha256()
|
||||||
|
for p in (manifest_path, module_path):
|
||||||
|
h.update(p.name.encode("utf-8"))
|
||||||
|
h.update(b"\x00")
|
||||||
|
h.update(p.read_bytes())
|
||||||
|
h.update(b"\x00")
|
||||||
|
return h.hexdigest()
|
||||||
|
|
||||||
|
|
||||||
|
def sign_plugin(private_key: Ed25519PrivateKey, plugin_dir: Path) -> PluginEntry:
|
||||||
|
manifest_path_glob = list(plugin_dir.glob("*.json"))
|
||||||
|
if len(manifest_path_glob) != 1:
|
||||||
|
raise ValueError(
|
||||||
|
f"Expected exactly one manifest .json in '{plugin_dir}', found {len(manifest_path_glob)}."
|
||||||
|
)
|
||||||
|
manifest = json.loads(manifest_path_glob[0].read_text(encoding="utf-8"))
|
||||||
|
module = manifest.get("module")
|
||||||
|
cls = manifest.get("class")
|
||||||
|
if not module or not cls:
|
||||||
|
raise ValueError("Manifest is missing 'module' or 'class'.")
|
||||||
|
|
||||||
|
name = plugin_dir.name
|
||||||
|
digest = hash_plugin_files(plugin_dir, name, module)
|
||||||
|
entry = PluginEntry(
|
||||||
|
name=name,
|
||||||
|
module=module,
|
||||||
|
cls=cls,
|
||||||
|
sha256=digest,
|
||||||
|
signed_at=datetime.now(timezone.utc).isoformat(timespec="seconds"),
|
||||||
|
)
|
||||||
|
signature = private_key.sign(entry.canonical_bytes())
|
||||||
|
entry.signature = base64.b64encode(signature).decode("ascii")
|
||||||
|
return entry
|
||||||
|
|
||||||
|
|
||||||
|
def verify_entry(public_key: Ed25519PublicKey, entry: PluginEntry) -> bool:
|
||||||
|
try:
|
||||||
|
public_key.verify(base64.b64decode(entry.signature), entry.canonical_bytes())
|
||||||
|
return True
|
||||||
|
except (InvalidSignature, ValueError):
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def verify_plugin_on_disk(public_key: Ed25519PublicKey, plugin_dir: Path, entry: PluginEntry) -> bool:
|
||||||
|
"""Full verification: signature is valid AND the files on disk still
|
||||||
|
match the hash that was signed."""
|
||||||
|
if not verify_entry(public_key, entry):
|
||||||
|
return False
|
||||||
|
try:
|
||||||
|
current_hash = hash_plugin_files(plugin_dir, entry.name, entry.module)
|
||||||
|
except FileNotFoundError:
|
||||||
|
return False
|
||||||
|
return current_hash == entry.sha256
|
||||||
|
|
||||||
|
|
||||||
|
# --------------------------------------------------------------------------
|
||||||
|
# Whitelist storage (optionally encrypted at rest)
|
||||||
|
# --------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class Whitelist:
|
||||||
|
def __init__(self):
|
||||||
|
self.entries: dict[str, PluginEntry] = {}
|
||||||
|
|
||||||
|
def add(self, entry: PluginEntry) -> None:
|
||||||
|
self.entries[entry.name] = entry
|
||||||
|
|
||||||
|
def remove(self, name: str) -> bool:
|
||||||
|
return self.entries.pop(name, None) is not None
|
||||||
|
|
||||||
|
def to_json(self) -> str:
|
||||||
|
payload = {name: asdict(e) for name, e in self.entries.items()}
|
||||||
|
return json.dumps(payload, indent=2, sort_keys=True)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_json(cls, data: str) -> "Whitelist":
|
||||||
|
wl = cls()
|
||||||
|
raw = json.loads(data)
|
||||||
|
for name, fields in raw.items():
|
||||||
|
wl.entries[name] = PluginEntry(**fields)
|
||||||
|
return wl
|
||||||
|
|
||||||
|
# -- disk I/O -----------------------------------------------------
|
||||||
|
|
||||||
|
def save(self, path: Path, passphrase: Optional[str] = None) -> None:
|
||||||
|
plaintext = self.to_json().encode("utf-8")
|
||||||
|
if passphrase is None:
|
||||||
|
path.write_bytes(plaintext)
|
||||||
|
return
|
||||||
|
|
||||||
|
salt = os.urandom(16)
|
||||||
|
key = _derive_key(passphrase, salt)
|
||||||
|
token = Fernet(key).encrypt(plaintext)
|
||||||
|
container = {
|
||||||
|
"encrypted": True,
|
||||||
|
"kdf": "pbkdf2-sha256",
|
||||||
|
"iterations": PBKDF2_ITERATIONS,
|
||||||
|
"salt": base64.b64encode(salt).decode("ascii"),
|
||||||
|
"data": base64.b64encode(token).decode("ascii"),
|
||||||
|
}
|
||||||
|
path.write_text(json.dumps(container, indent=2), encoding="utf-8")
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def load(cls, path: Path, passphrase: Optional[str] = None) -> "Whitelist":
|
||||||
|
raw = path.read_text(encoding="utf-8")
|
||||||
|
try:
|
||||||
|
container = json.loads(raw)
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
container = None
|
||||||
|
|
||||||
|
if isinstance(container, dict) and container.get("encrypted"):
|
||||||
|
if passphrase is None:
|
||||||
|
raise ValueError("Whitelist is encrypted; a passphrase is required.")
|
||||||
|
salt = base64.b64decode(container["salt"])
|
||||||
|
key = _derive_key(passphrase, salt, container.get("iterations", PBKDF2_ITERATIONS))
|
||||||
|
token = base64.b64decode(container["data"])
|
||||||
|
try:
|
||||||
|
plaintext = Fernet(key).decrypt(token).decode("utf-8")
|
||||||
|
except InvalidToken as e:
|
||||||
|
raise ValueError("Wrong passphrase or corrupted whitelist file.") from e
|
||||||
|
return cls.from_json(plaintext)
|
||||||
|
|
||||||
|
return cls.from_json(raw)
|
||||||
|
|
||||||
|
|
||||||
|
def _derive_key(passphrase: str, salt: bytes, iterations: int = PBKDF2_ITERATIONS) -> bytes:
|
||||||
|
kdf = PBKDF2HMAC(
|
||||||
|
algorithm=hashes.SHA256(),
|
||||||
|
length=32,
|
||||||
|
salt=salt,
|
||||||
|
iterations=iterations,
|
||||||
|
)
|
||||||
|
return base64.urlsafe_b64encode(kdf.derive(passphrase.encode("utf-8")))
|
||||||
@@ -17,9 +17,14 @@ class XtendRBase(ABC):
|
|||||||
Stopping TestPlugin
|
Stopping TestPlugin
|
||||||
"""
|
"""
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def run(self):
|
def run(self, *args, **kwargs):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def stop(self):
|
def stop(self):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def pre_load(self, *args):
|
||||||
|
pass
|
||||||
|
|
||||||
+267
-62
@@ -1,19 +1,44 @@
|
|||||||
import importlib
|
import importlib.util
|
||||||
import sys
|
import sys
|
||||||
import os
|
import os
|
||||||
|
import re
|
||||||
import json
|
import json
|
||||||
|
import stat
|
||||||
|
import logging
|
||||||
|
import threading
|
||||||
|
from pathlib import Path
|
||||||
from xtendr.xtendrbase import XtendRBase
|
from xtendr.xtendrbase import XtendRBase
|
||||||
|
from xtendr import signing as xsign
|
||||||
|
|
||||||
|
__version__ = "0.5.0"
|
||||||
|
|
||||||
|
logger = logging.getLogger("xtendr")
|
||||||
|
|
||||||
|
# Plugin (folder) names and module names are restricted to a safe identifier
|
||||||
|
# pattern. This blocks path traversal (e.g. "../../etc") and stray characters
|
||||||
|
# that have no business in a plugin name.
|
||||||
|
_NAME_RE = re.compile(r"^[A-Za-z0-9_-]+$")
|
||||||
|
_MODULE_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
|
||||||
|
|
||||||
|
# Signature status values stored in plugins[name]["signature"]["status"].
|
||||||
|
SIG_VERIFIED = "verified" # whitelist entry present, signature and hash both check out
|
||||||
|
SIG_UNSIGNED = "unsigned" # no whitelist entry for this plugin at all
|
||||||
|
SIG_INVALID = "invalid" # whitelist entry present but signature/hash mismatch (tampered)
|
||||||
|
|
||||||
__version__ = "0.1.0"
|
|
||||||
|
|
||||||
class XtendRSystem:
|
class XtendRSystem:
|
||||||
"""Plugin system to manage plugins.
|
"""Plugin system to manage plugins.
|
||||||
|
|
||||||
|
SECURITY NOTE: plugins are arbitrary Python code that runs with the full
|
||||||
|
privileges of the host process. Only attach plugins from sources you
|
||||||
|
trust. This class validates names/paths and isolates module loading, but
|
||||||
|
it cannot make untrusted plugin code safe to run.
|
||||||
|
|
||||||
Example:
|
Example:
|
||||||
>>> system = XtendRSystem()
|
>>> system = XtendRSystem()
|
||||||
>>> system.version()
|
>>> system.version()
|
||||||
XtendR v0.1.0
|
XtendR v0.5.0
|
||||||
>>> system.attach("example_plugin") # Assuming 'example_plugin/example_plugin.json' exists
|
>>> system.attach("example_plugin", lambda: None)
|
||||||
>>> system.run("example_plugin")
|
>>> system.run("example_plugin")
|
||||||
ExamplePlugin is running!
|
ExamplePlugin is running!
|
||||||
>>> system.stop("example_plugin")
|
>>> system.stop("example_plugin")
|
||||||
@@ -21,74 +46,254 @@ class XtendRSystem:
|
|||||||
>>> system.detach("example_plugin")
|
>>> system.detach("example_plugin")
|
||||||
Detached plugin 'example_plugin'.
|
Detached plugin 'example_plugin'.
|
||||||
"""
|
"""
|
||||||
def __init__(self):
|
|
||||||
|
def __init__(self, pluginpath="plugins", public_key_path=None, whitelist_path=None, whitelist_passphrase=None):
|
||||||
|
self.pluginspath = pluginpath
|
||||||
self.plugins = {}
|
self.plugins = {}
|
||||||
|
self._lock = threading.RLock()
|
||||||
|
|
||||||
|
# -- signature verification setup ---------------------------------
|
||||||
|
# If either the public key or the whitelist can't be loaded, we
|
||||||
|
# fail closed: self._public_key / self._whitelist stay None, and
|
||||||
|
# every plugin will come back as SIG_UNSIGNED (disabled) rather
|
||||||
|
# than silently skipping verification. This is deliberate -- an
|
||||||
|
# admin who wants unsigned plugins to run should not be able to
|
||||||
|
# get there by accident (e.g. a missing/misspelled key path).
|
||||||
|
self._public_key = None
|
||||||
|
self._whitelist = None
|
||||||
|
|
||||||
|
if public_key_path is not None:
|
||||||
|
try:
|
||||||
|
self._public_key = xsign.load_public_key(Path(public_key_path))
|
||||||
|
except (OSError, ValueError) as e:
|
||||||
|
logger.error("Could not load XtendR public key from '%s': %s", public_key_path, e)
|
||||||
|
|
||||||
|
if whitelist_path is not None:
|
||||||
|
try:
|
||||||
|
self._whitelist = xsign.Whitelist.load(Path(whitelist_path), whitelist_passphrase)
|
||||||
|
except (OSError, ValueError) as e:
|
||||||
|
logger.error("Could not load XtendR plugin whitelist from '%s': %s", whitelist_path, e)
|
||||||
|
|
||||||
|
if self._public_key is None or self._whitelist is None:
|
||||||
|
logger.warning(
|
||||||
|
"Signature verification is not fully configured for pluginpath '%s'; "
|
||||||
|
"all plugins will be treated as unsigned and permanently disabled.",
|
||||||
|
pluginpath,
|
||||||
|
)
|
||||||
|
|
||||||
def version(self) -> str:
|
def version(self) -> str:
|
||||||
return "XtendR v" + __version__
|
return "XtendR v" + __version__
|
||||||
|
|
||||||
def attach(self, name: str) -> None:
|
@property
|
||||||
"""Dynamically load a plugin from its folder."""
|
def verification_configured(self) -> bool:
|
||||||
if name in self.plugins:
|
"""True if a public key and whitelist both loaded successfully."""
|
||||||
print(f"Plugin '{name}' is already attached.")
|
return self._public_key is not None and self._whitelist is not None
|
||||||
return
|
|
||||||
|
def _validate_name(self, name: str) -> bool:
|
||||||
plugin_path = os.path.join(os.getcwd(), "plugins", name)
|
if not isinstance(name, str) or not _NAME_RE.match(name):
|
||||||
info_path = os.path.join(plugin_path, name + ".json")
|
logger.error("Rejected plugin name %r: must match %s", name, _NAME_RE.pattern)
|
||||||
print(plugin_path + "\n" + info_path)
|
return False
|
||||||
|
return True
|
||||||
if not os.path.isdir(plugin_path) or not os.path.isfile(info_path):
|
|
||||||
print(f"Failed to attach plugin '{name}', folder or info file not found.")
|
def _verify_signature(self, name: str, plugin_path: str, module_name: str) -> dict:
|
||||||
return
|
"""Check a plugin's signature against the loaded whitelist.
|
||||||
|
|
||||||
|
Returns a dict with at least a "status" key (SIG_VERIFIED /
|
||||||
|
SIG_UNSIGNED / SIG_INVALID) plus whatever whitelist metadata is
|
||||||
|
available, for display in the UI. Never raises.
|
||||||
|
"""
|
||||||
|
result = {"status": SIG_UNSIGNED, "sha256": None, "signature": None, "signed_at": None}
|
||||||
|
|
||||||
|
if self._public_key is None or self._whitelist is None:
|
||||||
|
return result
|
||||||
|
|
||||||
|
entry = self._whitelist.entries.get(name)
|
||||||
|
if entry is None:
|
||||||
|
return result
|
||||||
|
|
||||||
|
result.update(sha256=entry.sha256, signature=entry.signature, signed_at=entry.signed_at)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
with open(info_path, "r", encoding="utf-8") as f:
|
ok = xsign.verify_plugin_on_disk(self._public_key, Path(plugin_path), entry)
|
||||||
plugin_info = json.load(f)
|
except Exception: # noqa: BLE001 - never let a verification bug crash attach()
|
||||||
module_name = plugin_info.get("module")
|
logger.error("Signature verification raised for plugin '%s'.", name, exc_info=True)
|
||||||
class_name = plugin_info.get("class")
|
ok = False
|
||||||
if not module_name or not class_name:
|
|
||||||
print(f"Plugin '{name}' info file is missing 'module' or 'class' key.")
|
result["status"] = SIG_VERIFIED if ok else SIG_INVALID
|
||||||
return
|
return result
|
||||||
|
|
||||||
sys.path.insert(0, plugin_path)
|
def _check_permissions(self, path: str) -> None:
|
||||||
module = importlib.import_module(module_name)
|
"""Warn (don't block) if a plugin file is group/world-writable."""
|
||||||
|
try:
|
||||||
|
st = os.stat(path)
|
||||||
|
if st.st_mode & (stat.S_IWGRP | stat.S_IWOTH):
|
||||||
|
logger.warning(
|
||||||
|
"Plugin file '%s' is group/world-writable; this is a "
|
||||||
|
"security risk on shared systems.",
|
||||||
|
path,
|
||||||
|
)
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def attach(self, name: str, callback=None) -> None:
|
||||||
|
"""Dynamically load a plugin from its folder."""
|
||||||
|
with self._lock:
|
||||||
|
if name in self.plugins:
|
||||||
|
logger.info("Plugin '%s' is already attached.", name)
|
||||||
|
return
|
||||||
|
|
||||||
|
if not self._validate_name(name):
|
||||||
|
return
|
||||||
|
|
||||||
|
plugin_path = os.path.join(os.getcwd(), self.pluginspath, name)
|
||||||
|
info_path = os.path.join(plugin_path, name + ".json")
|
||||||
|
|
||||||
|
# Defense in depth: even with a validated name, make sure the
|
||||||
|
# resolved path is actually inside the plugins directory.
|
||||||
|
plugins_root = os.path.realpath(os.path.join(os.getcwd(), self.pluginspath))
|
||||||
|
if os.path.commonpath([plugins_root, os.path.realpath(plugin_path)]) != plugins_root:
|
||||||
|
logger.error("Refusing to attach '%s': resolves outside plugins directory.", name)
|
||||||
|
return
|
||||||
|
|
||||||
|
if not os.path.isdir(plugin_path) or not os.path.isfile(info_path):
|
||||||
|
logger.error("Failed to attach plugin '%s': folder or info file not found.", name)
|
||||||
|
return
|
||||||
|
|
||||||
|
self._check_permissions(info_path)
|
||||||
|
|
||||||
|
try:
|
||||||
|
with open(info_path, "r", encoding="utf-8") as f:
|
||||||
|
plugin_info = json.load(f)
|
||||||
|
except (OSError, json.JSONDecodeError) as e:
|
||||||
|
logger.error("Failed to read info file for plugin '%s': %s", name, e)
|
||||||
|
return
|
||||||
|
|
||||||
|
module_name = plugin_info.get("module")
|
||||||
|
class_name = plugin_info.get("class")
|
||||||
|
if not module_name or not class_name:
|
||||||
|
logger.error("Plugin '%s' info file is missing 'module' or 'class' key.", name)
|
||||||
|
return
|
||||||
|
if not _MODULE_RE.match(module_name) or not _MODULE_RE.match(class_name):
|
||||||
|
logger.error("Plugin '%s' has an invalid module/class identifier.", name)
|
||||||
|
return
|
||||||
|
|
||||||
|
module_file = os.path.join(plugin_path, module_name + ".py")
|
||||||
|
if not os.path.isfile(module_file):
|
||||||
|
logger.error("Plugin '%s' module file '%s' not found.", name, module_file)
|
||||||
|
return
|
||||||
|
|
||||||
|
self._check_permissions(module_file)
|
||||||
|
|
||||||
|
# Verify the plugin's signature BEFORE we ever execute its code.
|
||||||
|
# Unsigned/tampered plugins still get a listing entry (built
|
||||||
|
# from the manifest alone, which is inert JSON) but their .py
|
||||||
|
# file is never imported, and they can never be run.
|
||||||
|
sig = self._verify_signature(name, plugin_path, module_name)
|
||||||
|
if sig["status"] != SIG_VERIFIED:
|
||||||
|
if sig["status"] == SIG_INVALID:
|
||||||
|
logger.error(
|
||||||
|
"Plugin '%s' failed signature verification (tampered or bad "
|
||||||
|
"signature); attaching as permanently disabled.", name,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
logger.warning(
|
||||||
|
"Plugin '%s' has no valid whitelist entry; attaching as "
|
||||||
|
"permanently disabled.", name,
|
||||||
|
)
|
||||||
|
self.plugins[name] = {
|
||||||
|
"instance": None,
|
||||||
|
"running": False,
|
||||||
|
"info": plugin_info,
|
||||||
|
"autorun": False,
|
||||||
|
"module_key": None,
|
||||||
|
"disabled": True,
|
||||||
|
"signature": sig,
|
||||||
|
}
|
||||||
|
return
|
||||||
|
|
||||||
|
# Load the module directly from its file path instead of
|
||||||
|
# mutating sys.path. This prevents a plugin from shadowing
|
||||||
|
# stdlib or third-party modules for the rest of the process.
|
||||||
|
qualified_name = f"xtendr_plugin_{name}_{module_name}"
|
||||||
|
try:
|
||||||
|
spec = importlib.util.spec_from_file_location(qualified_name, module_file)
|
||||||
|
if spec is None or spec.loader is None:
|
||||||
|
raise ImportError(f"Could not create import spec for '{module_file}'")
|
||||||
|
module = importlib.util.module_from_spec(spec)
|
||||||
|
sys.modules[qualified_name] = module
|
||||||
|
spec.loader.exec_module(module)
|
||||||
|
|
||||||
plugin_class = getattr(module, class_name)
|
plugin_class = getattr(module, class_name)
|
||||||
instance = plugin_class()
|
instance = plugin_class()
|
||||||
|
|
||||||
if not isinstance(instance, XtendRBase):
|
if not isinstance(instance, XtendRBase):
|
||||||
print(f"Plugin '{name}' does not inherit from PluginBase.")
|
logger.error("Plugin '%s' does not inherit from XtendRBase.", name)
|
||||||
|
sys.modules.pop(qualified_name, None)
|
||||||
return
|
return
|
||||||
|
|
||||||
self.plugins[name] = {
|
except Exception as e: # noqa: BLE001 - plugin code is untrusted, isolate all failures
|
||||||
'instance': instance,
|
logger.error("Failed to attach plugin '%s': %s", name, e, exc_info=True)
|
||||||
'running': False,
|
sys.modules.pop(qualified_name, None)
|
||||||
'info': plugin_info,
|
return
|
||||||
'autorun': False
|
|
||||||
}
|
self.plugins[name] = {
|
||||||
print(f"Attached plugin '{name}'.")
|
"instance": instance,
|
||||||
except (ModuleNotFoundError, json.JSONDecodeError, AttributeError) as e:
|
"running": False,
|
||||||
print(f"Failed to attach plugin '{name}': {e}")
|
"info": plugin_info,
|
||||||
|
"autorun": False,
|
||||||
|
"module_key": qualified_name,
|
||||||
|
"disabled": False,
|
||||||
|
"signature": sig,
|
||||||
|
}
|
||||||
|
logger.info("Attached plugin '%s'.", name)
|
||||||
|
logger.info("Running pre-load on '%s'.", name)
|
||||||
|
|
||||||
|
def _pre_load_worker():
|
||||||
|
try:
|
||||||
|
instance.pre_load(callback)
|
||||||
|
except Exception: # noqa: BLE001
|
||||||
|
logger.error("Plugin '%s' raised during pre_load.", name, exc_info=True)
|
||||||
|
|
||||||
|
thread = threading.Thread(target=_pre_load_worker, daemon=True)
|
||||||
|
thread.start()
|
||||||
|
|
||||||
def run(self, name: str, *args, **kwargs):
|
def run(self, name: str, *args, **kwargs):
|
||||||
"""Run the plugin's 'run' method if available."""
|
"""Run the plugin's 'run' method if available."""
|
||||||
if name in self.plugins:
|
with self._lock:
|
||||||
self.plugins[name]['running'] = True
|
entry = self.plugins.get(name)
|
||||||
return self.plugins[name]['instance'].run(*args, **kwargs)
|
if entry is None:
|
||||||
print(f"Plugin '{name}' not found or has no 'run' method.")
|
logger.error("Plugin '%s' not found or has no 'run' method.", name)
|
||||||
|
return
|
||||||
|
if entry.get("disabled"):
|
||||||
|
logger.error("Plugin '%s' is disabled (failed signature verification) and cannot run.", name)
|
||||||
|
return
|
||||||
|
entry["running"] = True
|
||||||
|
try:
|
||||||
|
return entry["instance"].run(*args, **kwargs)
|
||||||
|
except Exception: # noqa: BLE001
|
||||||
|
logger.error("Plugin '%s' raised during run.", name, exc_info=True)
|
||||||
|
entry["running"] = False
|
||||||
|
|
||||||
def stop(self, name: str) -> None:
|
def stop(self, name: str) -> None:
|
||||||
"""Stop the plugin if it's running."""
|
"""Stop the plugin if it's running."""
|
||||||
if name in self.plugins and self.plugins[name]['running']:
|
with self._lock:
|
||||||
self.plugins[name]['running'] = False
|
entry = self.plugins.get(name)
|
||||||
self.plugins[name]['instance'].stop()
|
if entry is None or not entry["running"]:
|
||||||
else:
|
logger.info("Plugin '%s' is not running.", name)
|
||||||
print(f"Plugin '{name}' is not running.")
|
return
|
||||||
|
entry["running"] = False
|
||||||
|
try:
|
||||||
|
entry["instance"].stop()
|
||||||
|
except Exception: # noqa: BLE001
|
||||||
|
logger.error("Plugin '%s' raised during stop.", name, exc_info=True)
|
||||||
|
|
||||||
def detach(self, name: str) -> None:
|
def detach(self, name: str) -> None:
|
||||||
"""Unload a plugin."""
|
"""Unload a plugin."""
|
||||||
if name in self.plugins:
|
with self._lock:
|
||||||
del self.plugins[name]
|
entry = self.plugins.pop(name, None)
|
||||||
sys.modules.pop(name, None)
|
if entry is None:
|
||||||
print(f"Detached plugin '{name}'.")
|
logger.info("Plugin '%s' is not attached.", name)
|
||||||
else:
|
return
|
||||||
print(f"Plugin '{name}' is not attached.")
|
if entry.get("module_key"):
|
||||||
|
sys.modules.pop(entry["module_key"], None)
|
||||||
|
logger.info("Detached plugin '%s'.", name)
|
||||||
|
|||||||
Reference in New Issue
Block a user