import json
import sseclient
import requests
import time
import re
from typing import Optional, List, Dict
from ..exceptions import LiveViewException
[docs]class LiveQuery:
"""Container class for a LiveQuery.
It's recommended that you don't create a LiveQuery directly, but instead
follow the below usage.
Usage::
import liveview
# Assume Hello LiveView is running at lv://localhost:11080
liveview_server_uri = 'lv://localhost:11080'
client = liveview.get_client(liveview_server_uri)
with client.live_query_from_s('ItemsSales', 'SELECT * FROM ItemsSales') as live_query:
results = live_query.take(20)
print(results)
"""
def __init__(self, uri: str) -> None:
self._uri: str = uri
self._terminate_query: bool = True
self._schema: Optional[Dict] = None
self._sse_client: sseclient.SSEClient = None
[docs] def get_schema(self) -> Dict:
"""Get this query's schema.
Returns (dict): A mapping from column names to data types.
"""
# Try to return the cached schema for this query
if self._schema is not None:
return self._schema
# Try to get the schema and cache it
try:
get_schema_uri, query = re.search(r'(^.*/lv/api/v1/tables/.+/tuples)/live\?(query=.*)', self._uri).groups()
response = requests.head(f'{get_schema_uri}?{query}')
if response.status_code != 200:
raise LiveViewException(f'Schema request {response.url} failed with error: {response.status_code} '
f'{response.text}')
self._schema = json.loads(response.headers.get('X-Query-Schema'))
return self._schema
except AttributeError:
raise LiveViewException(f'Failed to get schema because unable to parse URI {self._uri}')
[docs] def take(self, num_tuples: int) -> List:
"""Get the next ``num_tuples`` from this query. Blocks."""
iterator = self.iterator()
return [next(iterator) for _ in range(num_tuples)]
def __enter__(self):
self._terminate_query = False
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
[docs] def iterator(self):
"""Get an iterator which yields results from this live query."""
return self.__iter__()
def __iter__(self):
if self._terminate_query:
raise LiveViewException('LiveQuery not open. Please use syntax `with live_query: ...`')
try:
self._sse_client = sseclient.SSEClient(requests.get(self._uri, stream=True))
for event in self._sse_client.events():
if self._terminate_query:
break
data = json.loads(event.data)
if data.get('type') == 'begin_snapshot':
self.schema = data.get('data').get('schema')
yield data
except requests.exceptions.ConnectionError as e:
# Sleep for a moment in the event of a connection error
time.sleep(0.5)
raise e
except StopIteration:
raise ConnectionError(f'Connection to event source {self._uri} stopped yielding events')
finally:
if self._sse_client:
self._sse_client.close()
[docs] def close(self):
"""Close this query.
You don't need to call this method if you use the ``with live_query`` syntax.
"""
self._terminate_query = True
if self._sse_client:
self._sse_client.close()
self._sse_client = None