Threadworker (#31)

* add threadworker and tests

* add type

* update typing

* keep runtime types

* update

* remove slot

* remove order

* remove signalinstance hint

* fix old import error

* remove unneeded order

* try something

* comment

* timeout

* add qapp to everything

* verbose

* also add -s

* print lots

* move to bottom

* use sigint after time

* use wraper for future object

* remove temporary stuff

* undo move

* move again

* delete reference after return result

* add back sigint after time

* add print

* change scope

* add more prints

* change f string

* timtout

* no sigint again

* print more

* bump

* try without object thread tests

* just skip

* modify skips

* undo ensure thread changes

* verbose

Co-authored-by: Grzegorz Bokota <bokota+github@gmail.com>
This commit is contained in:
Talley Lambert
2021-10-15 12:05:44 -04:00
committed by GitHub
parent 67035a0f0b
commit 5983bd1552
7 changed files with 1264 additions and 54 deletions

View File

@@ -33,6 +33,8 @@ project_urls =
[options] [options]
packages = find: packages = find:
install_requires =
typing-extensions
python_requires = >=3.7 python_requires = >=3.7
package_dir = package_dir =
=src =src

View File

@@ -15,6 +15,11 @@ from . import PYQT5, PYQT6, PYSIDE2, PYSIDE6, PythonQtError
if PYQT5: if PYQT5:
from PyQt5.QtCore import QT_VERSION_STR as __version__ from PyQt5.QtCore import QT_VERSION_STR as __version__
from PyQt5.QtCore import * from PyQt5.QtCore import *
try:
from PyQt5.QtCore import pyqtBoundSignal as SignalInstance # noqa
except ImportError: # 5.11
SignalInstance = None
from PyQt5.QtCore import pyqtProperty as Property # noqa from PyQt5.QtCore import pyqtProperty as Property # noqa
from PyQt5.QtCore import pyqtSignal as Signal # noqa from PyQt5.QtCore import pyqtSignal as Signal # noqa
from PyQt5.QtCore import pyqtSlot as Slot # noqa from PyQt5.QtCore import pyqtSlot as Slot # noqa
@@ -24,6 +29,7 @@ if PYQT5:
elif PYQT6: elif PYQT6:
from PyQt6.QtCore import QT_VERSION_STR as __version__ from PyQt6.QtCore import QT_VERSION_STR as __version__
from PyQt6.QtCore import * from PyQt6.QtCore import *
from PyQt6.QtCore import pyqtBoundSignal as SignalInstance # noqa
from PyQt6.QtCore import pyqtProperty as Property # noqa from PyQt6.QtCore import pyqtProperty as Property # noqa
from PyQt6.QtCore import pyqtSignal as Signal # noqa from PyQt6.QtCore import pyqtSignal as Signal # noqa
from PyQt6.QtCore import pyqtSlot as Slot # noqa from PyQt6.QtCore import pyqtSlot as Slot # noqa

View File

@@ -1,4 +1,22 @@
__all__ = ("QMessageHandler", "ensure_object_thread", "ensure_main_thread") __all__ = (
"create_worker",
"ensure_main_thread",
"ensure_object_thread",
"FunctionWorker",
"GeneratorWorker",
"new_worker_qthread",
"QMessageHandler",
"thread_worker",
"WorkerBase",
)
from ._ensure_thread import ensure_main_thread, ensure_object_thread from ._ensure_thread import ensure_main_thread, ensure_object_thread
from ._message_handler import QMessageHandler from ._message_handler import QMessageHandler
from ._qthreading import (
FunctionWorker,
GeneratorWorker,
WorkerBase,
create_worker,
new_worker_qthread,
thread_worker,
)

View File

@@ -0,0 +1,899 @@
from __future__ import annotations
import inspect
import time
import warnings
from functools import partial, wraps
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Generator,
Generic,
Optional,
Sequence,
Set,
Type,
TypeVar,
Union,
overload,
)
from typing_extensions import Literal, ParamSpec
from ..qtcompat.QtCore import QObject, QRunnable, QThread, QThreadPool, QTimer, Signal
if TYPE_CHECKING:
_T = TypeVar("_T")
class SigInst(Generic[_T]):
@staticmethod
def connect(slot: Callable[[_T], Any], type: Optional[type] = ...) -> None:
...
@staticmethod
def disconnect(slot: Callable[[_T], Any] = ...) -> None:
...
@staticmethod
def emit(*args: _T) -> None:
...
_Y = TypeVar("_Y")
_S = TypeVar("_S")
_R = TypeVar("_R")
_P = ParamSpec("_P")
def as_generator_function(
func: Callable[_P, _R]
) -> Callable[_P, Generator[None, None, _R]]:
"""Turns a regular function (single return) into a generator function."""
@wraps(func)
def genwrapper(*args, **kwargs) -> Generator[None, None, _R]:
yield
return func(*args, **kwargs)
return genwrapper
class WorkerBaseSignals(QObject):
started = Signal() # emitted when the work is started
finished = Signal() # emitted when the work is finished
_finished = Signal(object) # emitted when the work is finished ro delete
returned = Signal(object) # emitted with return value
errored = Signal(object) # emitted with error object on Exception
warned = Signal(tuple) # emitted with showwarning args on warning
class WorkerBase(QRunnable, Generic[_R]):
"""Base class for creating a Worker that can run in another thread.
Parameters
----------
SignalsClass : type, optional
A QObject subclass that contains signals, by default WorkerBaseSignals
Attributes
----------
signals: WorkerBaseSignals
signal emitter object. To allow identify which worker thread emitted signal.
"""
#: A set of Workers. Add to set using :meth:`WorkerBase.start`
_worker_set: Set[WorkerBase] = set()
returned: SigInst[_R]
errored: SigInst[Exception]
warned: SigInst[tuple]
started: SigInst[None]
finished: SigInst[None]
def __init__(
self,
func: Optional[Callable[_P, _R]] = None,
SignalsClass: Type[WorkerBaseSignals] = WorkerBaseSignals,
) -> None:
super().__init__()
self._abort_requested = False
self._running = False
self.signals = SignalsClass()
def __getattr__(self, name: str) -> SigInst:
"""Pass through attr requests to signals to simplify connection API.
The goal is to enable ``worker.yielded.connect`` instead of
``worker.signals.yielded.connect``. Because multiple inheritance of Qt
classes is not well supported in PyQt, we have to use composition here
(signals are provided by QObjects, and QRunnable is not a QObject). So
this passthrough allows us to connect to signals on the ``_signals``
object.
"""
# the Signal object is actually a class attribute
attr = getattr(self.signals.__class__, name, None)
if isinstance(attr, Signal):
# but what we need to connect to is the instantiated signal
# (which is of type `SignalInstance` in PySide and
# `pyqtBoundSignal` in PyQt)
return getattr(self.signals, name)
raise AttributeError(
f"{self.__class__.__name__!r} object has no attribute {name!r}"
)
def quit(self) -> None:
"""Send a request to abort the worker.
.. note::
It is entirely up to subclasses to honor this method by checking
``self.abort_requested`` periodically in their ``worker.work``
method, and exiting if ``True``.
"""
self._abort_requested = True
@property
def abort_requested(self) -> bool:
"""Whether the worker has been requested to stop."""
return self._abort_requested
@property
def is_running(self) -> bool:
"""Whether the worker has been started"""
return self._running
def run(self) -> None:
"""Start the worker.
The end-user should never need to call this function.
But it cannot be made private or renamed, since it is called by Qt.
The order of method calls when starting a worker is:
.. code-block:: none
calls QThreadPool.globalInstance().start(worker)
| triggered by the QThreadPool.start() method
| | called by worker.run
| | |
V V V
worker.start -> worker.run -> worker.work
**This** is the function that actually gets called when calling
:func:`QThreadPool.start(worker)`. It simply wraps the :meth:`work`
method, and emits a few signals. Subclasses should NOT override this
method (except with good reason), and instead should implement
:meth:`work`.
"""
self.started.emit()
self._running = True
try:
with warnings.catch_warnings():
warnings.filterwarnings("always")
warnings.showwarning = lambda *w: self.warned.emit(w)
result = self.work()
if isinstance(result, Exception):
if isinstance(result, RuntimeError):
# The Worker object has likely been deleted.
# A deleted wrapped C/C++ object may result in a runtime
# error that will cause segfault if we try to do much other
# than simply notify the user.
warnings.warn(
f"RuntimeError in aborted thread: {result}",
RuntimeWarning,
)
return
else:
raise result
if not self.abort_requested:
self.returned.emit(result)
except Exception as exc:
self.errored.emit(exc)
self._running = False
self.finished.emit()
self._finished.emit(self)
def work(self) -> Union[Exception, _R]:
"""Main method to execute the worker.
The end-user should never need to call this function.
But subclasses must implement this method (See
:meth:`GeneratorFunction.work` for an example implementation).
Minimally, it should check ``self.abort_requested`` periodically and
exit if True.
Examples
--------
.. code-block:: python
class MyWorker(WorkerBase):
def work(self):
i = 0
while True:
if self.abort_requested:
self.aborted.emit()
break
i += 1
if i > max_iters:
break
time.sleep(0.5)
"""
raise NotImplementedError(
f'"{self.__class__.__name__}" failed to define work() method'
)
def start(self) -> None:
"""Start this worker in a thread and add it to the global threadpool.
The order of method calls when starting a worker is:
.. code-block:: none
calls QThreadPool.globalInstance().start(worker)
| triggered by the QThreadPool.start() method
| | called by worker.run
| | |
V V V
worker.start -> worker.run -> worker.work
"""
if self in self._worker_set:
raise RuntimeError("This worker is already started!")
# This will raise a RunTimeError if the worker is already deleted
repr(self)
self._worker_set.add(self)
self._finished.connect(self._set_discard)
start_ = partial(QThreadPool.globalInstance().start, self)
QTimer.singleShot(10, start_)
@classmethod
def _set_discard(cls, obj: WorkerBase) -> None:
cls._worker_set.discard(obj)
@classmethod
def await_workers(cls, msecs: int = None) -> None:
"""Ask all workers to quit, and wait up to `msec` for quit.
Attempts to clean up all running workers by calling ``worker.quit()``
method. Any workers in the ``WorkerBase._worker_set`` set will have this
method.
By default, this function will block indefinitely, until worker threads
finish. If a timeout is provided, a ``RuntimeError`` will be raised if
the workers do not gracefully exit in the time requests, but the threads
will NOT be killed. It is (currently) left to the user to use their OS
to force-quit rogue threads.
.. important::
If the user does not put any yields in their function, and the function
is super long, it will just hang... For instance, there's no graceful
way to kill this thread in python:
.. code-block:: python
@thread_worker
def ZZZzzz():
time.sleep(10000000)
This is why it's always advisable to use a generator that periodically
yields for long-running computations in another thread.
See `this stack-overflow post
<https://stackoverflow.com/questions/323972/is-there-any-way-to-kill-a-thread>`_
for a good discussion on the difficulty of killing a rogue python thread:
Parameters
----------
msecs : int, optional
Waits up to msecs milliseconds for all threads to exit and removes all
threads from the thread pool. If msecs is `None` (the default), the
timeout is ignored (waits for the last thread to exit).
Raises
------
RuntimeError
If a timeout is provided and workers do not quit successfully within
the time allotted.
"""
for worker in cls._worker_set:
worker.quit()
msecs = msecs if msecs is not None else -1
if not QThreadPool.globalInstance().waitForDone(msecs):
raise RuntimeError(
f"Workers did not quit gracefully in the time allotted ({msecs} ms)"
)
class FunctionWorker(WorkerBase[_R]):
"""QRunnable with signals that wraps a simple long-running function.
.. note::
``FunctionWorker`` does not provide a way to stop a very long-running
function (e.g. ``time.sleep(10000)``). So whenever possible, it is
better to implement your long running function as a generator that
yields periodically, and use the :class:`GeneratorWorker` instead.
Parameters
----------
func : Callable
A function to call in another thread
*args
will be passed to the function
**kwargs
will be passed to the function
Raises
------
TypeError
If ``func`` is a generator function and not a regular function.
"""
def __init__(self, func: Callable[_P, _R], *args, **kwargs):
if inspect.isgeneratorfunction(func):
raise TypeError(
f"Generator function {func} cannot be used with FunctionWorker, "
"use GeneratorWorker instead",
)
super().__init__()
self._func = func
self._args = args
self._kwargs = kwargs
def work(self) -> _R:
return self._func(*self._args, **self._kwargs)
class GeneratorWorkerSignals(WorkerBaseSignals):
yielded = Signal(object) # emitted with yielded values (if generator used)
paused = Signal() # emitted when a running job has successfully paused
resumed = Signal() # emitted when a paused job has successfully resumed
aborted = Signal() # emitted when a running job is successfully aborted
class GeneratorWorker(WorkerBase, Generic[_Y, _S, _R]):
"""QRunnable with signals that wraps a long-running generator.
Provides a convenient way to run a generator function in another thread,
while allowing 2-way communication between threads, using plain-python
generator syntax in the original function.
Parameters
----------
func : callable
The function being run in another thread. May be a generator function.
SignalsClass : type, optional
A QObject subclass that contains signals, by default
GeneratorWorkerSignals
*args
Will be passed to func on instantiation
**kwargs
Will be passed to func on instantiation
"""
yielded: SigInst[_Y]
paused: SigInst[None]
resumed: SigInst[None]
aborted: SigInst[None]
def __init__(
self,
func: Callable[_P, Generator[_Y, Optional[_S], _R]],
*args,
SignalsClass: Type[WorkerBaseSignals] = GeneratorWorkerSignals,
**kwargs,
):
if not inspect.isgeneratorfunction(func):
raise TypeError(
f"Regular function {func} cannot be used with GeneratorWorker, "
"use FunctionWorker instead",
)
super().__init__(SignalsClass=SignalsClass)
self._gen = func(*args, **kwargs)
self._incoming_value: Optional[_S] = None
self._pause_requested = False
self._resume_requested = False
self._paused = False
# polling interval: ONLY relevant if the user paused a running worker
self._pause_interval = 0.01
self.pbar = None
def work(self) -> Union[Optional[_R], Exception]:
"""Core event loop that calls the original function.
Enters a continual loop, yielding and returning from the original
function. Checks for various events (quit, pause, resume, etc...).
(To clarify: we are creating a rudimentary event loop here because
there IS NO Qt event loop running in the other thread to hook into)
"""
while True:
if self.abort_requested:
self.aborted.emit()
break
if self._paused:
if self._resume_requested:
self._paused = False
self._resume_requested = False
self.resumed.emit()
else:
time.sleep(self._pause_interval)
continue
elif self._pause_requested:
self._paused = True
self._pause_requested = False
self.paused.emit()
continue
try:
input = self._next_value()
output = self._gen.send(input)
self.yielded.emit(output)
except StopIteration as exc:
return exc.value
except RuntimeError as exc:
# The worker has probably been deleted. warning will be
# emitted in ``WorkerBase.run``
return exc
return None
def send(self, value: _S):
"""Send a value into the function (if a generator was used)."""
self._incoming_value = value
def _next_value(self) -> Optional[_S]:
out = None
if self._incoming_value is not None:
out = self._incoming_value
self._incoming_value = None
return out
@property
def is_paused(self) -> bool:
"""Whether the worker is currently paused."""
return self._paused
def toggle_pause(self) -> None:
"""Request to pause the worker if playing or resume if paused."""
if self.is_paused:
self._resume_requested = True
else:
self._pause_requested = True
def pause(self) -> None:
"""Request to pause the worker."""
if not self.is_paused:
self._pause_requested = True
def resume(self) -> None:
"""Send a request to resume the worker."""
if self.is_paused:
self._resume_requested = True
#############################################################################
# convenience functions for creating Worker instances
@overload
def create_worker(
func: Callable[_P, Generator[_Y, _S, _R]],
*args,
_start_thread: Optional[bool] = None,
_connect: Optional[Dict[str, Union[Callable, Sequence[Callable]]]] = None,
_worker_class: Union[Type[GeneratorWorker], Type[FunctionWorker], None] = None,
_ignore_errors: bool = False,
**kwargs,
) -> GeneratorWorker[_Y, _S, _R]:
...
@overload
def create_worker(
func: Callable[_P, _R],
*args,
_start_thread: Optional[bool] = None,
_connect: Optional[Dict[str, Union[Callable, Sequence[Callable]]]] = None,
_worker_class: Union[Type[GeneratorWorker], Type[FunctionWorker], None] = None,
_ignore_errors: bool = False,
**kwargs,
) -> FunctionWorker[_R]:
...
def create_worker(
func: Callable,
*args,
_start_thread: Optional[bool] = None,
_connect: Optional[Dict[str, Union[Callable, Sequence[Callable]]]] = None,
_worker_class: Union[Type[GeneratorWorker], Type[FunctionWorker], None] = None,
_ignore_errors: bool = False,
**kwargs,
) -> Union[FunctionWorker, GeneratorWorker]:
"""Convenience function to start a function in another thread.
By default, uses :class:`Worker`, but a custom ``WorkerBase`` subclass may
be provided. If so, it must be a subclass of :class:`Worker`, which
defines a standard set of signals and a run method.
Parameters
----------
func : Callable
The function to call in another thread.
_start_thread : bool, optional
Whether to immediaetly start the thread. If False, the returned worker
must be manually started with ``worker.start()``. by default it will be
``False`` if the ``_connect`` argument is ``None``, otherwise ``True``.
_connect : Dict[str, Union[Callable, Sequence]], optional
A mapping of ``"signal_name"`` -> ``callable`` or list of ``callable``:
callback functions to connect to the various signals offered by the
worker class. by default None
_worker_class : type of GeneratorWorker or FunctionWorker, optional
The :class`WorkerBase` to instantiate, by default
:class:`FunctionWorker` will be used if ``func`` is a regular function,
and :class:`GeneratorWorker` will be used if it is a generator.
_ignore_errors : bool, optional
If ``False`` (the default), errors raised in the other thread will be
reraised in the main thread (makes debugging significantly easier).
*args
will be passed to ``func``
**kwargs
will be passed to ``func``
Returns
-------
worker : WorkerBase
An instantiated worker. If ``_start_thread`` was ``False``, the worker
will have a `.start()` method that can be used to start the thread.
Raises
------
TypeError
If a worker_class is provided that is not a subclass of WorkerBase.
TypeError
If _connect is provided and is not a dict of ``{str: callable}``
Examples
--------
.. code-block:: python
def long_function(duration):
import time
time.sleep(duration)
worker = create_worker(long_function, 10)
"""
worker: Union[FunctionWorker, GeneratorWorker]
if not _worker_class:
if inspect.isgeneratorfunction(func):
_worker_class = GeneratorWorker
else:
_worker_class = FunctionWorker
if not inspect.isclass(_worker_class) and issubclass(_worker_class, WorkerBase):
raise TypeError(f"Worker {_worker_class} must be a subclass of WorkerBase")
worker = _worker_class(func, *args, **kwargs)
if _connect is not None:
if not isinstance(_connect, dict):
raise TypeError("The '_connect' argument must be a dict")
if _start_thread is None:
_start_thread = True
for key, val in _connect.items():
_val = val if isinstance(val, (tuple, list)) else [val]
for v in _val:
if not callable(v):
raise TypeError(
f"_connect[{key!r}] must be a function or sequence of functions"
)
getattr(worker, key).connect(v)
# if the user has not provided a default connection for the "errored"
# signal... and they have not explicitly set ``ignore_errors=True``
# Then rereaise any errors from the thread.
if not _ignore_errors and not (_connect or {}).get("errored", False):
def reraise(e):
raise e
worker.errored.connect(reraise)
if _start_thread:
worker.start()
return worker
@overload
def thread_worker(
function: Callable[_P, Generator[_Y, _S, _R]],
start_thread: Optional[bool] = None,
connect: Optional[Dict[str, Union[Callable, Sequence[Callable]]]] = None,
worker_class: Optional[Type[WorkerBase]] = None,
ignore_errors: bool = False,
) -> Callable[_P, GeneratorWorker[_Y, _S, _R]]:
...
@overload
def thread_worker(
function: Callable[_P, _R],
start_thread: Optional[bool] = None,
connect: Optional[Dict[str, Union[Callable, Sequence[Callable]]]] = None,
worker_class: Optional[Type[WorkerBase]] = None,
ignore_errors: bool = False,
) -> Callable[_P, FunctionWorker[_R]]:
...
@overload
def thread_worker(
function: Literal[None] = None,
start_thread: Optional[bool] = None,
connect: Optional[Dict[str, Union[Callable, Sequence[Callable]]]] = None,
worker_class: Optional[Type[WorkerBase]] = None,
ignore_errors: bool = False,
) -> Callable[[Callable], Callable[_P, Union[FunctionWorker, GeneratorWorker]]]:
...
def thread_worker(
function: Optional[Callable] = None,
start_thread: Optional[bool] = None,
connect: Optional[Dict[str, Union[Callable, Sequence[Callable]]]] = None,
worker_class: Optional[Type[WorkerBase]] = None,
ignore_errors: bool = False,
):
"""Decorator that runs a function in a separate thread when called.
When called, the decorated function returns a :class:`WorkerBase`. See
:func:`create_worker` for additional keyword arguments that can be used
when calling the function.
The returned worker will have these signals:
- *started*: emitted when the work is started
- *finished*: emitted when the work is finished
- *returned*: emitted with return value
- *errored*: emitted with error object on Exception
It will also have a ``worker.start()`` method that can be used to start
execution of the function in another thread. (useful if you need to connect
callbacks to signals prior to execution)
If the decorated function is a generator, the returned worker will also
provide these signals:
- *yielded*: emitted with yielded values
- *paused*: emitted when a running job has successfully paused
- *resumed*: emitted when a paused job has successfully resumed
- *aborted*: emitted when a running job is successfully aborted
And these methods:
- *quit*: ask the thread to quit
- *toggle_paused*: toggle the running state of the thread.
- *send*: send a value into the generator. (This requires that your
decorator function uses the ``value = yield`` syntax)
Parameters
----------
function : callable
Function to call in another thread. For communication between threads
may be a generator function.
start_thread : bool, optional
Whether to immediaetly start the thread. If False, the returned worker
must be manually started with ``worker.start()``. by default it will be
``False`` if the ``_connect`` argument is ``None``, otherwise ``True``.
connect : Dict[str, Union[Callable, Sequence]], optional
A mapping of ``"signal_name"`` -> ``callable`` or list of ``callable``:
callback functions to connect to the various signals offered by the
worker class. by default None
worker_class : Type[WorkerBase], optional
The :class`WorkerBase` to instantiate, by default
:class:`FunctionWorker` will be used if ``func`` is a regular function,
and :class:`GeneratorWorker` will be used if it is a generator.
ignore_errors : bool, optional
If ``False`` (the default), errors raised in the other thread will be
reraised in the main thread (makes debugging significantly easier).
Returns
-------
callable
function that creates a worker, puts it in a new thread and returns
the worker instance.
Examples
--------
.. code-block:: python
@thread_worker
def long_function(start, end):
# do work, periodically yielding
i = start
while i <= end:
time.sleep(0.1)
yield i
# do teardown
return 'anything'
# call the function to start running in another thread.
worker = long_function()
# connect signals here if desired... or they may be added using the
# `connect` argument in the `@thread_worker` decorator... in which
# case the worker will start immediately when long_function() is called
worker.start()
"""
def _inner(func):
@wraps(func)
def worker_function(*args, **kwargs):
# decorator kwargs can be overridden at call time by using the
# underscore-prefixed version of the kwarg.
kwargs["_start_thread"] = kwargs.get("_start_thread", start_thread)
kwargs["_connect"] = kwargs.get("_connect", connect)
kwargs["_worker_class"] = kwargs.get("_worker_class", worker_class)
kwargs["_ignore_errors"] = kwargs.get("_ignore_errors", ignore_errors)
return create_worker(
func,
*args,
**kwargs,
)
return worker_function
return _inner if function is None else _inner(function)
############################################################################
# This is a variant on the above pattern, it uses QThread instead of Qrunnable
# see https://doc.qt.io/qt-5/threads-technologies.html#comparison-of-solutions
# (it appears from that table that QRunnable cannot emit or receive signals,
# but we circumvent that here with our WorkerBase class that also inherits from
# QObject... providing signals/slots).
#
# A benefit of the QRunnable pattern is that Qt manages the threads for you,
# in the QThreadPool.globalInstance() ... making it easier to reuse threads,
# and reduce overhead.
#
# However, a disadvantage is that you have no access to (and therefore less
# control over) the QThread itself. See for example all of the methods
# provided on the QThread object: https://doc.qt.io/qt-5/qthread.html
if TYPE_CHECKING:
class WorkerProtocol(QObject):
finished: Signal
def work(self) -> None:
...
def new_worker_qthread(
Worker: Type[WorkerProtocol],
*args,
_start_thread: bool = False,
_connect: Dict[str, Callable] = None,
**kwargs,
):
"""This is a convenience function to start a worker in a Qthread.
In most cases, the @thread_worker decorator is sufficient and preferable.
But this allows the user to completely customize the Worker object.
However, they must then maintain control over the thread and clean up
appropriately.
It follows the pattern described here:
https://www.qt.io/blog/2010/06/17/youre-doing-it-wrong
and
https://doc.qt.io/qt-5/qthread.html#details
see also:
https://mayaposch.wordpress.com/2011/11/01/how-to-really-truly-use-qthreads-the-full-explanation/
A QThread object is not a thread! It should be thought of as a class to
*manage* a thread, not as the actual code or object that runs in that
thread. The QThread object is created on the main thread and lives there.
Worker objects which derive from QObject are the things that actually do
the work. They can be moved to a QThread as is done here.
.. note:: Mostly ignorable detail
While the signals/slots syntax of the worker looks very similar to
standard "single-threaded" signals & slots, note that inter-thread
signals and slots (automatically) use an event-based QueuedConnection,
while intra-thread signals use a DirectConnection. See `Signals and
Slots Across Threads
<https://doc.qt.io/qt-5/threads-qobject.html#signals-and-slots-across-threads>`_
Parameters
----------
Worker : QObject
QObject type that implements a `work()` method. The Worker should also
emit a finished signal when the work is done.
_start_thread : bool
If True, thread will be started immediately, otherwise, thread must
be manually started with thread.start().
_connect : dict, optional
Optional dictionary of {signal: function} to connect to the new worker.
for instance: _connect = {'incremented': myfunc} will result in:
worker.incremented.connect(myfunc)
*args
will be passed to the Worker class on instantiation.
**kwargs
will be passed to the Worker class on instantiation.
Returns
-------
worker : WorkerBase
The created worker.
thread : QThread
The thread on which the worker is running.
Examples
--------
Create some QObject that has a long-running work method:
.. code-block:: python
class Worker(QObject):
finished = Signal()
increment = Signal(int)
def __init__(self, argument):
super().__init__()
self.argument = argument
@Slot()
def work(self):
# some long running task...
import time
for i in range(10):
time.sleep(1)
self.increment.emit(i)
self.finished.emit()
worker, thread = new_worker_qthread(
Worker,
'argument',
_start_thread=True,
_connect={'increment': print},
)
"""
if _connect and not isinstance(_connect, dict):
raise TypeError("_connect parameter must be a dict")
thread = QThread()
worker = Worker(*args, **kwargs)
worker.moveToThread(thread)
thread.started.connect(worker.work)
worker.finished.connect(thread.quit)
worker.finished.connect(worker.deleteLater)
thread.finished.connect(thread.deleteLater)
if _connect:
[getattr(worker, key).connect(val) for key, val in _connect.items()]
if _start_thread:
thread.start() # sometimes need to connect stuff before starting
return worker, thread

View File

@@ -1,4 +1,5 @@
import inspect import inspect
import os
import time import time
from concurrent.futures import Future, TimeoutError from concurrent.futures import Future, TimeoutError
@@ -7,6 +8,8 @@ import pytest
from superqt.qtcompat.QtCore import QCoreApplication, QObject, QThread, Signal from superqt.qtcompat.QtCore import QCoreApplication, QObject, QThread, Signal
from superqt.utils import ensure_main_thread, ensure_object_thread from superqt.utils import ensure_main_thread, ensure_object_thread
skip_on_ci = pytest.mark.skipif(bool(os.getenv("CI")), reason="github hangs")
class SampleObject(QObject): class SampleObject(QObject):
assigment_done = Signal() assigment_done = Signal()
@@ -122,25 +125,8 @@ def test_only_main_thread(qapp):
assert ob.sample_object_thread_property == 7 assert ob.sample_object_thread_property == 7
def test_object_thread(qtbot):
ob = SampleObject()
thread = QThread()
thread.start()
ob.moveToThread(thread)
with qtbot.waitSignal(ob.assigment_done):
ob.check_object_thread(2, b=4)
assert ob.object_thread_res == {"a": 2, "b": 4}
with qtbot.waitSignal(ob.assigment_done):
ob.sample_object_thread_property = "text"
assert ob.sample_object_thread_property == "text"
assert ob.thread() is thread
with qtbot.waitSignal(thread.finished):
thread.exit(0)
def test_main_thread(qtbot): def test_main_thread(qtbot):
print("test_main_thread start")
ob = SampleObject() ob = SampleObject()
t = LocalThread(ob) t = LocalThread(ob)
with qtbot.waitSignal(t.finished): with qtbot.waitSignal(t.finished):
@@ -148,40 +134,7 @@ def test_main_thread(qtbot):
assert ob.main_thread_res == {"a": 5, "b": 8} assert ob.main_thread_res == {"a": 5, "b": 8}
assert ob.sample_main_thread_property == "text2" assert ob.sample_main_thread_property == "text2"
print("test_main_thread done")
def test_object_thread_return(qtbot):
ob = SampleObject()
thread = QThread()
thread.start()
ob.moveToThread(thread)
assert ob.check_object_thread_return(2) == 14
assert ob.thread() is thread
with qtbot.waitSignal(thread.finished):
thread.exit(0)
def test_object_thread_return_timeout(qtbot):
ob = SampleObject()
thread = QThread()
thread.start()
ob.moveToThread(thread)
with pytest.raises(TimeoutError):
ob.check_object_thread_return_timeout(2)
with qtbot.waitSignal(thread.finished):
thread.exit(0)
def test_object_thread_return_future(qtbot):
ob = SampleObject()
thread = QThread()
thread.start()
ob.moveToThread(thread)
future = ob.check_object_thread_return_future(2)
assert isinstance(future, Future)
assert future.result() == 14
with qtbot.waitSignal(thread.finished):
thread.exit(0)
def test_main_thread_return(qtbot): def test_main_thread_return(qtbot):
@@ -210,3 +163,59 @@ def test_names(qapp):
assert list(signature.parameters.values())[0].name == "a" assert list(signature.parameters.values())[0].name == "a"
assert list(signature.parameters.values())[0].annotation == int assert list(signature.parameters.values())[0].annotation == int
assert ob.check_main_thread_return.__name__ == "check_main_thread_return" assert ob.check_main_thread_return.__name__ == "check_main_thread_return"
# @skip_on_ci
def test_object_thread_return(qtbot):
ob = SampleObject()
thread = QThread()
thread.start()
ob.moveToThread(thread)
assert ob.check_object_thread_return(2) == 14
assert ob.thread() is thread
with qtbot.waitSignal(thread.finished):
thread.quit()
# @skip_on_ci
def test_object_thread_return_timeout(qtbot):
ob = SampleObject()
thread = QThread()
thread.start()
ob.moveToThread(thread)
with pytest.raises(TimeoutError):
ob.check_object_thread_return_timeout(2)
with qtbot.waitSignal(thread.finished):
thread.quit()
@skip_on_ci
def test_object_thread_return_future(qtbot):
ob = SampleObject()
thread = QThread()
thread.start()
ob.moveToThread(thread)
future = ob.check_object_thread_return_future(2)
assert isinstance(future, Future)
assert future.result() == 14
with qtbot.waitSignal(thread.finished):
thread.quit()
@skip_on_ci
def test_object_thread(qtbot):
ob = SampleObject()
thread = QThread()
thread.start()
ob.moveToThread(thread)
with qtbot.waitSignal(ob.assigment_done):
ob.check_object_thread(2, b=4)
assert ob.object_thread_res == {"a": 2, "b": 4}
with qtbot.waitSignal(ob.assigment_done):
ob.sample_object_thread_property = "text"
assert ob.sample_object_thread_property == "text"
assert ob.thread() is thread
with qtbot.waitSignal(thread.finished):
thread.quit()

276
tests/test_threadworker.py Normal file
View File

@@ -0,0 +1,276 @@
import inspect
import time
import warnings
from functools import partial
from operator import eq
import pytest
import superqt.utils._qthreading as qthreading
equals_1 = partial(eq, 1)
equals_3 = partial(eq, 3)
skip = pytest.mark.skipif(True, reason="testing")
def test_as_generator_function():
"""Test we can convert a regular function to a generator function."""
def func():
return
assert not inspect.isgeneratorfunction(func)
newfunc = qthreading.as_generator_function(func)
assert inspect.isgeneratorfunction(newfunc)
assert list(newfunc()) == [None]
# qtbot is necessary for qthreading here.
# note: pytest-cov cannot check coverage of code run in the other thread.
def test_thread_worker(qtbot):
"""Test basic threadworker on a function"""
@qthreading.thread_worker
def func():
return 1
wrkr = func()
assert isinstance(wrkr, qthreading.FunctionWorker)
signals = [wrkr.returned, wrkr.finished]
checks = [equals_1, lambda: True]
with qtbot.waitSignals(signals, check_params_cbs=checks, order="strict"):
wrkr.start()
def test_thread_generator_worker(qtbot):
"""Test basic threadworker on a generator"""
@qthreading.thread_worker
def func():
yield 1
yield 1
return 3
wrkr = func()
assert isinstance(wrkr, qthreading.GeneratorWorker)
signals = [wrkr.yielded, wrkr.yielded, wrkr.returned, wrkr.finished]
checks = [equals_1, equals_1, equals_3, lambda: True]
with qtbot.waitSignals(signals, check_params_cbs=checks, order="strict"):
wrkr.start()
qtbot.wait(500)
def test_thread_raises2(qtbot):
handle_val = [0]
def handle_raise(e):
handle_val[0] = 1
assert isinstance(e, ValueError)
assert str(e) == "whoops"
@qthreading.thread_worker(connect={"errored": handle_raise}, start_thread=False)
def func():
yield 1
yield 1
raise ValueError("whoops")
wrkr = func()
assert isinstance(wrkr, qthreading.GeneratorWorker)
signals = [wrkr.yielded, wrkr.yielded, wrkr.errored, wrkr.finished]
checks = [equals_1, equals_1, None, None]
with qtbot.waitSignals(signals, check_params_cbs=checks):
wrkr.start()
assert handle_val[0] == 1
def test_thread_warns(qtbot):
"""Test warnings get returned to main thread"""
def check_warning(w):
return str(w) == "hey!"
@qthreading.thread_worker(connect={"warned": check_warning}, start_thread=False)
def func():
yield 1
warnings.warn("hey!")
yield 3
warnings.warn("hey!")
return 1
wrkr = func()
assert isinstance(wrkr, qthreading.GeneratorWorker)
signals = [wrkr.yielded, wrkr.warned, wrkr.yielded, wrkr.returned]
checks = [equals_1, None, equals_3, equals_1]
with qtbot.waitSignals(signals, check_params_cbs=checks):
wrkr.start()
def test_multiple_connections(qtbot):
"""Test the connect dict accepts a list of functions, and type checks"""
test1_val = [0]
test2_val = [0]
def func():
return 1
def test1(v):
test1_val[0] = 1
assert v == 1
def test2(v):
test2_val[0] = 1
assert v == 1
thread_func = qthreading.thread_worker(
func, connect={"returned": [test1, test2]}, start_thread=False
)
worker = thread_func()
assert isinstance(worker, qthreading.FunctionWorker)
with qtbot.waitSignal(worker.finished):
worker.start()
assert test1_val[0] == 1
assert test2_val[0] == 1
# they must all be functions
with pytest.raises(TypeError):
qthreading.thread_worker(func, connect={"returned": ["test1", test2]})()
# they must all be functions
with pytest.raises(TypeError):
qthreading.thread_worker(func, connect=test1)()
def test_create_worker(qapp):
"""Test directly calling create_worker."""
def func(x, y):
return x + y
worker = qthreading.create_worker(func, 1, 2)
assert isinstance(worker, qthreading.WorkerBase)
with pytest.raises(TypeError):
_ = qthreading.create_worker(func, 1, 2, _worker_class=object)
# note: pytest-cov cannot check coverage of code run in the other thread.
# this is just for the sake of coverage
def test_thread_worker_in_main_thread(qapp):
"""Test basic threadworker on a function"""
def func(x):
return x
thread_func = qthreading.thread_worker(func)
worker = thread_func(2)
# NOTE: you shouldn't normally call worker.work()! If you do, it will NOT
# be run in a separate thread (as it would for worker.start().
# This is for the sake of testing it in the main thread.
assert worker.work() == 2
# note: pytest-cov cannot check coverage of code run in the other thread.
# this is just for the sake of coverage
def test_thread_generator_worker_in_main_thread(qapp):
"""Test basic threadworker on a generator in the main thread with methods."""
def func():
i = 0
while i < 10:
i += 1
incoming = yield i
i = incoming if incoming is not None else i
return 3
worker = qthreading.thread_worker(func, start_thread=False)()
counter = 0
def handle_pause():
time.sleep(0.1)
assert worker.is_paused
worker.toggle_pause()
def test_yield(v):
nonlocal counter
counter += 1
if v == 2:
assert not worker.is_paused
worker.pause()
assert not worker.is_paused
if v == 3:
worker.send(7)
if v == 9:
worker.quit()
def handle_abort():
assert counter == 5 # because we skipped a few by sending in 7
worker.paused.connect(handle_pause)
assert isinstance(worker, qthreading.GeneratorWorker)
worker.yielded.connect(test_yield)
worker.aborted.connect(handle_abort)
# NOTE: you shouldn't normally call worker.work()! If you do, it will NOT
# be run in a separate thread (as it would for worker.start().
# This is for the sake of testing it in the main thread.
assert worker.work() is None # because we aborted it
assert not worker.is_paused
assert counter == 5
worker2 = qthreading.thread_worker(func, start_thread=False)()
assert worker2.work() == 3
def test_worker_base_attribute(qapp):
obj = qthreading.WorkerBase()
assert obj.started is not None
assert obj.finished is not None
assert obj.returned is not None
assert obj.errored is not None
with pytest.raises(AttributeError):
obj.aa
def test_abort_does_not_return(qtbot):
loop_counter = 0
def long_running_func():
nonlocal loop_counter
for _ in range(5):
yield loop_counter
time.sleep(0.1)
loop_counter += 1
abort_counter = 0
def count_abort():
nonlocal abort_counter
abort_counter += 1
return_counter = 0
def returned_handler(value):
nonlocal return_counter
return_counter += 1
threaded_function = qthreading.thread_worker(
long_running_func,
connect={
"returned": returned_handler,
"aborted": count_abort,
},
)
worker = threaded_function()
worker.quit()
qtbot.wait(600)
assert loop_counter < 4
assert abort_counter == 1
assert return_counter == 0

View File

@@ -65,4 +65,4 @@ extras =
pyside6: pyside6 pyside6: pyside6
commands_pre = commands_pre =
pyqt6,pyside6: pip install -U pytest-qt@git+https://github.com/pytest-dev/pytest-qt.git pyqt6,pyside6: pip install -U pytest-qt@git+https://github.com/pytest-dev/pytest-qt.git
commands = pytest --color=yes --cov=superqt --cov-report=xml {posargs} commands = pytest --color=yes --cov=superqt --cov-report=xml -v {posargs}