import time
from queue import Empty, Queue
from threading import Event, Thread
from typing import List, Optional

from ..exceptions import LiveViewException
from ..liveview_generated_client.swagger_client import V11Api
class BufferedLVPublisher:
    """Asynchronously publishes tuples to a LiveView table using a FIFO buffer.

    Tuples handed to :meth:`publish` are queued and sent to the server by a
    background daemon thread every ``publish_interval`` seconds, or
    immediately by a manual call to :meth:`flush_buffer`.

    Initialize with::

        import liveview
        client = liveview.get_client('lv://localhost:11080')
        publisher = client.get_buffered_publisher(table_name='ItemsSales', publish_interval=1)
    """

    def __init__(self, api_client: "V11Api", host: str, table_name: str, publish_interval: float = 1):
        """Not recommended to initialize directly, instead do::

            import liveview
            client = liveview.get_client('lv://localhost:11080')
            publisher = client.get_buffered_publisher(table_name='ItemsSales', publish_interval=1)

        Args:
            api_client: Swagger API client
            host: Hostname of LiveView server
            table_name: Table to publish tuples to
            publish_interval: Interval in seconds at which tuples will be automatically published
        """
        self.publish_interval: float = publish_interval
        self.uri: str = f'{host}/tables/{table_name}/tuples'
        self._api: "V11Api" = api_client
        self._buffer: Queue = Queue()
        self._table_name: str = table_name
        self._thread: Optional[Thread] = None
        # Error from the most recent failed publish attempt; None once a
        # later attempt succeeds.
        self._thread_exception: Optional[Exception] = None
        self._stopped: bool = True
        # Set by stop() so the worker wakes up immediately instead of
        # sleeping out the remainder of its publish interval.
        self._wakeup: Event = Event()
        self.start()

    def publish(self, lv_tuple):
        """Insert a tuple or list of tuples into the buffer to be published at this object's next publish interval.

        Args:
            lv_tuple: A single tuple, or a ``list``/``tuple`` of tuples. Each
                element of a list/tuple is queued individually.

        Raises:
            LiveViewException: If the publisher has been stopped, or if the
                most recent attempt to publish to the server failed.
        """
        if self._stopped:
            raise LiveViewException('Publisher manually stopped; call start() again or create a new publisher')
        if self._thread_exception:
            raise self._thread_exception
        # Exact type match on purpose: subclasses of list/tuple (for example
        # a namedtuple) are queued as one item rather than iterated.
        if type(lv_tuple) in (list, tuple):
            for item in lv_tuple:
                self._buffer.put(item)
        else:
            self._buffer.put(lv_tuple)

    def flush_buffer(self) -> List:
        """Empty out the buffer and send it to the server.

        Useful for publishing tuples at the end of a program, when
        publish_interval seconds may not pass before Python exits.

        Returns:
            List of tuples published (empty if the buffer was empty).

        Raises:
            LiveViewException: If publishing to the server failed. The
                tuples are put back in the buffer in that case.
        """
        tuples_published = self._flush_impl()
        if self._thread_exception:
            raise self._thread_exception
        return tuples_published

    def start(self) -> None:
        """Publish any incoming tuples at an interval. Called when object is initialized.

        Calling this while the publisher is already running does nothing, so
        only one worker thread is ever active.
        """
        if not self._stopped:
            return
        self._wakeup.clear()
        self._thread = Thread(target=self._worker, daemon=True)
        self._stopped = False
        self._thread.start()

    def stop(self) -> None:
        """Stop the publisher from automatically publishing at an interval.

        Wakes the worker thread and waits for it to exit. The worker makes
        one last publish attempt for anything still buffered before exiting.

        Raises:
            LiveViewException: If the publisher is already stopped.
        """
        if self._stopped:
            raise LiveViewException('Already stopped')
        self._stopped = True
        # Interrupt the worker's wait so join() returns promptly.
        self._wakeup.set()
        self._thread.join()

    def buffer_size(self) -> int:
        """Return the number of tuples currently in the publish queue"""
        return self._buffer.qsize()

    def _worker(self):
        """Background loop: wait one publish interval (or until stopped), then publish any buffered tuples."""
        while not self._stopped:
            self._wakeup.wait(self.publish_interval)
            if not self._buffer.empty():
                self._flush_impl()

    def _flush_impl(self) -> List:
        """
        Publish tuples to LV server. Called internally by worker thread or by a manual call to self.flush_buffer()

        Returns:
            A list of tuples successfully published.
        """
        # Drain with get_nowait(): the worker thread and a manual
        # flush_buffer() call may drain concurrently, and a blocking get()
        # issued after an empty() check would hang forever if the other
        # drainer took the last item in between.
        lv_tuples = []
        while True:
            try:
                lv_tuples.append(self._buffer.get_nowait())
            except Empty:
                break

        # Nothing buffered: do not send an empty publish request.
        if not lv_tuples:
            return lv_tuples

        # Send tuples to LV server
        try:
            self._api.publish_to_table(self._table_name, lv_tuples)
        except Exception as e:
            # Note the failure so the next publish()/flush_buffer() call can raise it.
            error = LiveViewException(f'Failed to publish tuples to {self.uri} with error {e}')
            error.__cause__ = e
            self._thread_exception = error
            # Put the tuples back in the buffer. They land behind anything
            # queued while the request was in flight.
            for item in lv_tuples:
                self._buffer.put(item)
            # This function returns the tuples it published; we published none.
            return []

        # A successful publish clears any error left by an earlier attempt,
        # so one transient failure does not disable the publisher for good.
        self._thread_exception = None
        return lv_tuples