import threading
import time
from typing import Callable, Optional
from ..exceptions import LiveViewException
from ..models import LiveQuery
[docs]class BasicQueryListener:
"""A BasicQueryListener listens for and processes any events from a live query.
Any non-error event received will be processed with the ``callback`` argument.
Any error event received will be processed with the optional ``error_callback`` argument.
If no ``error_callback`` is provided, the listener will simply store the most recent exception
as a member variable accessible through ``BasicQueryListener#get_error_result``.
"""
def __init__(self, live_query: LiveQuery, callback: Callable, error_callback: Callable = None):
"""Initialize a listener which listens for events from a LiveQuery and calls a custom callback
function on them. Optionally accepts an error callback function that handles errors. If none is provided,
errors will be raised and the thread will terminate. If the error callback function returns a value,
this value will be accessible via the get_error_result() method.
Args:
live_query: A LiveQuery object initialized with the desired Live Query to monitor
callback: A Callable which accepts one argument. Will be called on each event that comes in.
error_callback: Optionally, a Callable which accepts one argument. Will be called on any Exceptions that
are raised
"""
self._live_query = live_query
self._callback = callback
# Default to a basic function which raises any thread runtime errors
if error_callback is None:
self._err_callback = _default_err_callback
else:
# Let the user define how to handle errors if they want
self._err_callback = error_callback
self._error_result = None
self._thread: Optional[threading.Thread] = None
self._should_run = False
def __enter__(self):
self.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.stop()
[docs] def start(self) -> None:
"""Start listening for events and calling the callback on incoming data.
Returns: None
"""
try:
self._live_query.get_schema()
except LiveViewException as e:
raise LiveViewException(f'Invalid query: {e}')
def listen():
self._should_run = True
# Run until the thread is told to stop
while self._should_run:
try:
with self._live_query:
for event in self._live_query:
self._callback(event)
except Exception as err:
# Call the user defined callback to handle errors and keep going.
self._error_result = self._err_callback(err)
time.sleep(1)
self._thread = threading.Thread(target=listen, daemon=True)
self._thread.start()
[docs] def stop(self) -> None:
"""Stop handling events and close the thread.
Returns: None
"""
if not self._should_run:
raise LiveViewException('BasicQueryListener not started')
self._should_run = False
self._live_query.close()
if self._thread is None:
raise LiveViewException('Error in BasicQueryListener.stop(): thread never started')
self._thread.join()
[docs] def get_error_result(self):
"""The user-defined error_callback function may return a result. This method returns that
result for access outside the event loop thread.
Returns: The last result returned by the error_callback function
"""
return self._error_result
def _default_err_callback(exception):
return exception