Source code for tibco.liveview.listeners.lv_basic_listener

import threading
import time
from typing import Callable, Optional
from ..exceptions import LiveViewException
from ..models import LiveQuery


class BasicQueryListener:
    """Listen for events from a live query on a background thread.

    Every non-error event is passed to ``callback``. Any exception raised
    while entering or iterating the live query, or raised by ``callback``
    itself, is passed to ``error_callback``; whatever that function returns is
    stored and can be read back with ``get_error_result()``. If no
    ``error_callback`` is provided, the exception itself is stored. After the
    event stream ends or an error has been handled, the listener pauses for up
    to one second and then re-enters the live query, until ``stop()`` is
    called.

    The listener can be used as a context manager: entering calls ``start()``
    and exiting calls ``stop()``.
    """

    def __init__(self, live_query: LiveQuery, callback: Callable,
                 error_callback: Optional[Callable] = None):
        """Initialize a listener for ``live_query``.

        Args:
            live_query: A LiveQuery object initialized with the desired Live
                Query to monitor.
            callback: A Callable which accepts one argument. Called on each
                event that comes in.
            error_callback: Optionally, a Callable which accepts one argument.
                Called with any Exception raised in the listening loop; its
                return value is available via ``get_error_result()``. When
                omitted, the exception itself is stored. An exception raised
                by ``error_callback`` is not caught and ends the listener
                thread.
        """
        self._live_query = live_query
        self._callback = callback
        # Fall back to a handler that just returns the exception so that it
        # ends up stored in ``_error_result``.
        self._err_callback = (
            _default_err_callback if error_callback is None else error_callback
        )
        self._error_result = None
        self._thread: Optional[threading.Thread] = None
        self._should_run = False
        # Set by stop() to cut the pause between listening passes short.
        self._stop_event = threading.Event()

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.stop()

    def start(self) -> None:
        """Start listening for events and calling the callback on incoming data.

        Returns:
            None

        Raises:
            LiveViewException: If fetching the query's schema fails.
        """
        try:
            self._live_query.get_schema()
        except LiveViewException as e:
            raise LiveViewException(f'Invalid query: {e}') from e

        # Mark the listener as running *before* the thread exists. Setting the
        # flag inside the thread let a quick stop() (e.g. leaving a short
        # ``with`` block) observe "not started" and leave the thread running.
        self._stop_event.clear()
        self._should_run = True
        self._thread = threading.Thread(target=self._listen, daemon=True)
        self._thread.start()

    def _listen(self) -> None:
        """Thread body: consume events until ``stop()`` clears the run flag."""
        while self._should_run:
            try:
                with self._live_query:
                    for event in self._live_query:
                        self._callback(event)
            except Exception as err:
                # Hand the error to the configured handler and keep going.
                self._error_result = self._err_callback(err)
            # Pause before the next pass; returns at once when stop() is called.
            self._stop_event.wait(1)

    def stop(self) -> None:
        """Stop handling events and close the thread.

        Returns:
            None

        Raises:
            LiveViewException: If the listener is not currently started.
        """
        if not self._should_run:
            raise LiveViewException('BasicQueryListener not started')
        self._should_run = False
        # Wake the worker if it is pausing between passes so join() is prompt.
        self._stop_event.set()
        self._live_query.close()
        if self._thread is None:
            raise LiveViewException('Error in BasicQueryListener.stop(): thread never started')
        self._thread.join()

    def get_error_result(self):
        """Return the value produced by the most recent error callback call.

        The error callback runs on the listener thread; this method exposes
        its result outside that thread.

        Returns:
            The last result returned by the error callback, or None if no
            error has been handled yet.
        """
        return self._error_result


def _default_err_callback(exception): return exception