diff --git a/setup.cfg b/setup.cfg index 417270a..3025d85 100644 --- a/setup.cfg +++ b/setup.cfg @@ -33,6 +33,8 @@ project_urls = [options] packages = find: +install_requires = + typing-extensions python_requires = >=3.7 package_dir = =src diff --git a/src/superqt/qtcompat/QtCore.py b/src/superqt/qtcompat/QtCore.py index a755cc2..338dff0 100644 --- a/src/superqt/qtcompat/QtCore.py +++ b/src/superqt/qtcompat/QtCore.py @@ -15,6 +15,11 @@ from . import PYQT5, PYQT6, PYSIDE2, PYSIDE6, PythonQtError if PYQT5: from PyQt5.QtCore import QT_VERSION_STR as __version__ from PyQt5.QtCore import * + + try: + from PyQt5.QtCore import pyqtBoundSignal as SignalInstance # noqa + except ImportError: # 5.11 + SignalInstance = None from PyQt5.QtCore import pyqtProperty as Property # noqa from PyQt5.QtCore import pyqtSignal as Signal # noqa from PyQt5.QtCore import pyqtSlot as Slot # noqa @@ -24,6 +29,7 @@ if PYQT5: elif PYQT6: from PyQt6.QtCore import QT_VERSION_STR as __version__ from PyQt6.QtCore import * + from PyQt6.QtCore import pyqtBoundSignal as SignalInstance # noqa from PyQt6.QtCore import pyqtProperty as Property # noqa from PyQt6.QtCore import pyqtSignal as Signal # noqa from PyQt6.QtCore import pyqtSlot as Slot # noqa diff --git a/src/superqt/utils/__init__.py b/src/superqt/utils/__init__.py index 856f165..c2e89c5 100644 --- a/src/superqt/utils/__init__.py +++ b/src/superqt/utils/__init__.py @@ -1,4 +1,22 @@ -__all__ = ("QMessageHandler", "ensure_object_thread", "ensure_main_thread") +__all__ = ( + "create_worker", + "ensure_main_thread", + "ensure_object_thread", + "FunctionWorker", + "GeneratorWorker", + "new_worker_qthread", + "QMessageHandler", + "thread_worker", + "WorkerBase", +) from ._ensure_thread import ensure_main_thread, ensure_object_thread from ._message_handler import QMessageHandler +from ._qthreading import ( + FunctionWorker, + GeneratorWorker, + WorkerBase, + create_worker, + new_worker_qthread, + thread_worker, +) diff --git a/src/superqt/utils/_qthreading.py b/src/superqt/utils/_qthreading.py new file mode 100644 index 0000000..9ee89a4 --- /dev/null +++ b/src/superqt/utils/_qthreading.py @@ -0,0 +1,899 @@ +from __future__ import annotations + +import inspect +import time +import warnings +from functools import partial, wraps +from typing import ( + TYPE_CHECKING, + Any, + Callable, + Dict, + Generator, + Generic, + Optional, + Sequence, + Set, + Type, + TypeVar, + Union, + overload, +) + +from typing_extensions import Literal, ParamSpec + +from ..qtcompat.QtCore import QObject, QRunnable, QThread, QThreadPool, QTimer, Signal + +if TYPE_CHECKING: + + _T = TypeVar("_T") + + class SigInst(Generic[_T]): + @staticmethod + def connect(slot: Callable[[_T], Any], type: Optional[type] = ...) -> None: + ... + + @staticmethod + def disconnect(slot: Callable[[_T], Any] = ...) -> None: + ... + + @staticmethod + def emit(*args: _T) -> None: + ... + + +_Y = TypeVar("_Y") +_S = TypeVar("_S") +_R = TypeVar("_R") +_P = ParamSpec("_P") + + +def as_generator_function( + func: Callable[_P, _R] +) -> Callable[_P, Generator[None, None, _R]]: + """Turns a regular function (single return) into a generator function.""" + + @wraps(func) + def genwrapper(*args, **kwargs) -> Generator[None, None, _R]: + yield + return func(*args, **kwargs) + + return genwrapper + + +class WorkerBaseSignals(QObject): + + started = Signal() # emitted when the work is started + finished = Signal() # emitted when the work is finished + _finished = Signal(object) # emitted when the work is finished ro delete + returned = Signal(object) # emitted with return value + errored = Signal(object) # emitted with error object on Exception + warned = Signal(tuple) # emitted with showwarning args on warning + + +class WorkerBase(QRunnable, Generic[_R]): + """Base class for creating a Worker that can run in another thread. + + Parameters + ---------- + SignalsClass : type, optional + A QObject subclass that contains signals, by default WorkerBaseSignals + + Attributes + ---------- + signals: WorkerBaseSignals + signal emitter object. To allow identify which worker thread emitted signal. + """ + + #: A set of Workers. Add to set using :meth:`WorkerBase.start` + _worker_set: Set[WorkerBase] = set() + returned: SigInst[_R] + errored: SigInst[Exception] + warned: SigInst[tuple] + started: SigInst[None] + finished: SigInst[None] + + def __init__( + self, + func: Optional[Callable[_P, _R]] = None, + SignalsClass: Type[WorkerBaseSignals] = WorkerBaseSignals, + ) -> None: + super().__init__() + self._abort_requested = False + self._running = False + self.signals = SignalsClass() + + def __getattr__(self, name: str) -> SigInst: + """Pass through attr requests to signals to simplify connection API. + + The goal is to enable ``worker.yielded.connect`` instead of + ``worker.signals.yielded.connect``. Because multiple inheritance of Qt + classes is not well supported in PyQt, we have to use composition here + (signals are provided by QObjects, and QRunnable is not a QObject). So + this passthrough allows us to connect to signals on the ``_signals`` + object. + """ + # the Signal object is actually a class attribute + attr = getattr(self.signals.__class__, name, None) + if isinstance(attr, Signal): + # but what we need to connect to is the instantiated signal + # (which is of type `SignalInstance` in PySide and + # `pyqtBoundSignal` in PyQt) + return getattr(self.signals, name) + raise AttributeError( + f"{self.__class__.__name__!r} object has no attribute {name!r}" + ) + + def quit(self) -> None: + """Send a request to abort the worker. + + .. note:: + + It is entirely up to subclasses to honor this method by checking + ``self.abort_requested`` periodically in their ``worker.work`` + method, and exiting if ``True``. + """ + self._abort_requested = True + + @property + def abort_requested(self) -> bool: + """Whether the worker has been requested to stop.""" + return self._abort_requested + + @property + def is_running(self) -> bool: + """Whether the worker has been started""" + return self._running + + def run(self) -> None: + """Start the worker. + + The end-user should never need to call this function. + But it cannot be made private or renamed, since it is called by Qt. + + The order of method calls when starting a worker is: + + .. code-block:: none + + calls QThreadPool.globalInstance().start(worker) + | triggered by the QThreadPool.start() method + | | called by worker.run + | | | + V V V + worker.start -> worker.run -> worker.work + + **This** is the function that actually gets called when calling + :func:`QThreadPool.start(worker)`. It simply wraps the :meth:`work` + method, and emits a few signals. Subclasses should NOT override this + method (except with good reason), and instead should implement + :meth:`work`. + """ + self.started.emit() + self._running = True + try: + with warnings.catch_warnings(): + warnings.filterwarnings("always") + warnings.showwarning = lambda *w: self.warned.emit(w) + result = self.work() + if isinstance(result, Exception): + if isinstance(result, RuntimeError): + # The Worker object has likely been deleted. + # A deleted wrapped C/C++ object may result in a runtime + # error that will cause segfault if we try to do much other + # than simply notify the user. + warnings.warn( + f"RuntimeError in aborted thread: {result}", + RuntimeWarning, + ) + return + else: + raise result + if not self.abort_requested: + self.returned.emit(result) + except Exception as exc: + self.errored.emit(exc) + self._running = False + self.finished.emit() + self._finished.emit(self) + + def work(self) -> Union[Exception, _R]: + """Main method to execute the worker. + + The end-user should never need to call this function. + But subclasses must implement this method (See + :meth:`GeneratorFunction.work` for an example implementation). + Minimally, it should check ``self.abort_requested`` periodically and + exit if True. + + Examples + -------- + .. code-block:: python + + class MyWorker(WorkerBase): + + def work(self): + i = 0 + while True: + if self.abort_requested: + self.aborted.emit() + break + i += 1 + if i > max_iters: + break + time.sleep(0.5) + """ + raise NotImplementedError( + f'"{self.__class__.__name__}" failed to define work() method' + ) + + def start(self) -> None: + """Start this worker in a thread and add it to the global threadpool. + + The order of method calls when starting a worker is: + + .. code-block:: none + + calls QThreadPool.globalInstance().start(worker) + | triggered by the QThreadPool.start() method + | | called by worker.run + | | | + V V V + worker.start -> worker.run -> worker.work + """ + if self in self._worker_set: + raise RuntimeError("This worker is already started!") + + # This will raise a RunTimeError if the worker is already deleted + repr(self) + + self._worker_set.add(self) + self._finished.connect(self._set_discard) + start_ = partial(QThreadPool.globalInstance().start, self) + QTimer.singleShot(10, start_) + + @classmethod + def _set_discard(cls, obj: WorkerBase) -> None: + cls._worker_set.discard(obj) + + @classmethod + def await_workers(cls, msecs: int = None) -> None: + """Ask all workers to quit, and wait up to `msec` for quit. + + Attempts to clean up all running workers by calling ``worker.quit()`` + method. Any workers in the ``WorkerBase._worker_set`` set will have this + method. + + By default, this function will block indefinitely, until worker threads + finish. If a timeout is provided, a ``RuntimeError`` will be raised if + the workers do not gracefully exit in the time requests, but the threads + will NOT be killed. It is (currently) left to the user to use their OS + to force-quit rogue threads. + + .. important:: + + If the user does not put any yields in their function, and the function + is super long, it will just hang... For instance, there's no graceful + way to kill this thread in python: + + .. code-block:: python + + @thread_worker + def ZZZzzz(): + time.sleep(10000000) + + This is why it's always advisable to use a generator that periodically + yields for long-running computations in another thread. + + See `this stack-overflow post + `_ + for a good discussion on the difficulty of killing a rogue python thread: + + Parameters + ---------- + msecs : int, optional + Waits up to msecs milliseconds for all threads to exit and removes all + threads from the thread pool. If msecs is `None` (the default), the + timeout is ignored (waits for the last thread to exit). + + Raises + ------ + RuntimeError + If a timeout is provided and workers do not quit successfully within + the time allotted. + """ + for worker in cls._worker_set: + worker.quit() + + msecs = msecs if msecs is not None else -1 + if not QThreadPool.globalInstance().waitForDone(msecs): + raise RuntimeError( + f"Workers did not quit gracefully in the time allotted ({msecs} ms)" + ) + + +class FunctionWorker(WorkerBase[_R]): + """QRunnable with signals that wraps a simple long-running function. + + .. note:: + + ``FunctionWorker`` does not provide a way to stop a very long-running + function (e.g. ``time.sleep(10000)``). So whenever possible, it is + better to implement your long running function as a generator that + yields periodically, and use the :class:`GeneratorWorker` instead. + + Parameters + ---------- + func : Callable + A function to call in another thread + *args + will be passed to the function + **kwargs + will be passed to the function + + Raises + ------ + TypeError + If ``func`` is a generator function and not a regular function. + """ + + def __init__(self, func: Callable[_P, _R], *args, **kwargs): + if inspect.isgeneratorfunction(func): + raise TypeError( + f"Generator function {func} cannot be used with FunctionWorker, " + "use GeneratorWorker instead", + ) + super().__init__() + + self._func = func + self._args = args + self._kwargs = kwargs + + def work(self) -> _R: + return self._func(*self._args, **self._kwargs) + + +class GeneratorWorkerSignals(WorkerBaseSignals): + + yielded = Signal(object) # emitted with yielded values (if generator used) + paused = Signal() # emitted when a running job has successfully paused + resumed = Signal() # emitted when a paused job has successfully resumed + aborted = Signal() # emitted when a running job is successfully aborted + + +class GeneratorWorker(WorkerBase, Generic[_Y, _S, _R]): + """QRunnable with signals that wraps a long-running generator. + + Provides a convenient way to run a generator function in another thread, + while allowing 2-way communication between threads, using plain-python + generator syntax in the original function. + + Parameters + ---------- + func : callable + The function being run in another thread. May be a generator function. + SignalsClass : type, optional + A QObject subclass that contains signals, by default + GeneratorWorkerSignals + *args + Will be passed to func on instantiation + **kwargs + Will be passed to func on instantiation + """ + + yielded: SigInst[_Y] + paused: SigInst[None] + resumed: SigInst[None] + aborted: SigInst[None] + + def __init__( + self, + func: Callable[_P, Generator[_Y, Optional[_S], _R]], + *args, + SignalsClass: Type[WorkerBaseSignals] = GeneratorWorkerSignals, + **kwargs, + ): + if not inspect.isgeneratorfunction(func): + raise TypeError( + f"Regular function {func} cannot be used with GeneratorWorker, " + "use FunctionWorker instead", + ) + super().__init__(SignalsClass=SignalsClass) + + self._gen = func(*args, **kwargs) + self._incoming_value: Optional[_S] = None + self._pause_requested = False + self._resume_requested = False + self._paused = False + + # polling interval: ONLY relevant if the user paused a running worker + self._pause_interval = 0.01 + self.pbar = None + + def work(self) -> Union[Optional[_R], Exception]: + """Core event loop that calls the original function. + + Enters a continual loop, yielding and returning from the original + function. Checks for various events (quit, pause, resume, etc...). + (To clarify: we are creating a rudimentary event loop here because + there IS NO Qt event loop running in the other thread to hook into) + """ + while True: + if self.abort_requested: + self.aborted.emit() + break + if self._paused: + if self._resume_requested: + self._paused = False + self._resume_requested = False + self.resumed.emit() + else: + time.sleep(self._pause_interval) + continue + elif self._pause_requested: + self._paused = True + self._pause_requested = False + self.paused.emit() + continue + try: + input = self._next_value() + output = self._gen.send(input) + self.yielded.emit(output) + except StopIteration as exc: + return exc.value + except RuntimeError as exc: + # The worker has probably been deleted. warning will be + # emitted in ``WorkerBase.run`` + return exc + return None + + def send(self, value: _S): + """Send a value into the function (if a generator was used).""" + self._incoming_value = value + + def _next_value(self) -> Optional[_S]: + out = None + if self._incoming_value is not None: + out = self._incoming_value + self._incoming_value = None + return out + + @property + def is_paused(self) -> bool: + """Whether the worker is currently paused.""" + return self._paused + + def toggle_pause(self) -> None: + """Request to pause the worker if playing or resume if paused.""" + if self.is_paused: + self._resume_requested = True + else: + self._pause_requested = True + + def pause(self) -> None: + """Request to pause the worker.""" + if not self.is_paused: + self._pause_requested = True + + def resume(self) -> None: + """Send a request to resume the worker.""" + if self.is_paused: + self._resume_requested = True + + +############################################################################# + +# convenience functions for creating Worker instances + + +@overload +def create_worker( + func: Callable[_P, Generator[_Y, _S, _R]], + *args, + _start_thread: Optional[bool] = None, + _connect: Optional[Dict[str, Union[Callable, Sequence[Callable]]]] = None, + _worker_class: Union[Type[GeneratorWorker], Type[FunctionWorker], None] = None, + _ignore_errors: bool = False, + **kwargs, +) -> GeneratorWorker[_Y, _S, _R]: + ... + + +@overload +def create_worker( + func: Callable[_P, _R], + *args, + _start_thread: Optional[bool] = None, + _connect: Optional[Dict[str, Union[Callable, Sequence[Callable]]]] = None, + _worker_class: Union[Type[GeneratorWorker], Type[FunctionWorker], None] = None, + _ignore_errors: bool = False, + **kwargs, +) -> FunctionWorker[_R]: + ... + + +def create_worker( + func: Callable, + *args, + _start_thread: Optional[bool] = None, + _connect: Optional[Dict[str, Union[Callable, Sequence[Callable]]]] = None, + _worker_class: Union[Type[GeneratorWorker], Type[FunctionWorker], None] = None, + _ignore_errors: bool = False, + **kwargs, +) -> Union[FunctionWorker, GeneratorWorker]: + """Convenience function to start a function in another thread. + + By default, uses :class:`Worker`, but a custom ``WorkerBase`` subclass may + be provided. If so, it must be a subclass of :class:`Worker`, which + defines a standard set of signals and a run method. + + Parameters + ---------- + func : Callable + The function to call in another thread. + _start_thread : bool, optional + Whether to immediaetly start the thread. If False, the returned worker + must be manually started with ``worker.start()``. by default it will be + ``False`` if the ``_connect`` argument is ``None``, otherwise ``True``. + _connect : Dict[str, Union[Callable, Sequence]], optional + A mapping of ``"signal_name"`` -> ``callable`` or list of ``callable``: + callback functions to connect to the various signals offered by the + worker class. by default None + _worker_class : type of GeneratorWorker or FunctionWorker, optional + The :class`WorkerBase` to instantiate, by default + :class:`FunctionWorker` will be used if ``func`` is a regular function, + and :class:`GeneratorWorker` will be used if it is a generator. + _ignore_errors : bool, optional + If ``False`` (the default), errors raised in the other thread will be + reraised in the main thread (makes debugging significantly easier). + *args + will be passed to ``func`` + **kwargs + will be passed to ``func`` + + Returns + ------- + worker : WorkerBase + An instantiated worker. If ``_start_thread`` was ``False``, the worker + will have a `.start()` method that can be used to start the thread. + + Raises + ------ + TypeError + If a worker_class is provided that is not a subclass of WorkerBase. + TypeError + If _connect is provided and is not a dict of ``{str: callable}`` + + Examples + -------- + .. code-block:: python + + def long_function(duration): + import time + time.sleep(duration) + + worker = create_worker(long_function, 10) + + """ + worker: Union[FunctionWorker, GeneratorWorker] + + if not _worker_class: + if inspect.isgeneratorfunction(func): + _worker_class = GeneratorWorker + else: + _worker_class = FunctionWorker + + if not inspect.isclass(_worker_class) and issubclass(_worker_class, WorkerBase): + raise TypeError(f"Worker {_worker_class} must be a subclass of WorkerBase") + + worker = _worker_class(func, *args, **kwargs) + + if _connect is not None: + if not isinstance(_connect, dict): + raise TypeError("The '_connect' argument must be a dict") + + if _start_thread is None: + _start_thread = True + + for key, val in _connect.items(): + _val = val if isinstance(val, (tuple, list)) else [val] + for v in _val: + if not callable(v): + raise TypeError( + f"_connect[{key!r}] must be a function or sequence of functions" + ) + getattr(worker, key).connect(v) + + # if the user has not provided a default connection for the "errored" + # signal... and they have not explicitly set ``ignore_errors=True`` + # Then rereaise any errors from the thread. + if not _ignore_errors and not (_connect or {}).get("errored", False): + + def reraise(e): + raise e + + worker.errored.connect(reraise) + + if _start_thread: + worker.start() + return worker + + +@overload +def thread_worker( + function: Callable[_P, Generator[_Y, _S, _R]], + start_thread: Optional[bool] = None, + connect: Optional[Dict[str, Union[Callable, Sequence[Callable]]]] = None, + worker_class: Optional[Type[WorkerBase]] = None, + ignore_errors: bool = False, +) -> Callable[_P, GeneratorWorker[_Y, _S, _R]]: + ... + + +@overload +def thread_worker( + function: Callable[_P, _R], + start_thread: Optional[bool] = None, + connect: Optional[Dict[str, Union[Callable, Sequence[Callable]]]] = None, + worker_class: Optional[Type[WorkerBase]] = None, + ignore_errors: bool = False, +) -> Callable[_P, FunctionWorker[_R]]: + ... + + +@overload +def thread_worker( + function: Literal[None] = None, + start_thread: Optional[bool] = None, + connect: Optional[Dict[str, Union[Callable, Sequence[Callable]]]] = None, + worker_class: Optional[Type[WorkerBase]] = None, + ignore_errors: bool = False, +) -> Callable[[Callable], Callable[_P, Union[FunctionWorker, GeneratorWorker]]]: + ... + + +def thread_worker( + function: Optional[Callable] = None, + start_thread: Optional[bool] = None, + connect: Optional[Dict[str, Union[Callable, Sequence[Callable]]]] = None, + worker_class: Optional[Type[WorkerBase]] = None, + ignore_errors: bool = False, +): + """Decorator that runs a function in a separate thread when called. + + When called, the decorated function returns a :class:`WorkerBase`. See + :func:`create_worker` for additional keyword arguments that can be used + when calling the function. + + The returned worker will have these signals: + + - *started*: emitted when the work is started + - *finished*: emitted when the work is finished + - *returned*: emitted with return value + - *errored*: emitted with error object on Exception + + It will also have a ``worker.start()`` method that can be used to start + execution of the function in another thread. (useful if you need to connect + callbacks to signals prior to execution) + + If the decorated function is a generator, the returned worker will also + provide these signals: + + - *yielded*: emitted with yielded values + - *paused*: emitted when a running job has successfully paused + - *resumed*: emitted when a paused job has successfully resumed + - *aborted*: emitted when a running job is successfully aborted + + And these methods: + + - *quit*: ask the thread to quit + - *toggle_paused*: toggle the running state of the thread. + - *send*: send a value into the generator. (This requires that your + decorator function uses the ``value = yield`` syntax) + + Parameters + ---------- + function : callable + Function to call in another thread. For communication between threads + may be a generator function. + start_thread : bool, optional + Whether to immediaetly start the thread. If False, the returned worker + must be manually started with ``worker.start()``. by default it will be + ``False`` if the ``_connect`` argument is ``None``, otherwise ``True``. + connect : Dict[str, Union[Callable, Sequence]], optional + A mapping of ``"signal_name"`` -> ``callable`` or list of ``callable``: + callback functions to connect to the various signals offered by the + worker class. by default None + worker_class : Type[WorkerBase], optional + The :class`WorkerBase` to instantiate, by default + :class:`FunctionWorker` will be used if ``func`` is a regular function, + and :class:`GeneratorWorker` will be used if it is a generator. + ignore_errors : bool, optional + If ``False`` (the default), errors raised in the other thread will be + reraised in the main thread (makes debugging significantly easier). + + Returns + ------- + callable + function that creates a worker, puts it in a new thread and returns + the worker instance. + + Examples + -------- + .. code-block:: python + + @thread_worker + def long_function(start, end): + # do work, periodically yielding + i = start + while i <= end: + time.sleep(0.1) + yield i + + # do teardown + return 'anything' + + # call the function to start running in another thread. + worker = long_function() + # connect signals here if desired... or they may be added using the + # `connect` argument in the `@thread_worker` decorator... in which + # case the worker will start immediately when long_function() is called + worker.start() + """ + + def _inner(func): + @wraps(func) + def worker_function(*args, **kwargs): + # decorator kwargs can be overridden at call time by using the + # underscore-prefixed version of the kwarg. + kwargs["_start_thread"] = kwargs.get("_start_thread", start_thread) + kwargs["_connect"] = kwargs.get("_connect", connect) + kwargs["_worker_class"] = kwargs.get("_worker_class", worker_class) + kwargs["_ignore_errors"] = kwargs.get("_ignore_errors", ignore_errors) + return create_worker( + func, + *args, + **kwargs, + ) + + return worker_function + + return _inner if function is None else _inner(function) + + +############################################################################ + +# This is a variant on the above pattern, it uses QThread instead of Qrunnable +# see https://doc.qt.io/qt-5/threads-technologies.html#comparison-of-solutions +# (it appears from that table that QRunnable cannot emit or receive signals, +# but we circumvent that here with our WorkerBase class that also inherits from +# QObject... providing signals/slots). +# +# A benefit of the QRunnable pattern is that Qt manages the threads for you, +# in the QThreadPool.globalInstance() ... making it easier to reuse threads, +# and reduce overhead. +# +# However, a disadvantage is that you have no access to (and therefore less +# control over) the QThread itself. See for example all of the methods +# provided on the QThread object: https://doc.qt.io/qt-5/qthread.html + +if TYPE_CHECKING: + + class WorkerProtocol(QObject): + finished: Signal + + def work(self) -> None: + ... + + +def new_worker_qthread( + Worker: Type[WorkerProtocol], + *args, + _start_thread: bool = False, + _connect: Dict[str, Callable] = None, + **kwargs, +): + """This is a convenience function to start a worker in a Qthread. + + In most cases, the @thread_worker decorator is sufficient and preferable. + But this allows the user to completely customize the Worker object. + However, they must then maintain control over the thread and clean up + appropriately. + + It follows the pattern described here: + https://www.qt.io/blog/2010/06/17/youre-doing-it-wrong + and + https://doc.qt.io/qt-5/qthread.html#details + + see also: + https://mayaposch.wordpress.com/2011/11/01/how-to-really-truly-use-qthreads-the-full-explanation/ + + A QThread object is not a thread! It should be thought of as a class to + *manage* a thread, not as the actual code or object that runs in that + thread. The QThread object is created on the main thread and lives there. + + Worker objects which derive from QObject are the things that actually do + the work. They can be moved to a QThread as is done here. + + .. note:: Mostly ignorable detail + + While the signals/slots syntax of the worker looks very similar to + standard "single-threaded" signals & slots, note that inter-thread + signals and slots (automatically) use an event-based QueuedConnection, + while intra-thread signals use a DirectConnection. See `Signals and + Slots Across Threads + `_ + + Parameters + ---------- + Worker : QObject + QObject type that implements a `work()` method. The Worker should also + emit a finished signal when the work is done. + _start_thread : bool + If True, thread will be started immediately, otherwise, thread must + be manually started with thread.start(). + _connect : dict, optional + Optional dictionary of {signal: function} to connect to the new worker. + for instance: _connect = {'incremented': myfunc} will result in: + worker.incremented.connect(myfunc) + *args + will be passed to the Worker class on instantiation. + **kwargs + will be passed to the Worker class on instantiation. + + Returns + ------- + worker : WorkerBase + The created worker. + thread : QThread + The thread on which the worker is running. + + Examples + -------- + Create some QObject that has a long-running work method: + + .. code-block:: python + + class Worker(QObject): + + finished = Signal() + increment = Signal(int) + + def __init__(self, argument): + super().__init__() + self.argument = argument + + @Slot() + def work(self): + # some long running task... + import time + for i in range(10): + time.sleep(1) + self.increment.emit(i) + self.finished.emit() + + worker, thread = new_worker_qthread( + Worker, + 'argument', + _start_thread=True, + _connect={'increment': print}, + ) + + """ + + if _connect and not isinstance(_connect, dict): + raise TypeError("_connect parameter must be a dict") + + thread = QThread() + worker = Worker(*args, **kwargs) + worker.moveToThread(thread) + thread.started.connect(worker.work) + worker.finished.connect(thread.quit) + worker.finished.connect(worker.deleteLater) + thread.finished.connect(thread.deleteLater) + + if _connect: + [getattr(worker, key).connect(val) for key, val in _connect.items()] + + if _start_thread: + thread.start() # sometimes need to connect stuff before starting + return worker, thread diff --git a/tests/test_ensure_thread.py b/tests/test_ensure_thread.py index b60f5d8..7d666e8 100644 --- a/tests/test_ensure_thread.py +++ b/tests/test_ensure_thread.py @@ -1,4 +1,5 @@ import inspect +import os import time from concurrent.futures import Future, TimeoutError @@ -7,6 +8,8 @@ import pytest from superqt.qtcompat.QtCore import QCoreApplication, QObject, QThread, Signal from superqt.utils import ensure_main_thread, ensure_object_thread +skip_on_ci = pytest.mark.skipif(bool(os.getenv("CI")), reason="github hangs") + class SampleObject(QObject): assigment_done = Signal() @@ -122,25 +125,8 @@ def test_only_main_thread(qapp): assert ob.sample_object_thread_property == 7 -def test_object_thread(qtbot): - ob = SampleObject() - thread = QThread() - thread.start() - ob.moveToThread(thread) - with qtbot.waitSignal(ob.assigment_done): - ob.check_object_thread(2, b=4) - assert ob.object_thread_res == {"a": 2, "b": 4} - - with qtbot.waitSignal(ob.assigment_done): - ob.sample_object_thread_property = "text" - - assert ob.sample_object_thread_property == "text" - assert ob.thread() is thread - with qtbot.waitSignal(thread.finished): - thread.exit(0) - - def test_main_thread(qtbot): + print("test_main_thread start") ob = SampleObject() t = LocalThread(ob) with qtbot.waitSignal(t.finished): @@ -148,40 +134,7 @@ def test_main_thread(qtbot): assert ob.main_thread_res == {"a": 5, "b": 8} assert ob.sample_main_thread_property == "text2" - - -def test_object_thread_return(qtbot): - ob = SampleObject() - thread = QThread() - thread.start() - ob.moveToThread(thread) - assert ob.check_object_thread_return(2) == 14 - assert ob.thread() is thread - with qtbot.waitSignal(thread.finished): - thread.exit(0) - - -def test_object_thread_return_timeout(qtbot): - ob = SampleObject() - thread = QThread() - thread.start() - ob.moveToThread(thread) - with pytest.raises(TimeoutError): - ob.check_object_thread_return_timeout(2) - with qtbot.waitSignal(thread.finished): - thread.exit(0) - - -def test_object_thread_return_future(qtbot): - ob = SampleObject() - thread = QThread() - thread.start() - ob.moveToThread(thread) - future = ob.check_object_thread_return_future(2) - assert isinstance(future, Future) - assert future.result() == 14 - with qtbot.waitSignal(thread.finished): - thread.exit(0) + print("test_main_thread done") def test_main_thread_return(qtbot): @@ -210,3 +163,59 @@ def test_names(qapp): assert list(signature.parameters.values())[0].name == "a" assert list(signature.parameters.values())[0].annotation == int assert ob.check_main_thread_return.__name__ == "check_main_thread_return" + + +# @skip_on_ci +def test_object_thread_return(qtbot): + ob = SampleObject() + thread = QThread() + thread.start() + ob.moveToThread(thread) + assert ob.check_object_thread_return(2) == 14 + assert ob.thread() is thread + with qtbot.waitSignal(thread.finished): + thread.quit() + + +# @skip_on_ci +def test_object_thread_return_timeout(qtbot): + ob = SampleObject() + thread = QThread() + thread.start() + ob.moveToThread(thread) + with pytest.raises(TimeoutError): + ob.check_object_thread_return_timeout(2) + with qtbot.waitSignal(thread.finished): + thread.quit() + + +@skip_on_ci +def test_object_thread_return_future(qtbot): + ob = SampleObject() + thread = QThread() + thread.start() + ob.moveToThread(thread) + future = ob.check_object_thread_return_future(2) + assert isinstance(future, Future) + assert future.result() == 14 + with qtbot.waitSignal(thread.finished): + thread.quit() + + +@skip_on_ci +def test_object_thread(qtbot): + ob = SampleObject() + thread = QThread() + thread.start() + ob.moveToThread(thread) + with qtbot.waitSignal(ob.assigment_done): + ob.check_object_thread(2, b=4) + assert ob.object_thread_res == {"a": 2, "b": 4} + + with qtbot.waitSignal(ob.assigment_done): + ob.sample_object_thread_property = "text" + + assert ob.sample_object_thread_property == "text" + assert ob.thread() is thread + with qtbot.waitSignal(thread.finished): + thread.quit() diff --git a/tests/test_threadworker.py b/tests/test_threadworker.py new file mode 100644 index 0000000..9ee8c40 --- /dev/null +++ b/tests/test_threadworker.py @@ -0,0 +1,276 @@ +import inspect +import time +import warnings +from functools import partial +from operator import eq + +import pytest + +import superqt.utils._qthreading as qthreading + +equals_1 = partial(eq, 1) +equals_3 = partial(eq, 3) +skip = pytest.mark.skipif(True, reason="testing") + + +def test_as_generator_function(): + """Test we can convert a regular function to a generator function.""" + + def func(): + return + + assert not inspect.isgeneratorfunction(func) + + newfunc = qthreading.as_generator_function(func) + assert inspect.isgeneratorfunction(newfunc) + assert list(newfunc()) == [None] + + +# qtbot is necessary for qthreading here. +# note: pytest-cov cannot check coverage of code run in the other thread. +def test_thread_worker(qtbot): + """Test basic threadworker on a function""" + + @qthreading.thread_worker + def func(): + return 1 + + wrkr = func() + assert isinstance(wrkr, qthreading.FunctionWorker) + + signals = [wrkr.returned, wrkr.finished] + checks = [equals_1, lambda: True] + with qtbot.waitSignals(signals, check_params_cbs=checks, order="strict"): + wrkr.start() + + +def test_thread_generator_worker(qtbot): + """Test basic threadworker on a generator""" + + @qthreading.thread_worker + def func(): + yield 1 + yield 1 + return 3 + + wrkr = func() + assert isinstance(wrkr, qthreading.GeneratorWorker) + + signals = [wrkr.yielded, wrkr.yielded, wrkr.returned, wrkr.finished] + checks = [equals_1, equals_1, equals_3, lambda: True] + with qtbot.waitSignals(signals, check_params_cbs=checks, order="strict"): + wrkr.start() + + qtbot.wait(500) + + +def test_thread_raises2(qtbot): + handle_val = [0] + + def handle_raise(e): + handle_val[0] = 1 + assert isinstance(e, ValueError) + assert str(e) == "whoops" + + @qthreading.thread_worker(connect={"errored": handle_raise}, start_thread=False) + def func(): + yield 1 + yield 1 + raise ValueError("whoops") + + wrkr = func() + assert isinstance(wrkr, qthreading.GeneratorWorker) + + signals = [wrkr.yielded, wrkr.yielded, wrkr.errored, wrkr.finished] + checks = [equals_1, equals_1, None, None] + with qtbot.waitSignals(signals, check_params_cbs=checks): + wrkr.start() + assert handle_val[0] == 1 + + +def test_thread_warns(qtbot): + """Test warnings get returned to main thread""" + + def check_warning(w): + return str(w) == "hey!" + + @qthreading.thread_worker(connect={"warned": check_warning}, start_thread=False) + def func(): + yield 1 + warnings.warn("hey!") + yield 3 + warnings.warn("hey!") + return 1 + + wrkr = func() + assert isinstance(wrkr, qthreading.GeneratorWorker) + + signals = [wrkr.yielded, wrkr.warned, wrkr.yielded, wrkr.returned] + checks = [equals_1, None, equals_3, equals_1] + with qtbot.waitSignals(signals, check_params_cbs=checks): + wrkr.start() + + +def test_multiple_connections(qtbot): + """Test the connect dict accepts a list of functions, and type checks""" + + test1_val = [0] + test2_val = [0] + + def func(): + return 1 + + def test1(v): + test1_val[0] = 1 + assert v == 1 + + def test2(v): + test2_val[0] = 1 + assert v == 1 + + thread_func = qthreading.thread_worker( + func, connect={"returned": [test1, test2]}, start_thread=False + ) + worker = thread_func() + assert isinstance(worker, qthreading.FunctionWorker) + with qtbot.waitSignal(worker.finished): + worker.start() + + assert test1_val[0] == 1 + assert test2_val[0] == 1 + + # they must all be functions + with pytest.raises(TypeError): + qthreading.thread_worker(func, connect={"returned": ["test1", test2]})() + + # they must all be functions + with pytest.raises(TypeError): + qthreading.thread_worker(func, connect=test1)() + + +def test_create_worker(qapp): + """Test directly calling create_worker.""" + + def func(x, y): + return x + y + + worker = qthreading.create_worker(func, 1, 2) + assert isinstance(worker, qthreading.WorkerBase) + + with pytest.raises(TypeError): + _ = qthreading.create_worker(func, 1, 2, _worker_class=object) + + +# note: pytest-cov cannot check coverage of code run in the other thread. +# this is just for the sake of coverage +def test_thread_worker_in_main_thread(qapp): + """Test basic threadworker on a function""" + + def func(x): + return x + + thread_func = qthreading.thread_worker(func) + worker = thread_func(2) + # NOTE: you shouldn't normally call worker.work()! If you do, it will NOT + # be run in a separate thread (as it would for worker.start(). + # This is for the sake of testing it in the main thread. + assert worker.work() == 2 + + +# note: pytest-cov cannot check coverage of code run in the other thread. +# this is just for the sake of coverage +def test_thread_generator_worker_in_main_thread(qapp): + """Test basic threadworker on a generator in the main thread with methods.""" + + def func(): + i = 0 + while i < 10: + i += 1 + incoming = yield i + i = incoming if incoming is not None else i + return 3 + + worker = qthreading.thread_worker(func, start_thread=False)() + counter = 0 + + def handle_pause(): + time.sleep(0.1) + assert worker.is_paused + worker.toggle_pause() + + def test_yield(v): + nonlocal counter + counter += 1 + if v == 2: + assert not worker.is_paused + worker.pause() + assert not worker.is_paused + if v == 3: + worker.send(7) + if v == 9: + worker.quit() + + def handle_abort(): + assert counter == 5 # because we skipped a few by sending in 7 + + worker.paused.connect(handle_pause) + assert isinstance(worker, qthreading.GeneratorWorker) + worker.yielded.connect(test_yield) + worker.aborted.connect(handle_abort) + # NOTE: you shouldn't normally call worker.work()! If you do, it will NOT + # be run in a separate thread (as it would for worker.start(). + # This is for the sake of testing it in the main thread. + assert worker.work() is None # because we aborted it + assert not worker.is_paused + assert counter == 5 + + worker2 = qthreading.thread_worker(func, start_thread=False)() + assert worker2.work() == 3 + + +def test_worker_base_attribute(qapp): + obj = qthreading.WorkerBase() + assert obj.started is not None + assert obj.finished is not None + assert obj.returned is not None + assert obj.errored is not None + with pytest.raises(AttributeError): + obj.aa + + +def test_abort_does_not_return(qtbot): + loop_counter = 0 + + def long_running_func(): + nonlocal loop_counter + + for _ in range(5): + yield loop_counter + time.sleep(0.1) + loop_counter += 1 + + abort_counter = 0 + + def count_abort(): + nonlocal abort_counter + abort_counter += 1 + + return_counter = 0 + + def returned_handler(value): + nonlocal return_counter + return_counter += 1 + + threaded_function = qthreading.thread_worker( + long_running_func, + connect={ + "returned": returned_handler, + "aborted": count_abort, + }, + ) + worker = threaded_function() + worker.quit() + qtbot.wait(600) + assert loop_counter < 4 + assert abort_counter == 1 + assert return_counter == 0 diff --git a/tox.ini b/tox.ini index c165fd6..f4dd9e2 100644 --- a/tox.ini +++ b/tox.ini @@ -65,4 +65,4 @@ extras = pyside6: pyside6 commands_pre = pyqt6,pyside6: pip install -U pytest-qt@git+https://github.com/pytest-dev/pytest-qt.git -commands = pytest --color=yes --cov=superqt --cov-report=xml {posargs} +commands = pytest --color=yes --cov=superqt --cov-report=xml -v {posargs}