diff --git a/examples/throttle_mouse_event.py b/examples/throttle_mouse_event.py new file mode 100644 index 0000000..82f4c4b --- /dev/null +++ b/examples/throttle_mouse_event.py @@ -0,0 +1,29 @@ +from qtpy.QtCore import Signal +from qtpy.QtWidgets import QApplication, QWidget + +from superqt.utils import qthrottled + + +class Demo(QWidget): + positionChanged = Signal(int, int) + + def __init__(self) -> None: + super().__init__() + self.setMouseTracking(True) + self.positionChanged.connect(self._show_location) + + @qthrottled(timeout=400) # call this no more than once every 400ms + def _show_location(self, x, y): + print("Throttled event at", x, y) + + def mouseMoveEvent(self, event): + print("real move event at", event.x(), event.y()) + self.positionChanged.emit(event.x(), event.y()) + + +if __name__ == "__main__": + app = QApplication([]) + w = Demo() + w.resize(600, 600) + w.show() + app.exec_() diff --git a/examples/throttler_demo.py b/examples/throttler_demo.py new file mode 100644 index 0000000..ddc0a3e --- /dev/null +++ b/examples/throttler_demo.py @@ -0,0 +1,282 @@ +"""Adapted for python from the KDToolBox + +https://github.com/KDAB/KDToolBox/tree/master/qt/KDSignalThrottler + +MIT License + +Copyright (C) 2019-2022 Klarälvdalens Datakonsult AB, a KDAB Group company, +info@kdab.com + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + +""" + +from typing import Deque + +from qtpy.QtCore import QRect, QSize, Qt, QTimer, Signal +from qtpy.QtGui import QPainter, QPen +from qtpy.QtWidgets import ( + QApplication, + QCheckBox, + QComboBox, + QFormLayout, + QGroupBox, + QHBoxLayout, + QLabel, + QPushButton, + QSizePolicy, + QSpinBox, + QVBoxLayout, + QWidget, +) + +from superqt.utils._throttler import ( + GenericSignalThrottler, + QSignalDebouncer, + QSignalThrottler, +) + + +class DrawSignalsWidget(QWidget): + def __init__(self, parent=None): + super().__init__(parent) + self.setFocusPolicy(Qt.FocusPolicy.NoFocus) + self.setSizePolicy(QSizePolicy.MinimumExpanding, QSizePolicy.MinimumExpanding) + self.setAttribute(Qt.WA_OpaquePaintEvent) + + self._scrollTimer = QTimer(self) + self._scrollTimer.setInterval(10) + self._scrollTimer.timeout.connect(self._scroll) + self._scrollTimer.start() + + self._signalActivations: Deque[int] = Deque() + self._throttledSignalActivations: Deque[int] = Deque() + + def sizeHint(self): + return QSize(400, 200) + + def addSignalActivation(self): + self._signalActivations.appendleft(0) + + def addThrottledSignalActivation(self): + self._throttledSignalActivations.appendleft(0) + + def _scroll(self): + cutoff = self.width() + self.scrollAndCut(self._signalActivations, cutoff) + self.scrollAndCut(self._throttledSignalActivations, cutoff) + + self.update() + + def scrollAndCut(self, v: Deque[int], cutoff: int): + x = 0 + L = len(v) + for p in range(L): + v[p] += 1 + if v[p] > cutoff: + x = p + break + + # TODO: fix this... delete old ones + + def paintEvent(self, event): + p = QPainter(self) + p.fillRect(self.rect(), Qt.white) + + h = self.height() + h2 = h // 2 + w = self.width() + + self._drawSignals(p, self._signalActivations, Qt.red, 0, h2) + self._drawSignals(p, self._throttledSignalActivations, Qt.blue, h2, h) + + p.drawText( + QRect(0, 0, w, h2), + Qt.AlignmentFlag.AlignRight | Qt.AlignmentFlag.AlignVCenter, + "Source signal", + ) + p.drawText( + QRect(0, h2, w, h2), + Qt.AlignmentFlag.AlignRight | Qt.AlignmentFlag.AlignVCenter, + "Throttled signal", + ) + + p.save() + pen = QPen() + pen.setWidthF(2.0) + p.drawLine(0, h2, w, h2) + p.restore() + + def _drawSignals(self, p: QPainter, v: Deque[int], color, yStart, yEnd): + p.save() + pen = QPen() + pen.setWidthF(2.0) + pen.setColor(color) + p.setPen(pen) + + for i in v: + p.drawLine(i, yStart, i, yEnd) + p.restore() + + +class DemoWidget(QWidget): + signalToBeThrottled = Signal() + _throttler: GenericSignalThrottler + + def __init__(self, parent=None) -> None: + super().__init__(parent) + self._createUi() + self._throttler = None + + self._throttlerKindComboBox.currentIndexChanged.connect(self._createThrottler) + self._createThrottler() + + self._throttlerTimeoutSpinBox.valueChanged.connect(self.setThrottlerTimeout) + self.setThrottlerTimeout() + + self._mainButton.clicked.connect(self.signalToBeThrottled) + + self._autoTriggerTimer = QTimer(self) + self._autoTriggerTimer.setTimerType(Qt.TimerType.PreciseTimer) + self._autoTriggerCheckBox.clicked.connect(self._startOrStopAutoTriggerTimer) + self._startOrStopAutoTriggerTimer() + + self._autoTriggerIntervalSpinBox.valueChanged.connect( + self._setAutoTriggerTimeout + ) + self._setAutoTriggerTimeout() + + self._autoTriggerTimer.timeout.connect(self.signalToBeThrottled) + self.signalToBeThrottled.connect(self._drawSignalsWidget.addSignalActivation) + + def _createThrottler(self) -> None: + if self._throttler is not None: + self._throttler.deleteLater() + del self._throttler + + if self._throttlerKindComboBox.currentIndex() < 2: + cls = QSignalThrottler + else: + cls = QSignalDebouncer + if self._throttlerKindComboBox.currentIndex() % 2: + policy = QSignalThrottler.EmissionPolicy.Leading + else: + policy = QSignalThrottler.EmissionPolicy.Trailing + + self._throttler: GenericSignalThrottler = cls(policy, self) + + self._throttler.setTimerType(Qt.TimerType.PreciseTimer) + self.signalToBeThrottled.connect(self._throttler.throttle) + self._throttler.triggered.connect( + self._drawSignalsWidget.addThrottledSignalActivation + ) + + self.setThrottlerTimeout() + + def setThrottlerTimeout(self): + self._throttler.setTimeout(self._throttlerTimeoutSpinBox.value()) + + def _startOrStopAutoTriggerTimer(self): + shouldStart = self._autoTriggerCheckBox.isChecked() + if shouldStart: + self._autoTriggerTimer.start() + else: + self._autoTriggerTimer.stop() + + self._autoTriggerIntervalSpinBox.setEnabled(shouldStart) + self._autoTriggerLabel.setEnabled(shouldStart) + + def _setAutoTriggerTimeout(self): + timeout = self._autoTriggerIntervalSpinBox.value() + self._autoTriggerTimer.setInterval(timeout) + + def _createUi(self): + helpLabel = QLabel(self) + helpLabel.setWordWrap(True) + helpLabel.setText( + "

SignalThrottler example

" + "

This example demonstrates the differences between " + "the different kinds of signal throttlers and debouncers." + ) + + throttlerKindGroupBox = QGroupBox("Throttler configuration", self) + + self._throttlerKindComboBox = QComboBox(throttlerKindGroupBox) + self._throttlerKindComboBox.addItems( + ( + "Throttler, trailing", + "Throttler, leading", + "Debouncer, trailing", + "Debouncer, leading", + ) + ) + + self._throttlerTimeoutSpinBox = QSpinBox(throttlerKindGroupBox) + self._throttlerTimeoutSpinBox.setRange(1, 5000) + self._throttlerTimeoutSpinBox.setValue(500) + self._throttlerTimeoutSpinBox.setSuffix(" ms") + + layout = QFormLayout(throttlerKindGroupBox) + layout.addRow("Kind of throttler:", self._throttlerKindComboBox) + layout.addRow("Timeout:", self._throttlerTimeoutSpinBox) + throttlerKindGroupBox.setLayout(layout) + + buttonGroupBox = QGroupBox("Throttler activation") + self._mainButton = QPushButton(("Press me!"), buttonGroupBox) + + self._autoTriggerCheckBox = QCheckBox("Trigger automatically") + + autoTriggerLayout = QHBoxLayout() + self._autoTriggerLabel = QLabel("Interval", buttonGroupBox) + self._autoTriggerIntervalSpinBox = QSpinBox(buttonGroupBox) + self._autoTriggerIntervalSpinBox.setRange(1, 5000) + self._autoTriggerIntervalSpinBox.setValue(100) + self._autoTriggerIntervalSpinBox.setSuffix(" ms") + + autoTriggerLayout.setContentsMargins(0, 0, 0, 0) + autoTriggerLayout.addWidget(self._autoTriggerLabel) + autoTriggerLayout.addWidget(self._autoTriggerIntervalSpinBox) + + layout = QVBoxLayout(buttonGroupBox) + layout.addWidget(self._mainButton) + layout.addWidget(self._autoTriggerCheckBox) + layout.addLayout(autoTriggerLayout) + buttonGroupBox.setLayout(layout) + + resultGroupBox = QGroupBox("Result") + self._drawSignalsWidget = DrawSignalsWidget(resultGroupBox) + layout = QVBoxLayout(resultGroupBox) + layout.addWidget(self._drawSignalsWidget) + resultGroupBox.setLayout(layout) + + layout = QVBoxLayout(self) + layout.addWidget(helpLabel) + layout.addWidget(throttlerKindGroupBox) + layout.addWidget(buttonGroupBox) + layout.addWidget(resultGroupBox) + + self.setLayout(layout) + + +if __name__ == "__main__": + app = QApplication([__name__]) + w = DemoWidget() + w.resize(600, 600) + w.show() + app.exec_() diff --git a/src/superqt/utils/__init__.py b/src/superqt/utils/__init__.py index c2e89c5..3c9f41c 100644 --- a/src/superqt/utils/__init__.py +++ b/src/superqt/utils/__init__.py @@ -8,6 +8,10 @@ __all__ = ( "QMessageHandler", "thread_worker", "WorkerBase", + "qthrottled", + "qdebounced", + "QSignalDebouncer", + "QSignalThrottler", ) from ._ensure_thread import ensure_main_thread, ensure_object_thread @@ -20,3 +24,4 @@ from ._qthreading import ( new_worker_qthread, thread_worker, ) +from ._throttler import QSignalDebouncer, QSignalThrottler, qdebounced, qthrottled diff --git a/src/superqt/utils/_throttler.py b/src/superqt/utils/_throttler.py new file mode 100644 index 0000000..b1880b3 --- /dev/null +++ b/src/superqt/utils/_throttler.py @@ -0,0 +1,367 @@ +"""Adapted for python from the KDToolBox + +https://github.com/KDAB/KDToolBox/tree/master/qt/KDSignalThrottler + +MIT License + +Copyright (C) 2019-2022 Klarälvdalens Datakonsult AB, a KDAB Group company, +info@kdab.com + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + +""" +import sys +from concurrent.futures import Future +from enum import IntFlag, auto +from functools import wraps +from typing import TYPE_CHECKING, Callable, Generic, Optional, TypeVar, Union, overload + +from qtpy.QtCore import QObject, Qt, QTimer, Signal, SignalInstance +from typing_extensions import Literal, ParamSpec + + +class Kind(IntFlag): + Throttler = auto() + Debouncer = auto() + + +class EmissionPolicy(IntFlag): + Trailing = auto() + Leading = auto() + + +class GenericSignalThrottler(QObject): + + triggered = Signal() + timeoutChanged = Signal(int) + timerTypeChanged = Signal(Qt.TimerType) + + def __init__( + self, + kind: Kind, + emissionPolicy: EmissionPolicy, + parent: Optional[QObject] = None, + ) -> None: + super().__init__(parent) + + self._kind = kind + self._emissionPolicy = emissionPolicy + self._hasPendingEmission = False + + self._timer = QTimer() + self._timer.setSingleShot(True) + self._timer.setTimerType(Qt.TimerType.PreciseTimer) + self._timer.timeout.connect(self._maybeEmitTriggered) + + def kind(self) -> Kind: + """Return the kind of throttler (throttler or debouncer).""" + return self._kind + + def emissionPolicy(self) -> EmissionPolicy: + """Return the emission policy (trailing or leading).""" + return self._emissionPolicy + + def timeout(self) -> int: + """Return current timeout in milliseconds.""" + return self._timer.interval() # type: ignore + + def setTimeout(self, timeout: int) -> None: + """Set timeout in milliseconds""" + if self._timer.interval() != timeout: + self._timer.setInterval(timeout) + self.timeoutChanged.emit(timeout) + + def timerType(self) -> Qt.TimerType: + """Return current Qt.TimerType.""" + return self._timer.timerType() + + def setTimerType(self, timerType: Qt.TimerType) -> None: + """Set current Qt.TimerType.""" + if self._timer.timerType() != timerType: + self._timer.setTimerType(timerType) + self.timerTypeChanged.emit(timerType) + + def throttle(self) -> None: + """Emit triggered if not running, then start timer.""" + # public slot + self._hasPendingEmission = True + # Emit only if we haven't emitted already. We know if that's + # the case by checking if the timer is running. + if ( + self._emissionPolicy is EmissionPolicy.Leading + and not self._timer.isActive() + ): + self._emitTriggered() + + # The timer is started in all cases. If we got a signal, and we're Leading, + # and we did emit because of that, then we don't re-emit when the timer fires + # (unless we get ANOTHER signal). + if self._kind is Kind.Throttler: # sourcery skip: merge-duplicate-blocks + if not self._timer.isActive(): + self._timer.start() # actual start, not restart + elif self._kind is Kind.Debouncer: + self._timer.start() # restart + + assert self._timer.isActive() + + def cancel(self) -> None: + """ "Cancel and pending emissions.""" + self._hasPendingEmission = False + + def flush(self) -> None: + """ "Force emission of any pending emissions.""" + self._maybeEmitTriggered() + + def _emitTriggered(self) -> None: + self._hasPendingEmission = False + self.triggered.emit() + self._timer.start() + + def _maybeEmitTriggered(self) -> None: + if self._hasPendingEmission: + self._emitTriggered() + + Kind = Kind + EmissionPolicy = EmissionPolicy + + +# ### Convenience classes ### + + +class QSignalThrottler(GenericSignalThrottler): + """A Signal Throttler. + + This object's `triggered` signal will emit at most once per timeout + (set with setTimeout()). + """ + + def __init__( + self, + policy: EmissionPolicy = EmissionPolicy.Leading, + parent: Optional[QObject] = None, + ) -> None: + super().__init__(Kind.Throttler, policy, parent) + + +class QSignalDebouncer(GenericSignalThrottler): + """A Signal Debouncer. + + This object's `triggered` signal will not be emitted until `self.timeout()` + milliseconds have elapsed since the last time `triggered` was emitted. + """ + + def __init__( + self, + policy: EmissionPolicy = EmissionPolicy.Trailing, + parent: Optional[QObject] = None, + ) -> None: + super().__init__(Kind.Debouncer, policy, parent) + + +# below here part is unique to superqt (not from KD) + +P = ParamSpec("P") +R = TypeVar("R") + +if TYPE_CHECKING: + from typing_extensions import Protocol + + class ThrottledCallable(Generic[P, R], Protocol): + triggered: SignalInstance + + def cancel(self) -> None: + ... + + def flush(self) -> None: + ... + + def set_timeout(self, timeout: int) -> None: + ... + + if sys.version_info < (3, 9): + + def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Future: + ... + + else: + + def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Future[R]: + ... + + +@overload +def qthrottled( + func: Callable[P, R], + timeout: int = 100, + leading: bool = True, + timer_type: Qt.TimerType = Qt.TimerType.PreciseTimer, +) -> "ThrottledCallable[P, R]": + ... + + +@overload +def qthrottled( + func: Literal[None] = None, + timeout: int = 100, + leading: bool = True, + timer_type: Qt.TimerType = Qt.TimerType.PreciseTimer, +) -> Callable[[Callable[P, R]], "ThrottledCallable[P, R]"]: + ... + + +def qthrottled( + func: Optional[Callable[P, R]] = None, + timeout: int = 100, + leading: bool = True, + timer_type: Qt.TimerType = Qt.TimerType.PreciseTimer, +) -> Union[ + "ThrottledCallable[P, R]", Callable[[Callable[P, R]], "ThrottledCallable[P, R]"] +]: + """Creates a throttled function that invokes func at most once per timeout. + + The throttled function comes with a `cancel` method to cancel delayed func + invocations and a `flush` method to immediately invoke them. Options + to indicate whether func should be invoked on the leading and/or trailing + edge of the wait timeout. The func is invoked with the last arguments provided + to the throttled function. Subsequent calls to the throttled function return + the result of the last func invocation. + + This decorator may be used with or without parameters. + + Parameters + ---------- + func : Callable + A function to throttle + timeout : int + Timeout in milliseconds to wait before allowing another call, by default 100 + leading : bool + Whether to invoke the function on the leading edge of the wait timer, + by default True + timer_type : Qt.TimerType + The timer type. by default `Qt.TimerType.PreciseTimer` + One of: + - `Qt.PreciseTimer`: Precise timers try to keep millisecond accuracy + - `Qt.CoarseTimer`: Coarse timers try to keep accuracy within 5% of the + desired interval + - `Qt.VeryCoarseTimer`: Very coarse timers only keep full second accuracy + """ + return _make_decorator(func, timeout, leading, timer_type, Kind.Throttler) + + +@overload +def qdebounced( + func: Callable[P, R], + timeout: int = 100, + leading: bool = False, + timer_type: Qt.TimerType = Qt.TimerType.PreciseTimer, +) -> "ThrottledCallable[P, R]": + ... + + +@overload +def qdebounced( + func: Literal[None] = None, + timeout: int = 100, + leading: bool = False, + timer_type: Qt.TimerType = Qt.TimerType.PreciseTimer, +) -> Callable[[Callable[P, R]], "ThrottledCallable[P, R]"]: + ... + + +def qdebounced( + func: Optional[Callable[P, R]] = None, + timeout: int = 100, + leading: bool = False, + timer_type: Qt.TimerType = Qt.TimerType.PreciseTimer, +) -> Union[ + "ThrottledCallable[P, R]", Callable[[Callable[P, R]], "ThrottledCallable[P, R]"] +]: + """Creates a debounced function that delays invoking `func`. + + `func` will not be invoked until `timeout` ms have elapsed since the last time + the debounced function was invoked. + + The debounced function comes with a `cancel` method to cancel delayed func + invocations and a `flush` method to immediately invoke them. Options + indicate whether func should be invoked on the leading and/or trailing edge + of the wait timeout. The func is invoked with the *last* arguments provided to + the debounced function. Subsequent calls to the debounced function return the + result of the last `func` invocation. + + This decorator may be used with or without parameters. + + Parameters + ---------- + func : Callable + A function to throttle + timeout : int + Timeout in milliseconds to wait before allowing another call, by default 100 + leading : bool + Whether to invoke the function on the leading edge of the wait timer, + by default False + timer_type : Qt.TimerType + The timer type. by default `Qt.TimerType.PreciseTimer` + One of: + - `Qt.PreciseTimer`: Precise timers try to keep millisecond accuracy + - `Qt.CoarseTimer`: Coarse timers try to keep accuracy within 5% of the + desired interval + - `Qt.VeryCoarseTimer`: Very coarse timers only keep full second accuracy + """ + return _make_decorator(func, timeout, leading, timer_type, Kind.Debouncer) + + +def _make_decorator( + func: Optional[Callable[P, R]], + timeout: int, + leading: bool, + timer_type: Qt.TimerType, + kind: Kind, +) -> Union[ + "ThrottledCallable[P, R]", Callable[[Callable[P, R]], "ThrottledCallable[P, R]"] +]: + def deco(func: Callable[P, R]) -> "ThrottledCallable[P, R]": + policy = EmissionPolicy.Leading if leading else EmissionPolicy.Trailing + throttle = GenericSignalThrottler(kind, policy) + throttle.setTimerType(timer_type) + throttle.setTimeout(timeout) + last_f = None + future: Optional[Future] = None + + @wraps(func) + def inner(*args: P.args, **kwargs: P.kwargs) -> Future: + nonlocal last_f + nonlocal future + if last_f is not None: + throttle.triggered.disconnect(last_f) + if future is not None and not future.done(): + future.cancel() + + future = Future() + last_f = lambda: future.set_result(func(*args, **kwargs)) # noqa + throttle.triggered.connect(last_f) + throttle.throttle() + return future + + setattr(inner, "cancel", throttle.cancel) + setattr(inner, "flush", throttle.flush) + setattr(inner, "set_timeout", throttle.setTimeout) + setattr(inner, "triggered", throttle.triggered) + return inner # type: ignore + + return deco(func) if func is not None else deco diff --git a/tests/test_throttler.py b/tests/test_throttler.py new file mode 100644 index 0000000..f0c9daa --- /dev/null +++ b/tests/test_throttler.py @@ -0,0 +1,43 @@ +from unittest.mock import Mock + +from superqt.utils import qdebounced, qthrottled + + +def test_debounced(qtbot): + mock1 = Mock() + mock2 = Mock() + + @qdebounced(timeout=5) + def f1() -> str: + mock1() + + def f2() -> str: + mock2() + + for _ in range(10): + f1() + f2() + + qtbot.wait(5) + mock1.assert_called_once() + assert mock2.call_count == 10 + + +def test_throttled(qtbot): + mock1 = Mock() + mock2 = Mock() + + @qthrottled(timeout=5) + def f1() -> str: + mock1() + + def f2() -> str: + mock2() + + for _ in range(10): + f1() + f2() + + qtbot.wait(5) + assert mock1.call_count == 2 + assert mock2.call_count == 10