Add files via upload
This commit is contained in:
@@ -1,3 +1,5 @@
|
||||
from .pyxtend_registry import PyXtendCore, IPyXtendRegistry
|
||||
from .pyxtend_core import PyXtendEngine
|
||||
from .pyxtend_models import Meta, Device, PluginConfig, DependencyModule, PyXtendRunTimeOption
|
||||
from .registry import PyXtendCore, IPyXtendRegistry
|
||||
from .core import PyXtendCore
|
||||
from .model import Meta, Device, PluginConfig, DependencyModule, PyXtendRunTimeOption
|
||||
from .interaction import PyXtendInteraction
|
||||
from .helpers import FileSystem, LogUtil
|
||||
|
||||
31
src/pyxtend/core.py
Normal file
31
src/pyxtend/core.py
Normal file
@@ -0,0 +1,31 @@
|
||||
from logging import Logger
|
||||
|
||||
from .interaction import PyXtendInteraction
|
||||
from .helpers import LogUtil
|
||||
|
||||
|
||||
class PyXtendCore:
|
||||
_logger: Logger
|
||||
|
||||
def __init__(self, **args) -> None:
|
||||
self._logger = LogUtil.create(args['options']['log_level'])
|
||||
self.use_case = PyXtendInteraction(args['options'])
|
||||
|
||||
def start(self) -> None:
|
||||
self.__reload_plugins()
|
||||
self.__invoke_on_plugins('Q')
|
||||
|
||||
def __reload_plugins(self) -> None:
|
||||
"""Reset the list of all plugins and initiate the walk over the main
|
||||
provided plugin package to load all available plugins
|
||||
"""
|
||||
self.use_case.discover_plugins(True)
|
||||
|
||||
def __invoke_on_plugins(self, command: chr):
|
||||
"""Apply all of the plugins on the argument supplied to this function
|
||||
"""
|
||||
for module in self.use_case.modules:
|
||||
plugin = self.use_case.register_plugin(module, self._logger)
|
||||
delegate = self.use_case.hook_plugin(plugin)
|
||||
device = delegate(command=command)
|
||||
self._logger.info(f'Loaded device: {device}')
|
||||
62
src/pyxtend/helpers.py
Normal file
62
src/pyxtend/helpers.py
Normal file
@@ -0,0 +1,62 @@
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
from logging import Logger, StreamHandler, DEBUG
|
||||
from typing import Union, Optional
|
||||
|
||||
import yaml
|
||||
|
||||
|
||||
class FileSystem:
|
||||
|
||||
@staticmethod
|
||||
def __get_base_dir():
|
||||
"""At most all application packages are just one level deep"""
|
||||
current_path = os.path.abspath(os.path.dirname(__file__))
|
||||
return os.path.join(current_path, '..')
|
||||
|
||||
@staticmethod
|
||||
def __get_config_directory() -> str:
|
||||
base_dir = FileSystem.__get_base_dir()
|
||||
return os.path.join(base_dir, 'settings')
|
||||
|
||||
@staticmethod
|
||||
def get_plugins_directory() -> str:
|
||||
base_dir = FileSystem.__get_base_dir()
|
||||
return os.path.join(base_dir, 'plugins')
|
||||
|
||||
@staticmethod
|
||||
def load_configuration(name: str = 'configuration.yaml', config_directory: Optional[str] = None) -> dict:
|
||||
if config_directory is None:
|
||||
config_directory = FileSystem.__get_config_directory()
|
||||
with open(os.path.join(config_directory, name)) as file:
|
||||
input_data = yaml.safe_load(file)
|
||||
return input_data
|
||||
|
||||
|
||||
class LogUtil(Logger):
|
||||
__FORMATTER = "%(asctime)s — %(name)s — %(levelname)s — %(funcName)s:%(lineno)d — %(message)s"
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
log_format: str = __FORMATTER,
|
||||
level: Union[int, str] = DEBUG,
|
||||
*args,
|
||||
**kwargs
|
||||
) -> None:
|
||||
super().__init__(name, level)
|
||||
self.formatter = logging.Formatter(log_format)
|
||||
self.addHandler(self.__get_stream_handler())
|
||||
|
||||
def __get_stream_handler(self) -> StreamHandler:
|
||||
handler = StreamHandler(sys.stdout)
|
||||
handler.setFormatter(self.formatter)
|
||||
return handler
|
||||
|
||||
@staticmethod
|
||||
def create(log_level: str = 'DEBUG') -> Logger:
|
||||
logging.setLoggerClass(LogUtil)
|
||||
logger = logging.getLogger('plugin.architecture')
|
||||
logger.setLevel(log_level)
|
||||
return logger
|
||||
79
src/pyxtend/interaction.py
Normal file
79
src/pyxtend/interaction.py
Normal file
@@ -0,0 +1,79 @@
|
||||
import os
|
||||
from importlib import import_module
|
||||
from logging import Logger
|
||||
from typing import List, Any, Dict
|
||||
|
||||
from .core import PyXtendCore
|
||||
from .registry import IPyXtendRegistry
|
||||
from .helpers import LogUtil
|
||||
from .utilities import PyXtendUtility
|
||||
|
||||
|
||||
class PyXtendInteraction:
|
||||
_logger: Logger
|
||||
modules: List[type]
|
||||
|
||||
def __init__(self, options: Dict) -> None:
|
||||
self._logger = LogUtil.create(options['log_level'])
|
||||
self.plugins_package: str = options['directory']
|
||||
self.plugin_util = PyXtendUtility(self._logger)
|
||||
self.modules = list()
|
||||
|
||||
def __check_loaded_plugin_state(self, plugin_module: Any):
|
||||
if len(IPyXtendRegistry.plugin_registries) > 0:
|
||||
latest_module = IPyXtendRegistry.plugin_registries[-1]
|
||||
latest_module_name = latest_module.__module__
|
||||
current_module_name = plugin_module.__name__
|
||||
if current_module_name == latest_module_name:
|
||||
self._logger.debug(f'Successfully imported module `{current_module_name}`')
|
||||
self.modules.append(latest_module)
|
||||
else:
|
||||
self._logger.error(
|
||||
f'Expected to import -> `{current_module_name}` but got -> `{latest_module_name}`'
|
||||
)
|
||||
# clear plugins from the registry when we're done with them
|
||||
IPyXtendRegistry.plugin_registries.clear()
|
||||
else:
|
||||
self._logger.error(f'No plugin found in registry for module: {plugin_module}')
|
||||
|
||||
def __search_for_plugins_in(self, plugins_path: List[str], package_name: str):
|
||||
for directory in plugins_path:
|
||||
entry_point = self.plugin_util.setup_plugin_configuration(package_name, directory)
|
||||
if entry_point is not None:
|
||||
plugin_name, plugin_ext = os.path.splitext(entry_point)
|
||||
# Importing the module will cause IPluginRegistry to invoke it's __init__ fun
|
||||
import_target_module = f'.{directory}.{plugin_name}'
|
||||
module = import_module(import_target_module, package_name)
|
||||
self.__check_loaded_plugin_state(module)
|
||||
else:
|
||||
self._logger.debug(f'No valid plugin found in {package_name}')
|
||||
|
||||
def discover_plugins(self, reload: bool):
|
||||
"""
|
||||
Discover the plugin classes contained in Python files, given a
|
||||
list of directory names to scan.
|
||||
"""
|
||||
if reload:
|
||||
self.modules.clear()
|
||||
IPyXtendRegistry.plugin_registries.clear()
|
||||
self._logger.debug(f'Searching for plugins under package {self.plugins_package}')
|
||||
plugins_path = PyXtendUtility.filter_plugins_paths(self.plugins_package)
|
||||
package_name = os.path.basename(os.path.normpath(self.plugins_package))
|
||||
self.__search_for_plugins_in(plugins_path, package_name)
|
||||
|
||||
@staticmethod
|
||||
def register_plugin(module: type, logger: Logger) -> PyXtendCore:
|
||||
"""
|
||||
Create a plugin instance from the given module
|
||||
:param module: module to initialize
|
||||
:param logger: logger for the module to use
|
||||
:return: a high level plugin
|
||||
"""
|
||||
return module(logger)
|
||||
|
||||
@staticmethod
|
||||
def hook_plugin(plugin: PyXtendCore):
|
||||
"""
|
||||
Return a function accepting commands.
|
||||
"""
|
||||
return plugin.invoke
|
||||
47
src/pyxtend/model.py
Normal file
47
src/pyxtend/model.py
Normal file
@@ -0,0 +1,47 @@
|
||||
from dataclasses import dataclass
|
||||
from typing import List, Optional
|
||||
|
||||
|
||||
@dataclass
|
||||
class PyXtendRunTimeOption(object):
|
||||
main: str
|
||||
tests: Optional[List[str]]
|
||||
|
||||
|
||||
@dataclass
|
||||
class DependencyModule:
|
||||
name: str
|
||||
version: str
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f'{self.name}=={self.version}'
|
||||
|
||||
|
||||
@dataclass
|
||||
class PluginConfig:
|
||||
name: str
|
||||
alias: str
|
||||
creator: str
|
||||
runtime: PyXtendRunTimeOption
|
||||
repository: str
|
||||
description: str
|
||||
version: str
|
||||
requirements: Optional[List[DependencyModule]]
|
||||
|
||||
|
||||
@dataclass
|
||||
class Meta:
|
||||
name: str
|
||||
description: str
|
||||
version: str
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f'{self.name}: {self.version}'
|
||||
|
||||
|
||||
@dataclass
|
||||
class Device:
|
||||
name: str
|
||||
firmware: int
|
||||
protocol: str
|
||||
errors: List[int]
|
||||
36
src/pyxtend/registry.py
Normal file
36
src/pyxtend/registry.py
Normal file
@@ -0,0 +1,36 @@
|
||||
from logging import Logger
|
||||
from typing import Optional, List
|
||||
|
||||
from .model import Meta, Device
|
||||
|
||||
|
||||
class IPyXtendRegistry(type):
|
||||
plugin_registries: List[type] = list()
|
||||
|
||||
def __init__(cls, name, bases, attrs):
|
||||
super().__init__(cls)
|
||||
if name != 'PluginCore':
|
||||
IPyXtendRegistry.plugin_registries.append(cls)
|
||||
|
||||
|
||||
class PyXtendCore(object, metaclass=IPyXtendRegistry):
|
||||
"""
|
||||
Plugin core class
|
||||
"""
|
||||
|
||||
meta: Optional[Meta]
|
||||
|
||||
def __init__(self, logger: Logger) -> None:
|
||||
"""
|
||||
Entry init block for plugins
|
||||
:param logger: logger that plugins can make use of
|
||||
"""
|
||||
self._logger = logger
|
||||
|
||||
def invoke(self, **args) -> Device:
|
||||
"""
|
||||
Starts main plugin flow
|
||||
:param args: possible arguments for the plugin
|
||||
:return: a device for the plugin
|
||||
"""
|
||||
pass
|
||||
104
src/pyxtend/utilities.py
Normal file
104
src/pyxtend/utilities.py
Normal file
@@ -0,0 +1,104 @@
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
from logging import Logger
|
||||
from subprocess import CalledProcessError
|
||||
from typing import List, Dict, Optional
|
||||
|
||||
import pkg_resources
|
||||
from dacite import from_dict, ForwardReferenceError, UnexpectedDataError, WrongTypeError, MissingValueError
|
||||
from pkg_resources import Distribution
|
||||
|
||||
from .model import PluginConfig, DependencyModule
|
||||
from .helpers import FileSystem
|
||||
|
||||
|
||||
class PyXtendUtility:
|
||||
__IGNORE_LIST = ['__pycache__']
|
||||
|
||||
def __init__(self, logger: Logger) -> None:
|
||||
super().__init__()
|
||||
self._logger = logger
|
||||
|
||||
@staticmethod
|
||||
def __filter_unwanted_directories(name: str) -> bool:
|
||||
return not PyXtendUtility.__IGNORE_LIST.__contains__(name)
|
||||
|
||||
@staticmethod
|
||||
def filter_plugins_paths(plugins_package) -> List[str]:
|
||||
"""
|
||||
filters out a list of unwanted directories
|
||||
:param plugins_package:
|
||||
:return: list of directories
|
||||
"""
|
||||
return list(
|
||||
filter(
|
||||
PyXtendUtility.__filter_unwanted_directories,
|
||||
os.listdir(plugins_package)
|
||||
)
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def __get_missing_packages(
|
||||
installed: List[Distribution],
|
||||
required: Optional[List[DependencyModule]]
|
||||
) -> List[DependencyModule]:
|
||||
missing = list()
|
||||
if required is not None:
|
||||
installed_packages: List[str] = [pkg.project_name for pkg in installed]
|
||||
for required_pkg in required:
|
||||
if not installed_packages.__contains__(required_pkg.name):
|
||||
missing.append(required_pkg)
|
||||
return missing
|
||||
|
||||
def __manage_requirements(self, package_name: str, plugin_config: PluginConfig):
|
||||
installed_packages: List[Distribution] = list(
|
||||
filter(lambda pkg: isinstance(pkg, Distribution), pkg_resources.working_set)
|
||||
)
|
||||
missing_packages = self.__get_missing_packages(installed_packages, plugin_config.requirements)
|
||||
for missing in missing_packages:
|
||||
self._logger.info(f'Preparing installation of module: {missing} for package: {package_name}')
|
||||
try:
|
||||
python = sys.executable
|
||||
exit_code = subprocess.check_call(
|
||||
[python, '-m', 'pip', 'install', missing.__str__()],
|
||||
stdout=subprocess.DEVNULL,
|
||||
stderr=subprocess.DEVNULL
|
||||
)
|
||||
self._logger.info(
|
||||
f'Installation of module: {missing} for package: {package_name} was returned exit code: {exit_code}'
|
||||
)
|
||||
except CalledProcessError as e:
|
||||
self._logger.error(f'Unable to install package {missing}', e)
|
||||
|
||||
def __read_configuration(self, module_path) -> Optional[PluginConfig]:
|
||||
try:
|
||||
plugin_config_data = FileSystem.load_configuration('plugin.yaml', module_path)
|
||||
plugin_config = from_dict(data_class=PluginConfig, data=plugin_config_data)
|
||||
return plugin_config
|
||||
except FileNotFoundError as e:
|
||||
self._logger.error('Unable to read configuration file', e)
|
||||
except (NameError, ForwardReferenceError, UnexpectedDataError, WrongTypeError, MissingValueError) as e:
|
||||
self._logger.error('Unable to parse plugin configuration to data class', e)
|
||||
return None
|
||||
|
||||
def setup_plugin_configuration(self, package_name, module_name) -> Optional[str]:
|
||||
"""
|
||||
Handles primary configuration for a give package and module
|
||||
:param package_name: package of the potential plugin
|
||||
:param module_name: module of the potential plugin
|
||||
:return: a module name to import
|
||||
"""
|
||||
# if the item has not folder we will assume that it is a directory
|
||||
module_path = os.path.join(FileSystem.get_plugins_directory(), module_name)
|
||||
if os.path.isdir(module_path):
|
||||
self._logger.debug(f'Checking if configuration file exists for module: {module_name}')
|
||||
plugin_config: Optional[PluginConfig] = self.__read_configuration(module_path)
|
||||
if plugin_config is not None:
|
||||
self.__manage_requirements(package_name, plugin_config)
|
||||
return plugin_config.runtime.main
|
||||
else:
|
||||
self._logger.debug(f'No configuration file exists for module: {module_name}')
|
||||
self._logger.debug(f'Module: {module_name} is not a directory, skipping scanning phase')
|
||||
return None
|
||||
Reference in New Issue
Block a user