16 Commits

Author SHA1 Message Date
Lerking a4fe3a2b80 Gated signatures. /JL 2026-07-03 08:51:39 +02:00
Lerking 984458d8f3 Hardened XtendR. /JL 2026-07-03 08:39:45 +02:00
Lerking be1af3ad05 Merge pull request 'Updated project description. /JL' (#35) from 0.3.3 into main
Reviewed-on: https://gitea.com/Lerking/XtendR/pulls/35
2025-03-29 13:59:42 +00:00
Lerking 0c8108d4d8 Updated project description. /JL 2025-03-29 14:58:49 +01:00
Lerking bcc9b864d1 Merge pull request 'README updated. /JL' (#34) from update_readme into main
Reviewed-on: https://gitea.com/Lerking/XtendR/pulls/34
2025-03-28 17:29:35 +00:00
Lerking a4c552d6f5 README updated. /JL 2025-03-28 18:28:58 +01:00
Lerking 34c7927601 Merge pull request '0.3.0' (#33) from 0.3.0 into main
Reviewed-on: https://gitea.com/Lerking/XtendR/pulls/33
2025-03-28 17:23:38 +00:00
Lerking ebf31756b5 #31 Done. /JL 2025-03-28 18:22:16 +01:00
Lerking 406c1b77ed #31 - Creating thread with callback, on pre_load(). /JL 2025-03-28 17:26:58 +01:00
Lerking 1be429c5cd Update xtendr/xtendrsystem.py 2025-03-27 20:33:19 +00:00
Lerking a093b3f56c Update plugins/example_plugin/example_plugin.py 2025-03-27 20:31:12 +00:00
Lerking 1dc0c81d84 Update plugins/example_plugin/example_plugin.py 2025-03-27 20:30:23 +00:00
Lerking 3e38cabf81 Update xtendr/xtendrsystem.py 2025-03-27 20:25:12 +00:00
Lerking 63f649af25 #31 Added **self.use_pre_load:bool = False** and pre_load() method. 2025-03-27 19:30:41 +00:00
Lerking b6efe1b1df Update xtendr/xtendrbase.py
#31 Added pre_load() method
2025-03-27 19:23:59 +00:00
Lerking c5ee073d65 Update setup.py 2025-03-27 19:20:30 +00:00
8 changed files with 558 additions and 68 deletions
+5 -3
View File
@@ -1,6 +1,7 @@
# XtendR
![Latest Version](https://gitpot.lerk.ing/badges/badge/static?label=Latest+version&message=0.5.0&color=blue)
A very basic Python 3.12 plugin system based on the K.I.S.S principle.
A very basic Python 3.12 friendly plugin system based on the K.I.S.S principle.
I was in need of a new plugin system, which should meet these requirements:
:heavy_plus_sign: Simple to use
@@ -16,7 +17,8 @@ I didn't find anything that suited my needs, so I decided to make my own plugin
It simply contains 2 classes, one for the plugin system and one abstraction base class for the plugins themselves.
At the moment only 4 functions are available:
- Attach
- Attach - including call to pre-load data in plugin.
- Run
- Stop
- Detach
@@ -27,4 +29,4 @@ The Run and Stop functions are mandatory in the plugin modules.
The system expects a folder called 'plugins', placed at the root, along side your main python file.
Each plugin should be placed in subfolders, named as the plugin, inside the 'plugins' folder.
The example.py along with the plugins/example_plugin/example_plugin.py and plugins/example_plugin/example_plugin.json shows the workings of this plugin system.
The example.py along with the plugins/example_plugin/example_plugin.py and plugins/example_plugin/example_plugin.json shows the workings of this plugin system.
+8 -1
View File
@@ -1,5 +1,9 @@
import time
from xtendr.xtendrsystem import XtendRSystem
def my_callback():
print("'example_plugin is finished pre-loading")
if __name__ == "__main__":
"""Example usage of the PluginSystem.
@@ -15,7 +19,10 @@ if __name__ == "__main__":
Detached plugin 'example_plugin'.
"""
system = XtendRSystem()
system.attach("example_plugin") # Assuming 'example_plugin/plugin_info.json' exists
system.attach("example_plugin", my_callback) # Assuming 'example_plugin/plugin_info.json' exists
for i in range(3):
print(f"Main program is running iteration {i+1}...")
time.sleep(2)
system.run("example_plugin", test="Hello!")
system.stop("example_plugin")
system.run("example_plugin", 25)
+6
View File
@@ -1,3 +1,5 @@
import threading
import time
from xtendr.xtendrbase import XtendRBase
class ExamplePlugin(XtendRBase):
@@ -31,3 +33,7 @@ class ExamplePlugin(XtendRBase):
def stop(self):
print("ExamplePlugin has stopped!")
def pre_load(self, callback):
time.sleep(5) # Indicate long running pre-load.
callback()
+2
View File
@@ -0,0 +1,2 @@
setuptools==68.2.2
cryptography>=42
+7 -2
View File
@@ -1,9 +1,12 @@
if __name__ == "__main__":
from setuptools import setup, find_packages
from pathlib import Path
this_directory = Path(__file__).parent
long_description = (this_directory / "README.md").read_text()
setup(
name="XtendR",
version="0.2.1",
version="0.5.0",
packages=find_packages(),
install_requires=[],
author="Jan Lerking",
@@ -16,4 +19,6 @@ if __name__ == "__main__":
"Operating System :: OS Independent",
],
python_requires='>=3.11',
long_description=long_description,
long_description_content_type='text/markdown'
)
+259
View File
@@ -0,0 +1,259 @@
"""
xtendr_signing.py
Core logic for signing XtendR plugins and maintaining a whitelist.
Deliberately kept free of any GTK/Adw imports so it can be unit tested
and reused by both the signer GUI and (later) XtendRSystem's verification
hook.
Design:
- Ed25519 keypair: private key stays with the signer, public key ships
inside XtendR / PyPac to verify at attach() time.
- Each whitelist entry covers the plugin's manifest (<name>.json) and its
module file (<module>.py), hashed together with SHA-256. The signature
covers the whole entry (hash + name + module + class), so an attacker
can't splice a valid hash onto a different plugin identity.
- The whitelist file itself can optionally be encrypted at rest with a
passphrase (Fernet / AES-128-CBC+HMAC via PBKDF2-derived key). This is
for confidentiality only -- the signature is what provides integrity,
and still verifies correctly whether or not the file on disk is
encrypted.
"""
from __future__ import annotations
import base64
import hashlib
import json
import os
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
Ed25519PrivateKey,
Ed25519PublicKey,
)
from cryptography.hazmat.primitives import serialization
from cryptography.exceptions import InvalidSignature
from cryptography.fernet import Fernet, InvalidToken
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives import hashes
PBKDF2_ITERATIONS = 600_000
# --------------------------------------------------------------------------
# Key management
# --------------------------------------------------------------------------
def generate_keypair(private_path: Path, public_path: Path) -> None:
"""Generate a new Ed25519 keypair and write it to disk (unencrypted PEM).
Caller is responsible for keeping private_path safe (e.g. 0600 perms,
offline machine, backups). This is deliberately not done automatically
since the right answer depends on the user's environment.
"""
private_path.parent.mkdir(parents=True, exist_ok=True)
key = Ed25519PrivateKey.generate()
priv_bytes = key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
)
pub_bytes = key.public_key().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
private_path.write_bytes(priv_bytes)
try:
os.chmod(private_path, 0o600)
except OSError:
pass
public_path.write_bytes(pub_bytes)
def load_private_key(path: Path) -> Ed25519PrivateKey:
key = serialization.load_pem_private_key(path.read_bytes(), password=None)
if not isinstance(key, Ed25519PrivateKey):
raise ValueError(f"'{path}' is not an Ed25519 private key.")
return key
def load_public_key(path: Path) -> Ed25519PublicKey:
key = serialization.load_pem_public_key(path.read_bytes())
if not isinstance(key, Ed25519PublicKey):
raise ValueError(f"'{path}' is not an Ed25519 public key.")
return key
# --------------------------------------------------------------------------
# Plugin hashing + entry signing
# --------------------------------------------------------------------------
@dataclass
class PluginEntry:
name: str # plugin folder name
module: str # module name, from the manifest
cls: str # class name, from the manifest
sha256: str # hash of manifest + module file contents
signed_at: str # ISO-8601 UTC timestamp
signature: str = "" # base64 Ed25519 signature, filled in after signing
def canonical_bytes(self) -> bytes:
"""Bytes that get signed / verified -- everything except the
signature itself, in a stable, sorted-key JSON encoding."""
payload = {
"name": self.name,
"module": self.module,
"cls": self.cls,
"sha256": self.sha256,
"signed_at": self.signed_at,
}
return json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8")
def hash_plugin_files(plugin_dir: Path, name: str, module: str) -> str:
"""Hash the manifest + module file together, in a fixed order, so the
hash changes if either file is tampered with."""
manifest_path = plugin_dir / f"{name}.json"
module_path = plugin_dir / f"{module}.py"
if not manifest_path.is_file():
raise FileNotFoundError(f"Manifest not found: {manifest_path}")
if not module_path.is_file():
raise FileNotFoundError(f"Module file not found: {module_path}")
h = hashlib.sha256()
for p in (manifest_path, module_path):
h.update(p.name.encode("utf-8"))
h.update(b"\x00")
h.update(p.read_bytes())
h.update(b"\x00")
return h.hexdigest()
def sign_plugin(private_key: Ed25519PrivateKey, plugin_dir: Path) -> PluginEntry:
manifest_path_glob = list(plugin_dir.glob("*.json"))
if len(manifest_path_glob) != 1:
raise ValueError(
f"Expected exactly one manifest .json in '{plugin_dir}', found {len(manifest_path_glob)}."
)
manifest = json.loads(manifest_path_glob[0].read_text(encoding="utf-8"))
module = manifest.get("module")
cls = manifest.get("class")
if not module or not cls:
raise ValueError("Manifest is missing 'module' or 'class'.")
name = plugin_dir.name
digest = hash_plugin_files(plugin_dir, name, module)
entry = PluginEntry(
name=name,
module=module,
cls=cls,
sha256=digest,
signed_at=datetime.now(timezone.utc).isoformat(timespec="seconds"),
)
signature = private_key.sign(entry.canonical_bytes())
entry.signature = base64.b64encode(signature).decode("ascii")
return entry
def verify_entry(public_key: Ed25519PublicKey, entry: PluginEntry) -> bool:
try:
public_key.verify(base64.b64decode(entry.signature), entry.canonical_bytes())
return True
except (InvalidSignature, ValueError):
return False
def verify_plugin_on_disk(public_key: Ed25519PublicKey, plugin_dir: Path, entry: PluginEntry) -> bool:
"""Full verification: signature is valid AND the files on disk still
match the hash that was signed."""
if not verify_entry(public_key, entry):
return False
try:
current_hash = hash_plugin_files(plugin_dir, entry.name, entry.module)
except FileNotFoundError:
return False
return current_hash == entry.sha256
# --------------------------------------------------------------------------
# Whitelist storage (optionally encrypted at rest)
# --------------------------------------------------------------------------
class Whitelist:
def __init__(self):
self.entries: dict[str, PluginEntry] = {}
def add(self, entry: PluginEntry) -> None:
self.entries[entry.name] = entry
def remove(self, name: str) -> bool:
return self.entries.pop(name, None) is not None
def to_json(self) -> str:
payload = {name: asdict(e) for name, e in self.entries.items()}
return json.dumps(payload, indent=2, sort_keys=True)
@classmethod
def from_json(cls, data: str) -> "Whitelist":
wl = cls()
raw = json.loads(data)
for name, fields in raw.items():
wl.entries[name] = PluginEntry(**fields)
return wl
# -- disk I/O -----------------------------------------------------
def save(self, path: Path, passphrase: Optional[str] = None) -> None:
plaintext = self.to_json().encode("utf-8")
if passphrase is None:
path.write_bytes(plaintext)
return
salt = os.urandom(16)
key = _derive_key(passphrase, salt)
token = Fernet(key).encrypt(plaintext)
container = {
"encrypted": True,
"kdf": "pbkdf2-sha256",
"iterations": PBKDF2_ITERATIONS,
"salt": base64.b64encode(salt).decode("ascii"),
"data": base64.b64encode(token).decode("ascii"),
}
path.write_text(json.dumps(container, indent=2), encoding="utf-8")
@classmethod
def load(cls, path: Path, passphrase: Optional[str] = None) -> "Whitelist":
raw = path.read_text(encoding="utf-8")
try:
container = json.loads(raw)
except json.JSONDecodeError:
container = None
if isinstance(container, dict) and container.get("encrypted"):
if passphrase is None:
raise ValueError("Whitelist is encrypted; a passphrase is required.")
salt = base64.b64decode(container["salt"])
key = _derive_key(passphrase, salt, container.get("iterations", PBKDF2_ITERATIONS))
token = base64.b64decode(container["data"])
try:
plaintext = Fernet(key).decrypt(token).decode("utf-8")
except InvalidToken as e:
raise ValueError("Wrong passphrase or corrupted whitelist file.") from e
return cls.from_json(plaintext)
return cls.from_json(raw)
def _derive_key(passphrase: str, salt: bytes, iterations: int = PBKDF2_ITERATIONS) -> bytes:
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=iterations,
)
return base64.urlsafe_b64encode(kdf.derive(passphrase.encode("utf-8")))
+5
View File
@@ -23,3 +23,8 @@ class XtendRBase(ABC):
@abstractmethod
def stop(self):
pass
@abstractmethod
def pre_load(self, *args):
pass
+266 -62
View File
@@ -1,19 +1,44 @@
import importlib
import importlib.util
import sys
import os
import re
import json
import stat
import logging
import threading
from pathlib import Path
from xtendr.xtendrbase import XtendRBase
from xtendr import signing as xsign
__version__ = "0.5.0"
logger = logging.getLogger("xtendr")
# Plugin (folder) names and module names are restricted to a safe identifier
# pattern. This blocks path traversal (e.g. "../../etc") and stray characters
# that have no business in a plugin name.
_NAME_RE = re.compile(r"^[A-Za-z0-9_-]+$")
_MODULE_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
# Signature status values stored in plugins[name]["signature"]["status"].
SIG_VERIFIED = "verified" # whitelist entry present, signature and hash both check out
SIG_UNSIGNED = "unsigned" # no whitelist entry for this plugin at all
SIG_INVALID = "invalid" # whitelist entry present but signature/hash mismatch (tampered)
__version__ = "0.1.3"
class XtendRSystem:
"""Plugin system to manage plugins.
SECURITY NOTE: plugins are arbitrary Python code that runs with the full
privileges of the host process. Only attach plugins from sources you
trust. This class validates names/paths and isolates module loading, but
it cannot make untrusted plugin code safe to run.
Example:
>>> system = XtendRSystem()
>>> system.version()
XtendR v0.1.3
>>> system.attach("example_plugin") # Assuming 'example_plugin/example_plugin.json' exists
XtendR v0.5.0
>>> system.attach("example_plugin", lambda: None)
>>> system.run("example_plugin")
ExamplePlugin is running!
>>> system.stop("example_plugin")
@@ -21,75 +46,254 @@ class XtendRSystem:
>>> system.detach("example_plugin")
Detached plugin 'example_plugin'.
"""
def __init__(self, pluginpath = "plugins"):
def __init__(self, pluginpath="plugins", public_key_path=None, whitelist_path=None, whitelist_passphrase=None):
self.pluginspath = pluginpath
self.plugins = {}
self._lock = threading.RLock()
# -- signature verification setup ---------------------------------
# If either the public key or the whitelist can't be loaded, we
# fail closed: self._public_key / self._whitelist stay None, and
# every plugin will come back as SIG_UNSIGNED (disabled) rather
# than silently skipping verification. This is deliberate -- an
# admin who wants unsigned plugins to run should not be able to
# get there by accident (e.g. a missing/misspelled key path).
self._public_key = None
self._whitelist = None
if public_key_path is not None:
try:
self._public_key = xsign.load_public_key(Path(public_key_path))
except (OSError, ValueError) as e:
logger.error("Could not load XtendR public key from '%s': %s", public_key_path, e)
if whitelist_path is not None:
try:
self._whitelist = xsign.Whitelist.load(Path(whitelist_path), whitelist_passphrase)
except (OSError, ValueError) as e:
logger.error("Could not load XtendR plugin whitelist from '%s': %s", whitelist_path, e)
if self._public_key is None or self._whitelist is None:
logger.warning(
"Signature verification is not fully configured for pluginpath '%s'; "
"all plugins will be treated as unsigned and permanently disabled.",
pluginpath,
)
def version(self) -> str:
return "XtendR v" + __version__
def attach(self, name: str) -> None:
"""Dynamically load a plugin from its folder."""
if name in self.plugins:
print(f"Plugin '{name}' is already attached.")
return
plugin_path = os.path.join(os.getcwd(), self.pluginspath, name)
info_path = os.path.join(plugin_path, name + ".json")
print(plugin_path + "\n" + info_path)
if not os.path.isdir(plugin_path) or not os.path.isfile(info_path):
print(f"Failed to attach plugin '{name}', folder or info file not found.")
return
@property
def verification_configured(self) -> bool:
"""True if a public key and whitelist both loaded successfully."""
return self._public_key is not None and self._whitelist is not None
def _validate_name(self, name: str) -> bool:
if not isinstance(name, str) or not _NAME_RE.match(name):
logger.error("Rejected plugin name %r: must match %s", name, _NAME_RE.pattern)
return False
return True
def _verify_signature(self, name: str, plugin_path: str, module_name: str) -> dict:
"""Check a plugin's signature against the loaded whitelist.
Returns a dict with at least a "status" key (SIG_VERIFIED /
SIG_UNSIGNED / SIG_INVALID) plus whatever whitelist metadata is
available, for display in the UI. Never raises.
"""
result = {"status": SIG_UNSIGNED, "sha256": None, "signature": None, "signed_at": None}
if self._public_key is None or self._whitelist is None:
return result
entry = self._whitelist.entries.get(name)
if entry is None:
return result
result.update(sha256=entry.sha256, signature=entry.signature, signed_at=entry.signed_at)
try:
with open(info_path, "r", encoding="utf-8") as f:
plugin_info = json.load(f)
module_name = plugin_info.get("module")
class_name = plugin_info.get("class")
if not module_name or not class_name:
print(f"Plugin '{name}' info file is missing 'module' or 'class' key.")
return
sys.path.insert(0, plugin_path)
module = importlib.import_module(module_name)
ok = xsign.verify_plugin_on_disk(self._public_key, Path(plugin_path), entry)
except Exception: # noqa: BLE001 - never let a verification bug crash attach()
logger.error("Signature verification raised for plugin '%s'.", name, exc_info=True)
ok = False
result["status"] = SIG_VERIFIED if ok else SIG_INVALID
return result
def _check_permissions(self, path: str) -> None:
"""Warn (don't block) if a plugin file is group/world-writable."""
try:
st = os.stat(path)
if st.st_mode & (stat.S_IWGRP | stat.S_IWOTH):
logger.warning(
"Plugin file '%s' is group/world-writable; this is a "
"security risk on shared systems.",
path,
)
except OSError:
pass
def attach(self, name: str, callback=None) -> None:
"""Dynamically load a plugin from its folder."""
with self._lock:
if name in self.plugins:
logger.info("Plugin '%s' is already attached.", name)
return
if not self._validate_name(name):
return
plugin_path = os.path.join(os.getcwd(), self.pluginspath, name)
info_path = os.path.join(plugin_path, name + ".json")
# Defense in depth: even with a validated name, make sure the
# resolved path is actually inside the plugins directory.
plugins_root = os.path.realpath(os.path.join(os.getcwd(), self.pluginspath))
if os.path.commonpath([plugins_root, os.path.realpath(plugin_path)]) != plugins_root:
logger.error("Refusing to attach '%s': resolves outside plugins directory.", name)
return
if not os.path.isdir(plugin_path) or not os.path.isfile(info_path):
logger.error("Failed to attach plugin '%s': folder or info file not found.", name)
return
self._check_permissions(info_path)
try:
with open(info_path, "r", encoding="utf-8") as f:
plugin_info = json.load(f)
except (OSError, json.JSONDecodeError) as e:
logger.error("Failed to read info file for plugin '%s': %s", name, e)
return
module_name = plugin_info.get("module")
class_name = plugin_info.get("class")
if not module_name or not class_name:
logger.error("Plugin '%s' info file is missing 'module' or 'class' key.", name)
return
if not _MODULE_RE.match(module_name) or not _MODULE_RE.match(class_name):
logger.error("Plugin '%s' has an invalid module/class identifier.", name)
return
module_file = os.path.join(plugin_path, module_name + ".py")
if not os.path.isfile(module_file):
logger.error("Plugin '%s' module file '%s' not found.", name, module_file)
return
self._check_permissions(module_file)
# Verify the plugin's signature BEFORE we ever execute its code.
# Unsigned/tampered plugins still get a listing entry (built
# from the manifest alone, which is inert JSON) but their .py
# file is never imported, and they can never be run.
sig = self._verify_signature(name, plugin_path, module_name)
if sig["status"] != SIG_VERIFIED:
if sig["status"] == SIG_INVALID:
logger.error(
"Plugin '%s' failed signature verification (tampered or bad "
"signature); attaching as permanently disabled.", name,
)
else:
logger.warning(
"Plugin '%s' has no valid whitelist entry; attaching as "
"permanently disabled.", name,
)
self.plugins[name] = {
"instance": None,
"running": False,
"info": plugin_info,
"autorun": False,
"module_key": None,
"disabled": True,
"signature": sig,
}
return
# Load the module directly from its file path instead of
# mutating sys.path. This prevents a plugin from shadowing
# stdlib or third-party modules for the rest of the process.
qualified_name = f"xtendr_plugin_{name}_{module_name}"
try:
spec = importlib.util.spec_from_file_location(qualified_name, module_file)
if spec is None or spec.loader is None:
raise ImportError(f"Could not create import spec for '{module_file}'")
module = importlib.util.module_from_spec(spec)
sys.modules[qualified_name] = module
spec.loader.exec_module(module)
plugin_class = getattr(module, class_name)
instance = plugin_class()
if not isinstance(instance, XtendRBase):
print(f"Plugin '{name}' does not inherit from PluginBase.")
logger.error("Plugin '%s' does not inherit from XtendRBase.", name)
sys.modules.pop(qualified_name, None)
return
self.plugins[name] = {
'instance': instance,
'running': False,
'info': plugin_info,
'autorun': False
}
print(f"Attached plugin '{name}'.")
except (ModuleNotFoundError, json.JSONDecodeError, AttributeError) as e:
print(f"Failed to attach plugin '{name}': {e}")
except Exception as e: # noqa: BLE001 - plugin code is untrusted, isolate all failures
logger.error("Failed to attach plugin '%s': %s", name, e, exc_info=True)
sys.modules.pop(qualified_name, None)
return
self.plugins[name] = {
"instance": instance,
"running": False,
"info": plugin_info,
"autorun": False,
"module_key": qualified_name,
"disabled": False,
"signature": sig,
}
logger.info("Attached plugin '%s'.", name)
logger.info("Running pre-load on '%s'.", name)
def _pre_load_worker():
try:
instance.pre_load(callback)
except Exception: # noqa: BLE001
logger.error("Plugin '%s' raised during pre_load.", name, exc_info=True)
thread = threading.Thread(target=_pre_load_worker, daemon=True)
thread.start()
def run(self, name: str, *args, **kwargs):
"""Run the plugin's 'run' method if available."""
if name in self.plugins:
self.plugins[name]['running'] = True
return self.plugins[name]['instance'].run(*args, **kwargs)
print(f"Plugin '{name}' not found or has no 'run' method.")
with self._lock:
entry = self.plugins.get(name)
if entry is None:
logger.error("Plugin '%s' not found or has no 'run' method.", name)
return
if entry.get("disabled"):
logger.error("Plugin '%s' is disabled (failed signature verification) and cannot run.", name)
return
entry["running"] = True
try:
return entry["instance"].run(*args, **kwargs)
except Exception: # noqa: BLE001
logger.error("Plugin '%s' raised during run.", name, exc_info=True)
entry["running"] = False
def stop(self, name: str) -> None:
"""Stop the plugin if it's running."""
if name in self.plugins and self.plugins[name]['running']:
self.plugins[name]['running'] = False
self.plugins[name]['instance'].stop()
else:
print(f"Plugin '{name}' is not running.")
with self._lock:
entry = self.plugins.get(name)
if entry is None or not entry["running"]:
logger.info("Plugin '%s' is not running.", name)
return
entry["running"] = False
try:
entry["instance"].stop()
except Exception: # noqa: BLE001
logger.error("Plugin '%s' raised during stop.", name, exc_info=True)
def detach(self, name: str) -> None:
"""Unload a plugin."""
if name in self.plugins:
del self.plugins[name]
sys.modules.pop(name, None)
print(f"Detached plugin '{name}'.")
else:
print(f"Plugin '{name}' is not attached.")
with self._lock:
entry = self.plugins.pop(name, None)
if entry is None:
logger.info("Plugin '%s' is not attached.", name)
return
if entry.get("module_key"):
sys.modules.pop(entry["module_key"], None)
logger.info("Detached plugin '%s'.", name)