59 Commits

Author SHA1 Message Date
Lerking ad67becd91 Verification configured. /JL 2026-07-03 09:00:13 +02:00
Lerking a4fe3a2b80 Gated signatures. /JL 2026-07-03 08:51:39 +02:00
Lerking 984458d8f3 Hardened XtendR. /JL 2026-07-03 08:39:45 +02:00
Lerking be1af3ad05 Merge pull request 'Updated project description. /JL' (#35) from 0.3.3 into main
Reviewed-on: https://gitea.com/Lerking/XtendR/pulls/35
2025-03-29 13:59:42 +00:00
Lerking 0c8108d4d8 Updated project description. /JL 2025-03-29 14:58:49 +01:00
Lerking bcc9b864d1 Merge pull request 'README updated. /JL' (#34) from update_readme into main
Reviewed-on: https://gitea.com/Lerking/XtendR/pulls/34
2025-03-28 17:29:35 +00:00
Lerking a4c552d6f5 README updated. /JL 2025-03-28 18:28:58 +01:00
Lerking 34c7927601 Merge pull request '0.3.0' (#33) from 0.3.0 into main
Reviewed-on: https://gitea.com/Lerking/XtendR/pulls/33
2025-03-28 17:23:38 +00:00
Lerking ebf31756b5 #31 Done. /JL 2025-03-28 18:22:16 +01:00
Lerking 406c1b77ed #31 - Creating thread with callback, on pre_load(). /JL 2025-03-28 17:26:58 +01:00
Lerking 1be429c5cd Update xtendr/xtendrsystem.py 2025-03-27 20:33:19 +00:00
Lerking a093b3f56c Update plugins/example_plugin/example_plugin.py 2025-03-27 20:31:12 +00:00
Lerking 1dc0c81d84 Update plugins/example_plugin/example_plugin.py 2025-03-27 20:30:23 +00:00
Lerking 3e38cabf81 Update xtendr/xtendrsystem.py 2025-03-27 20:25:12 +00:00
Lerking 63f649af25 #31 Added **self.use_pre_load:bool = False** and pre_load() method. 2025-03-27 19:30:41 +00:00
Lerking b6efe1b1df Update xtendr/xtendrbase.py
#31 Added pre_load() method
2025-03-27 19:23:59 +00:00
Lerking c5ee073d65 Update setup.py 2025-03-27 19:20:30 +00:00
Lerking 3bcf01dc26 Merge pull request '0.2.1 Updated to include keyword arguments in run() method. /JL' (#30) from 0.2.1 into main
Reviewed-on: https://gitea.com/Lerking/XtendR/pulls/30
2025-03-27 09:47:53 +00:00
Jan Lerking dd5904c273 0.2.1 Updated to include keyword arguments in run() method. /JL 2025-03-27 10:46:05 +01:00
Lerking b763308aa9 Merge pull request 'Added plugin.run() with arguments. /JL' (#29) from 0.2.0 into main
Reviewed-on: https://gitea.com/Lerking/XtendR/pulls/29
2025-03-27 08:57:26 +00:00
Jan Lerking 34ba757bc6 Added plugin.run() with arguments. /JL 2025-03-27 09:35:29 +01:00
Lerking b5d2e1b0be Merge pull request '0.1.3 #26 bug-fix corrected variable name. /JL' (#28) from 0.1.3 into main
Reviewed-on: https://gitea.com/Lerking/XtendR/pulls/28
2025-03-09 14:28:53 +00:00
Lerking d3031952bf 0.1.3 #26 bug-fix corrected variable name. /JL 2025-03-09 15:21:08 +01:00
Lerking 3312f2da28 Merge pull request '0.1.2 #26 Added possibility to choose your own plugins folder. /JL' (#27) from 0.1.2 into main
Reviewed-on: https://gitea.com/Lerking/XtendR/pulls/27
2025-03-09 14:14:37 +00:00
Lerking a21122671d 0.1.2 #26 Added possibility to choose your own plugins folder. /JL 2025-03-09 15:06:32 +01:00
Lerking 9effa4be3e Merge pull request '0.1.1 Updated doctest. /JL' (#25) from 0.1.1 into main
Reviewed-on: https://gitea.com/Lerking/XtendR/pulls/25
2025-03-08 17:34:52 +00:00
Lerking 217038b863 0.1.1 Updated doctest. /JL 2025-03-08 18:34:04 +01:00
Lerking 8c19c4a97b Merge pull request '0.1.0 #23 Added 'Autorun' flag to XtendRSystem Attach() method. Also added a new version() method. /JL' (#24) from 0.1.0 into main
Reviewed-on: https://gitea.com/Lerking/XtendR/pulls/24
2025-03-08 17:28:27 +00:00
Lerking c7662a5d47 0.1.0 #23 Added 'Autorun' flag to XtendRSystem Attach() method. Also added a new version() method. /JL 2025-03-08 18:20:34 +01:00
Lerking 68d3aa3ed2 Merge pull request '0.0.9 build. /JL' (#22) from 0.0.9 into main
Reviewed-on: https://gitea.com/Lerking/XtendR/pulls/22
2025-03-01 20:14:52 +00:00
Lerking 2ea29fe5ea 0.0.9 build. /JL 2025-03-01 21:14:15 +01:00
Lerking 1195d399b6 Merge pull request '0.0.8 update. /JL' (#21) from 0.0.8 into main
Reviewed-on: https://gitea.com/Lerking/XtendR/pulls/21
2025-03-01 12:19:32 +00:00
Lerking b709e4d942 0.0.8 update. /JL 2025-03-01 13:18:10 +01:00
Lerking 96bb0f0f3d Merge pull request '0.0.7' (#20) from 0.0.7 into main
Reviewed-on: https://gitea.com/Lerking/XtendR/pulls/20
2025-02-25 06:28:11 +00:00
Lerking 993617976f Add Makefile 2025-02-25 06:27:47 +00:00
Lerking 828213912e Update setup.py 2025-02-25 06:26:29 +00:00
Lerking 44136c33b8 Merge pull request 'Update setup.py' (#17) from lerking-patch-1 into main
Reviewed-on: https://gitea.com/Lerking/XtendR/pulls/17
2025-02-25 06:22:07 +00:00
Lerking 490423e94a Update setup.py 2025-02-25 06:21:51 +00:00
Lerking fa2c25d6c5 Merge pull request 'Update setup.py' (#15) from lerking-patch-1 into main
Reviewed-on: https://gitea.com/Lerking/XtendR/pulls/15
2025-02-24 16:08:07 +00:00
Lerking 0db2e1dfca Update setup.py 2025-02-24 16:07:16 +00:00
Lerking 7200cb0bf8 Merge pull request 'Updated. /JL' (#14) from 0.0.6 into main
Reviewed-on: https://gitea.com/Lerking/XtendR/pulls/14
2025-02-24 11:19:02 +00:00
Jan Lerking da3d09d3b1 Updated. /JL 2025-02-24 12:17:07 +01:00
Lerking 00173e73ae Merge pull request '0.0.5' (#12) from 0.0.5 into main
Reviewed-on: https://gitea.com/Lerking/XtendR/pulls/12
2025-02-24 08:23:28 +00:00
Jan Lerking c849d32f73 0.0.5 #8 Cleaned up unused imports. /JL 2025-02-24 09:20:08 +01:00
Lerking ffc7baa394 Update README.md 2025-02-24 08:16:07 +00:00
Lerking ea1ba5d5ab Update README.md 2025-02-24 08:15:18 +00:00
Lerking b01a9cca3e Update README.md 2025-02-24 07:29:42 +00:00
Lerking 485d7a28d8 Merge pull request 'main' (#11) from main into 0.0.5
Reviewed-on: https://gitea.com/Lerking/XtendR/pulls/11
2025-02-24 07:28:21 +00:00
Lerking f711cdcf06 Update README.md 2025-02-24 07:26:47 +00:00
Lerking 2199d18c79 Update README.md 2025-02-24 07:26:27 +00:00
Lerking d22861622b Merge pull request 'main' (#10) from main into 0.0.5
Reviewed-on: https://gitea.com/Lerking/XtendR/pulls/10
2025-02-24 07:24:30 +00:00
Lerking 13858e433f Update README.md 2025-02-24 07:23:58 +00:00
Lerking 37cad62835 Update README.md 2025-02-24 07:19:43 +00:00
Lerking b4ad8fee0c Update README.md 2025-02-24 07:19:01 +00:00
Lerking 4e51fd06f0 Update README.md 2025-02-24 07:17:57 +00:00
Lerking dd780864a6 Update README.md 2025-02-24 07:03:53 +00:00
Lerking 83db451e1b Update README.md 2025-02-24 07:03:28 +00:00
Lerking b58be97d1f Update setup.py 2025-02-24 06:53:16 +00:00
Lerking eb03302323 Update xtendr/xtendrsystem.py 2025-02-24 06:16:04 +00:00
9 changed files with 616 additions and 76 deletions
+13
View File
@@ -0,0 +1,13 @@
dist:
python -m build
test:
twine upload --repository testpypi dist/*
publish:
twine upload --repository pypi dist/*
all:
$(dist)
$(test)
$(publish)
+18 -3
View File
@@ -1,9 +1,24 @@
# XtendR # XtendR
![Latest Version](https://gitpot.lerk.ing/badges/badge/static?label=Latest+version&message=0.5.1&color=blue)
A very basic python 3 extension system to ease the use of plugins. A very basic Python 3.12 friendly plugin system based on the K.I.S.S principle.
I was in need of a new plugin system, which should meet these requirements:
:heavy_plus_sign: Simple to use
:heavy_plus_sign: Work well with Python 3.12
:heavy_plus_sign: Maintainable - Don't expect to see new releases every month. __If it ain't broken, don't fix it!!!__
I previously used yapsy, but it doesn't meet the requirements anymore.
:x: No longer simple, and simple to use (Simplicity in use has been sacrificed for more complexity. It has become bloated)
:x: Not workink with Python 3.12
:x: No longer maintained (Hasn't been maintained for a few years)
I didn't find anything that suited my needs, so I decided to make my own plugin system.
It simply contains 2 classes, one for the plugin system and one abstraction base class for the plugins themselves.
At the moment only 4 functions are available: At the moment only 4 functions are available:
- Attach
- Attach - including call to pre-load data in plugin.
- Run - Run
- Stop - Stop
- Detach - Detach
@@ -14,4 +29,4 @@ The Run and Stop functions are mandatory in the plugin modules.
The system expects a folder called 'plugins', placed at the root, along side your main python file. The system expects a folder called 'plugins', placed at the root, along side your main python file.
Each plugin should be placed in subfolders, named as the plugin, inside the 'plugins' folder. Each plugin should be placed in subfolders, named as the plugin, inside the 'plugins' folder.
The example.py along with the example_plugin.py/json shows the workings of this plugin system. The example.py along with the plugins/example_plugin/example_plugin.py and plugins/example_plugin/example_plugin.json shows the workings of this plugin system.
+13 -2
View File
@@ -1,5 +1,9 @@
import time
from xtendr.xtendrsystem import XtendRSystem from xtendr.xtendrsystem import XtendRSystem
def my_callback():
print("'example_plugin is finished pre-loading")
if __name__ == "__main__": if __name__ == "__main__":
"""Example usage of the PluginSystem. """Example usage of the PluginSystem.
@@ -15,7 +19,14 @@ if __name__ == "__main__":
Detached plugin 'example_plugin'. Detached plugin 'example_plugin'.
""" """
system = XtendRSystem() system = XtendRSystem()
system.attach("example_plugin") # Assuming 'example_plugin/plugin_info.json' exists system.attach("example_plugin", my_callback) # Assuming 'example_plugin/plugin_info.json' exists
system.run("example_plugin") for i in range(3):
print(f"Main program is running iteration {i+1}...")
time.sleep(2)
system.run("example_plugin", test="Hello!")
system.stop("example_plugin")
system.run("example_plugin", 25)
system.stop("example_plugin")
system.run("example_plugin", "Hello!", 25)
system.stop("example_plugin") system.stop("example_plugin")
system.detach("example_plugin") system.detach("example_plugin")
+24 -2
View File
@@ -1,3 +1,5 @@
import threading
import time
from xtendr.xtendrbase import XtendRBase from xtendr.xtendrbase import XtendRBase
class ExamplePlugin(XtendRBase): class ExamplePlugin(XtendRBase):
@@ -5,13 +7,33 @@ class ExamplePlugin(XtendRBase):
Example: Example:
>>> plugin = ExamplePlugin() >>> plugin = ExamplePlugin()
>>> plugin.run() >>> plugin.run("Hello!", 25)
Passed arguments 2:
Argument 0: Hello!
Argument 1: 25
ExamplePlugin is running! ExamplePlugin is running!
>>> plugin.stop() >>> plugin.stop()
ExamplePlugin has stopped! ExamplePlugin has stopped!
""" """
def run(self): def run(self, *args, **kwargs):
arglen = len(args)
keylen = len(kwargs)
if arglen > 0:
print(f"Passed arguments {arglen}:")
for idx, a in enumerate(args):
print(f"Argument {idx}: {a}")
if keylen > 0:
print(f"Keyword arguments passed {keylen}")
if not "test" in kwargs:
raise ValueError("Didn't get expected 'test' keyword!")
for kw in kwargs:
print(f"Argument {kw}: {kwargs[kw]}")
print("ExamplePlugin is running!") print("ExamplePlugin is running!")
def stop(self): def stop(self):
print("ExamplePlugin has stopped!") print("ExamplePlugin has stopped!")
def pre_load(self, callback):
time.sleep(5) # Indicate long running pre-load.
callback()
+2
View File
@@ -0,0 +1,2 @@
setuptools==68.2.2
cryptography>=42
+10 -5
View File
@@ -1,19 +1,24 @@
if __name__ == "__main__": if __name__ == "__main__":
from setuptools import setup, find_packages from setuptools import setup, find_packages
from pathlib import Path
this_directory = Path(__file__).parent
long_description = (this_directory / "README.md").read_text()
setup( setup(
name="XtendR", name="XtendR",
version="0.0.4", version="0.5.1",
packages=find_packages(), packages=find_packages(),
install_requires=[], install_requires=[],
author="Jan Lerking", author="Jan Lerking",
author_email="", author_email="",
description="A modular plugin system for Python.", description="A modular plugin system for Python.",
url="www.gitea.com/Lerking/XtendR", url="https://www.gitea.com/Lerking/XtendR",
classifiers=[ classifiers=[
"Programming Language :: Python :: 3.12", "Programming Language :: Python :: 3.11",
"License :: OSI Approved :: MIT License", "License :: OSI Approved :: MIT License",
"Operating System :: OS Independent", "Operating System :: OS Independent",
], ],
python_requires='>=3.12', python_requires='>=3.11',
long_description=long_description,
long_description_content_type='text/markdown'
) )
+259
View File
@@ -0,0 +1,259 @@
"""
xtendr_signing.py
Core logic for signing XtendR plugins and maintaining a whitelist.
Deliberately kept free of any GTK/Adw imports so it can be unit tested
and reused by both the signer GUI and (later) XtendRSystem's verification
hook.
Design:
- Ed25519 keypair: private key stays with the signer, public key ships
inside XtendR / PyPac to verify at attach() time.
- Each whitelist entry covers the plugin's manifest (<name>.json) and its
module file (<module>.py), hashed together with SHA-256. The signature
covers the whole entry (hash + name + module + class), so an attacker
can't splice a valid hash onto a different plugin identity.
- The whitelist file itself can optionally be encrypted at rest with a
passphrase (Fernet / AES-128-CBC+HMAC via PBKDF2-derived key). This is
for confidentiality only -- the signature is what provides integrity,
and still verifies correctly whether or not the file on disk is
encrypted.
"""
from __future__ import annotations
import base64
import hashlib
import json
import os
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
Ed25519PrivateKey,
Ed25519PublicKey,
)
from cryptography.hazmat.primitives import serialization
from cryptography.exceptions import InvalidSignature
from cryptography.fernet import Fernet, InvalidToken
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives import hashes
PBKDF2_ITERATIONS = 600_000
# --------------------------------------------------------------------------
# Key management
# --------------------------------------------------------------------------
def generate_keypair(private_path: Path, public_path: Path) -> None:
"""Generate a new Ed25519 keypair and write it to disk (unencrypted PEM).
Caller is responsible for keeping private_path safe (e.g. 0600 perms,
offline machine, backups). This is deliberately not done automatically
since the right answer depends on the user's environment.
"""
private_path.parent.mkdir(parents=True, exist_ok=True)
key = Ed25519PrivateKey.generate()
priv_bytes = key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
)
pub_bytes = key.public_key().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
private_path.write_bytes(priv_bytes)
try:
os.chmod(private_path, 0o600)
except OSError:
pass
public_path.write_bytes(pub_bytes)
def load_private_key(path: Path) -> Ed25519PrivateKey:
key = serialization.load_pem_private_key(path.read_bytes(), password=None)
if not isinstance(key, Ed25519PrivateKey):
raise ValueError(f"'{path}' is not an Ed25519 private key.")
return key
def load_public_key(path: Path) -> Ed25519PublicKey:
key = serialization.load_pem_public_key(path.read_bytes())
if not isinstance(key, Ed25519PublicKey):
raise ValueError(f"'{path}' is not an Ed25519 public key.")
return key
# --------------------------------------------------------------------------
# Plugin hashing + entry signing
# --------------------------------------------------------------------------
@dataclass
class PluginEntry:
name: str # plugin folder name
module: str # module name, from the manifest
cls: str # class name, from the manifest
sha256: str # hash of manifest + module file contents
signed_at: str # ISO-8601 UTC timestamp
signature: str = "" # base64 Ed25519 signature, filled in after signing
def canonical_bytes(self) -> bytes:
"""Bytes that get signed / verified -- everything except the
signature itself, in a stable, sorted-key JSON encoding."""
payload = {
"name": self.name,
"module": self.module,
"cls": self.cls,
"sha256": self.sha256,
"signed_at": self.signed_at,
}
return json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8")
def hash_plugin_files(plugin_dir: Path, name: str, module: str) -> str:
"""Hash the manifest + module file together, in a fixed order, so the
hash changes if either file is tampered with."""
manifest_path = plugin_dir / f"{name}.json"
module_path = plugin_dir / f"{module}.py"
if not manifest_path.is_file():
raise FileNotFoundError(f"Manifest not found: {manifest_path}")
if not module_path.is_file():
raise FileNotFoundError(f"Module file not found: {module_path}")
h = hashlib.sha256()
for p in (manifest_path, module_path):
h.update(p.name.encode("utf-8"))
h.update(b"\x00")
h.update(p.read_bytes())
h.update(b"\x00")
return h.hexdigest()
def sign_plugin(private_key: Ed25519PrivateKey, plugin_dir: Path) -> PluginEntry:
manifest_path_glob = list(plugin_dir.glob("*.json"))
if len(manifest_path_glob) != 1:
raise ValueError(
f"Expected exactly one manifest .json in '{plugin_dir}', found {len(manifest_path_glob)}."
)
manifest = json.loads(manifest_path_glob[0].read_text(encoding="utf-8"))
module = manifest.get("module")
cls = manifest.get("class")
if not module or not cls:
raise ValueError("Manifest is missing 'module' or 'class'.")
name = plugin_dir.name
digest = hash_plugin_files(plugin_dir, name, module)
entry = PluginEntry(
name=name,
module=module,
cls=cls,
sha256=digest,
signed_at=datetime.now(timezone.utc).isoformat(timespec="seconds"),
)
signature = private_key.sign(entry.canonical_bytes())
entry.signature = base64.b64encode(signature).decode("ascii")
return entry
def verify_entry(public_key: Ed25519PublicKey, entry: PluginEntry) -> bool:
try:
public_key.verify(base64.b64decode(entry.signature), entry.canonical_bytes())
return True
except (InvalidSignature, ValueError):
return False
def verify_plugin_on_disk(public_key: Ed25519PublicKey, plugin_dir: Path, entry: PluginEntry) -> bool:
"""Full verification: signature is valid AND the files on disk still
match the hash that was signed."""
if not verify_entry(public_key, entry):
return False
try:
current_hash = hash_plugin_files(plugin_dir, entry.name, entry.module)
except FileNotFoundError:
return False
return current_hash == entry.sha256
# --------------------------------------------------------------------------
# Whitelist storage (optionally encrypted at rest)
# --------------------------------------------------------------------------
class Whitelist:
def __init__(self):
self.entries: dict[str, PluginEntry] = {}
def add(self, entry: PluginEntry) -> None:
self.entries[entry.name] = entry
def remove(self, name: str) -> bool:
return self.entries.pop(name, None) is not None
def to_json(self) -> str:
payload = {name: asdict(e) for name, e in self.entries.items()}
return json.dumps(payload, indent=2, sort_keys=True)
@classmethod
def from_json(cls, data: str) -> "Whitelist":
wl = cls()
raw = json.loads(data)
for name, fields in raw.items():
wl.entries[name] = PluginEntry(**fields)
return wl
# -- disk I/O -----------------------------------------------------
def save(self, path: Path, passphrase: Optional[str] = None) -> None:
plaintext = self.to_json().encode("utf-8")
if passphrase is None:
path.write_bytes(plaintext)
return
salt = os.urandom(16)
key = _derive_key(passphrase, salt)
token = Fernet(key).encrypt(plaintext)
container = {
"encrypted": True,
"kdf": "pbkdf2-sha256",
"iterations": PBKDF2_ITERATIONS,
"salt": base64.b64encode(salt).decode("ascii"),
"data": base64.b64encode(token).decode("ascii"),
}
path.write_text(json.dumps(container, indent=2), encoding="utf-8")
@classmethod
def load(cls, path: Path, passphrase: Optional[str] = None) -> "Whitelist":
raw = path.read_text(encoding="utf-8")
try:
container = json.loads(raw)
except json.JSONDecodeError:
container = None
if isinstance(container, dict) and container.get("encrypted"):
if passphrase is None:
raise ValueError("Whitelist is encrypted; a passphrase is required.")
salt = base64.b64decode(container["salt"])
key = _derive_key(passphrase, salt, container.get("iterations", PBKDF2_ITERATIONS))
token = base64.b64decode(container["data"])
try:
plaintext = Fernet(key).decrypt(token).decode("utf-8")
except InvalidToken as e:
raise ValueError("Wrong passphrase or corrupted whitelist file.") from e
return cls.from_json(plaintext)
return cls.from_json(raw)
def _derive_key(passphrase: str, salt: bytes, iterations: int = PBKDF2_ITERATIONS) -> bytes:
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=iterations,
)
return base64.urlsafe_b64encode(kdf.derive(passphrase.encode("utf-8")))
+6 -5
View File
@@ -1,7 +1,3 @@
import importlib
import sys
import os
import json
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
class XtendRBase(ABC): class XtendRBase(ABC):
@@ -21,9 +17,14 @@ class XtendRBase(ABC):
Stopping TestPlugin Stopping TestPlugin
""" """
@abstractmethod @abstractmethod
def run(self): def run(self, *args, **kwargs):
pass pass
@abstractmethod @abstractmethod
def stop(self): def stop(self):
pass pass
@abstractmethod
def pre_load(self, *args):
pass
+271 -59
View File
@@ -1,16 +1,44 @@
import importlib import importlib.util
import sys import sys
import os import os
import re
import json import json
from abc import ABC, abstractmethod import stat
import logging
import threading
from pathlib import Path
from xtendr.xtendrbase import XtendRBase from xtendr.xtendrbase import XtendRBase
from xtendr import signing as xsign
__version__ = "0.5.0"
logger = logging.getLogger("xtendr")
# Plugin (folder) names and module names are restricted to a safe identifier
# pattern. This blocks path traversal (e.g. "../../etc") and stray characters
# that have no business in a plugin name.
_NAME_RE = re.compile(r"^[A-Za-z0-9_-]+$")
_MODULE_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
# Signature status values stored in plugins[name]["signature"]["status"].
SIG_VERIFIED = "verified" # whitelist entry present, signature and hash both check out
SIG_UNSIGNED = "unsigned" # no whitelist entry for this plugin at all
SIG_INVALID = "invalid" # whitelist entry present but signature/hash mismatch (tampered)
class XtendRSystem: class XtendRSystem:
"""Plugin system to manage plugins. """Plugin system to manage plugins.
SECURITY NOTE: plugins are arbitrary Python code that runs with the full
privileges of the host process. Only attach plugins from sources you
trust. This class validates names/paths and isolates module loading, but
it cannot make untrusted plugin code safe to run.
Example: Example:
>>> system = XtendRSystem() >>> system = XtendRSystem()
>>> system.attach("example_plugin") # Assuming 'example_plugin/plugin_info.json' exists >>> system.version()
XtendR v0.5.0
>>> system.attach("example_plugin", lambda: None)
>>> system.run("example_plugin") >>> system.run("example_plugin")
ExamplePlugin is running! ExamplePlugin is running!
>>> system.stop("example_plugin") >>> system.stop("example_plugin")
@@ -18,70 +46,254 @@ class XtendRSystem:
>>> system.detach("example_plugin") >>> system.detach("example_plugin")
Detached plugin 'example_plugin'. Detached plugin 'example_plugin'.
""" """
def __init__(self):
def __init__(self, pluginpath="plugins", public_key_path=None, whitelist_path=None, whitelist_passphrase=None):
self.pluginspath = pluginpath
self.plugins = {} self.plugins = {}
self._lock = threading.RLock()
def attach(self, name: str) -> None:
"""Dynamically load a plugin from its folder.""" # -- signature verification setup ---------------------------------
if name in self.plugins: # If either the public key or the whitelist can't be loaded, we
print(f"Plugin '{name}' is already attached.") # fail closed: self._public_key / self._whitelist stay None, and
return # every plugin will come back as SIG_UNSIGNED (disabled) rather
# than silently skipping verification. This is deliberate -- an
plugin_path = os.path.join(os.getcwd(), "plugins", name) # admin who wants unsigned plugins to run should not be able to
info_path = os.path.join(plugin_path, name + ".json") # get there by accident (e.g. a missing/misspelled key path).
print(plugin_path + "\n" + info_path) self._public_key = None
self._whitelist = None
if not os.path.isdir(plugin_path) or not os.path.isfile(info_path):
print(f"Failed to attach plugin '{name}', folder or info file not found.") if public_key_path is not None:
return try:
self._public_key = xsign.load_public_key(Path(public_key_path))
except (OSError, ValueError) as e:
logger.error("Could not load XtendR public key from '%s': %s", public_key_path, e)
if whitelist_path is not None:
try:
self._whitelist = xsign.Whitelist.load(Path(whitelist_path), whitelist_passphrase)
except (OSError, ValueError) as e:
logger.error("Could not load XtendR plugin whitelist from '%s': %s", whitelist_path, e)
if self._public_key is None or self._whitelist is None:
logger.warning(
"Signature verification is not fully configured for pluginpath '%s'; "
"all plugins will be treated as unsigned and permanently disabled.",
pluginpath,
)
def version(self) -> str:
return "XtendR v" + __version__
@property
def verification_configured(self) -> bool:
"""True if a public key and whitelist both loaded successfully."""
return self._public_key is not None and self._whitelist is not None
def _validate_name(self, name: str) -> bool:
if not isinstance(name, str) or not _NAME_RE.match(name):
logger.error("Rejected plugin name %r: must match %s", name, _NAME_RE.pattern)
return False
return True
def _verify_signature(self, name: str, plugin_path: str, module_name: str) -> dict:
"""Check a plugin's signature against the loaded whitelist.
Returns a dict with at least a "status" key (SIG_VERIFIED /
SIG_UNSIGNED / SIG_INVALID) plus whatever whitelist metadata is
available, for display in the UI. Never raises.
"""
result = {"status": SIG_UNSIGNED, "sha256": None, "signature": None, "signed_at": None}
if self._public_key is None or self._whitelist is None:
return result
entry = self._whitelist.entries.get(name)
if entry is None:
return result
result.update(sha256=entry.sha256, signature=entry.signature, signed_at=entry.signed_at)
try: try:
with open(info_path, "r", encoding="utf-8") as f: ok = xsign.verify_plugin_on_disk(self._public_key, Path(plugin_path), entry)
plugin_info = json.load(f) except Exception: # noqa: BLE001 - never let a verification bug crash attach()
module_name = plugin_info.get("module") logger.error("Signature verification raised for plugin '%s'.", name, exc_info=True)
class_name = plugin_info.get("class") ok = False
if not module_name or not class_name:
print(f"Plugin '{name}' info file is missing 'module' or 'class' key.") result["status"] = SIG_VERIFIED if ok else SIG_INVALID
return return result
sys.path.insert(0, plugin_path) def _check_permissions(self, path: str) -> None:
module = importlib.import_module(module_name) """Warn (don't block) if a plugin file is group/world-writable."""
try:
st = os.stat(path)
if st.st_mode & (stat.S_IWGRP | stat.S_IWOTH):
logger.warning(
"Plugin file '%s' is group/world-writable; this is a "
"security risk on shared systems.",
path,
)
except OSError:
pass
def attach(self, name: str, callback=None) -> None:
"""Dynamically load a plugin from its folder."""
with self._lock:
if name in self.plugins:
logger.info("Plugin '%s' is already attached.", name)
return
if not self._validate_name(name):
return
plugin_path = os.path.join(os.getcwd(), self.pluginspath, name)
info_path = os.path.join(plugin_path, name + ".json")
# Defense in depth: even with a validated name, make sure the
# resolved path is actually inside the plugins directory.
plugins_root = os.path.realpath(os.path.join(os.getcwd(), self.pluginspath))
if os.path.commonpath([plugins_root, os.path.realpath(plugin_path)]) != plugins_root:
logger.error("Refusing to attach '%s': resolves outside plugins directory.", name)
return
if not os.path.isdir(plugin_path) or not os.path.isfile(info_path):
logger.error("Failed to attach plugin '%s': folder or info file not found.", name)
return
self._check_permissions(info_path)
try:
with open(info_path, "r", encoding="utf-8") as f:
plugin_info = json.load(f)
except (OSError, json.JSONDecodeError) as e:
logger.error("Failed to read info file for plugin '%s': %s", name, e)
return
module_name = plugin_info.get("module")
class_name = plugin_info.get("class")
if not module_name or not class_name:
logger.error("Plugin '%s' info file is missing 'module' or 'class' key.", name)
return
if not _MODULE_RE.match(module_name) or not _MODULE_RE.match(class_name):
logger.error("Plugin '%s' has an invalid module/class identifier.", name)
return
module_file = os.path.join(plugin_path, module_name + ".py")
if not os.path.isfile(module_file):
logger.error("Plugin '%s' module file '%s' not found.", name, module_file)
return
self._check_permissions(module_file)
# Verify the plugin's signature BEFORE we ever execute its code.
# Unsigned/tampered plugins still get a listing entry (built
# from the manifest alone, which is inert JSON) but their .py
# file is never imported, and they can never be run.
sig = self._verify_signature(name, plugin_path, module_name)
if sig["status"] != SIG_VERIFIED:
if sig["status"] == SIG_INVALID:
logger.error(
"Plugin '%s' failed signature verification (tampered or bad "
"signature); attaching as permanently disabled.", name,
)
else:
logger.warning(
"Plugin '%s' has no valid whitelist entry; attaching as "
"permanently disabled.", name,
)
self.plugins[name] = {
"instance": None,
"running": False,
"info": plugin_info,
"autorun": False,
"module_key": None,
"disabled": True,
"signature": sig,
}
return
# Load the module directly from its file path instead of
# mutating sys.path. This prevents a plugin from shadowing
# stdlib or third-party modules for the rest of the process.
qualified_name = f"xtendr_plugin_{name}_{module_name}"
try:
spec = importlib.util.spec_from_file_location(qualified_name, module_file)
if spec is None or spec.loader is None:
raise ImportError(f"Could not create import spec for '{module_file}'")
module = importlib.util.module_from_spec(spec)
sys.modules[qualified_name] = module
spec.loader.exec_module(module)
plugin_class = getattr(module, class_name) plugin_class = getattr(module, class_name)
instance = plugin_class() instance = plugin_class()
if not isinstance(instance, XtendRBase): if not isinstance(instance, XtendRBase):
print(f"Plugin '{name}' does not inherit from PluginBase.") logger.error("Plugin '%s' does not inherit from XtendRBase.", name)
sys.modules.pop(qualified_name, None)
return return
self.plugins[name] = { except Exception as e: # noqa: BLE001 - plugin code is untrusted, isolate all failures
'instance': instance, logger.error("Failed to attach plugin '%s': %s", name, e, exc_info=True)
'running': False, sys.modules.pop(qualified_name, None)
'info': plugin_info return
}
print(f"Attached plugin '{name}'.") self.plugins[name] = {
except (ModuleNotFoundError, json.JSONDecodeError, AttributeError) as e: "instance": instance,
print(f"Failed to attach plugin '{name}': {e}") "running": False,
"info": plugin_info,
"autorun": False,
"module_key": qualified_name,
"disabled": False,
"signature": sig,
}
logger.info("Attached plugin '%s'.", name)
logger.info("Running pre-load on '%s'.", name)
def _pre_load_worker():
try:
instance.pre_load(callback)
except Exception: # noqa: BLE001
logger.error("Plugin '%s' raised during pre_load.", name, exc_info=True)
thread = threading.Thread(target=_pre_load_worker, daemon=True)
thread.start()
def run(self, name: str, *args, **kwargs): def run(self, name: str, *args, **kwargs):
"""Run the plugin's 'run' method if available.""" """Run the plugin's 'run' method if available."""
if name in self.plugins: with self._lock:
self.plugins[name]['running'] = True entry = self.plugins.get(name)
return self.plugins[name]['instance'].run(*args, **kwargs) if entry is None:
print(f"Plugin '{name}' not found or has no 'run' method.") logger.error("Plugin '%s' not found or has no 'run' method.", name)
return
if entry.get("disabled"):
logger.error("Plugin '%s' is disabled (failed signature verification) and cannot run.", name)
return
entry["running"] = True
try:
return entry["instance"].run(*args, **kwargs)
except Exception: # noqa: BLE001
logger.error("Plugin '%s' raised during run.", name, exc_info=True)
entry["running"] = False
def stop(self, name: str) -> None: def stop(self, name: str) -> None:
"""Stop the plugin if it's running.""" """Stop the plugin if it's running."""
if name in self.plugins and self.plugins[name]['running']: with self._lock:
self.plugins[name]['running'] = False entry = self.plugins.get(name)
self.plugins[name]['instance'].stop() if entry is None or not entry["running"]:
else: logger.info("Plugin '%s' is not running.", name)
print(f"Plugin '{name}' is not running.") return
entry["running"] = False
try:
entry["instance"].stop()
except Exception: # noqa: BLE001
logger.error("Plugin '%s' raised during stop.", name, exc_info=True)
def detach(self, name: str) -> None: def detach(self, name: str) -> None:
"""Unload a plugin.""" """Unload a plugin."""
if name in self.plugins: with self._lock:
del self.plugins[name] entry = self.plugins.pop(name, None)
sys.modules.pop(name, None) if entry is None:
print(f"Detached plugin '{name}'.") logger.info("Plugin '%s' is not attached.", name)
else: return
print(f"Plugin '{name}' is not attached.") if entry.get("module_key"):
sys.modules.pop(entry["module_key"], None)
logger.info("Detached plugin '%s'.", name)