import urllib.parse
import requests
from typing import List
from .exceptions import LiveViewException
from .lv_query import Query
from .liveview_generated_client import api
from .models import LiveQuery, SnapshotResult, TableMetadata, \
BufferedLVPublisher
class LiveViewClient:
    """Provides interface for all LiveView Python client functionality.

    Instances can be used as context managers; leaving the ``with`` block
    clears the connection pool of the underlying REST client.

    Parameters:
        uri: The URI string for the client to connect to, for example: ``'lv://localhost:11080'``
        username (optional): LiveView server username
        password (optional): LiveView server password
    """

    def __init__(self, uri: str, username: str = '', password: str = '') -> None:
        self.client = api.V11Api()
        self.uri = uri
        self.configuration = self.client.api_client.configuration
        self.configuration.username = username
        self.configuration.password = password
        # Normalizes the URI and stores it on both self.uri and the configuration.
        self._connect(uri)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._close_api()

    def _close_api(self):
        """Clear out any pooled connections which may be hanging around."""
        self.client.api_client.rest_client.pool_manager.clear()

    def _connect(self, uri) -> None:
        """Normalize ``uri`` and store it as the API base address.

        ``lv://`` and ``lvs://`` scheme prefixes are mapped to ``http://`` and
        ``https://``, trailing slashes are dropped, and ``/lv/api/v1`` is
        appended unless the URI already ends with it. The result is written to
        both ``self.configuration.host`` and ``self.uri``.
        """
        # Swap only the scheme prefix; str.replace() would also rewrite any
        # later occurrence of 'lv://' inside the address.
        if uri.startswith('lv://'):
            uri = 'http://' + uri[len('lv://'):]
        elif uri.startswith('lvs://'):
            uri = 'https://' + uri[len('lvs://'):]

        # Ensure the string can be used as a prefix for endpoint paths later.
        uri = uri.rstrip('/')
        if not uri.endswith('/lv/api/v1'):
            uri += '/lv/api/v1'

        self.configuration.host = uri
        self.uri = uri

    def ensure_connection(self) -> None:
        """Raises a LiveViewException if not connected.

        Raises:
            LiveViewException: if not connected
        """
        # Probe the server with a request that this class is known to support.
        # The previous probe called a method this class does not define, so the
        # resulting AttributeError made every call report 'Not connected'.
        try:
            self.list_tables()
        except Exception as err:
            raise LiveViewException('Not connected') from err

    def snapshot_query(self, query: Query) -> SnapshotResult:
        """Issues a snapshot query to the server and returns the results.

        Args:
            query (Query): The query to run.

        Returns:
            SnapshotResult: Snapshot result.
        """
        return self.snapshot_query_from_s(query.table_name, query.to_s())

    def snapshot_query_from_s(self, table_name: str, query: str) -> SnapshotResult:
        """Issues a snapshot query to the server and returns the results.

        Parameters:
            table_name (str): The name of the table to query.
            query (str) : The full LiveQL query to send.

        Returns:
            SnapshotResult: Snapshot result.
        """
        snap = self.client.issue_snapshot_query(table_name, query=query)
        return SnapshotResult(snap)

    def live_query(self, query: Query):
        """Prepare a live query from a ``Query`` object.

        Usage::

            import liveview
            from liveview import Query
            client = liveview.get_client('lv://localhost:11080')
            live_query = client.live_query(Query('ItemsSales').select('*'))
            with live_query:
                next_ten_results = live_query.take(10)

        Parameters:
            query (Query): A query object

        Returns:
            LiveQuery: Live query which may be listened to.
        """
        return self.live_query_from_s(query.table_name, query.to_s())

    def live_query_from_s(self, table_name: str, query: str) -> LiveQuery:
        """Prepare a live query from a string.

        Usage::

            import liveview
            client = liveview.get_client('lv://localhost:11080')
            live_query = client.live_query_from_s('ItemsSales', 'SELECT * FROM ItemsSales')
            with live_query:
                next_ten_results = live_query.take(10)

        Parameters:
            table_name (str) : The name of the table to query.
            query (str) : The full LiveQL query to send.

        Returns:
            LiveQuery: Live query which may be listened to.
        """
        uri = self.uri
        username = self.configuration.username
        password = self.configuration.password
        # If credentials are used, set up basic auth in the URI string
        if username or password:
            # Split on the first '//' only so the rest of the address is kept intact.
            protocol, address = self.uri.split('//', 1)
            # safe='' also escapes '/', which would otherwise end the authority
            # part of the URL in the middle of the credentials.
            quoted_user = urllib.parse.quote(username, safe='')
            quoted_password = urllib.parse.quote(password, safe='')
            uri = f'{protocol}//{quoted_user}:{quoted_password}@{address}'
        # Assemble a URI to an event stream
        live_query_uri = '/'.join([uri, 'tables', table_name, f'tuples/live?query={_urlquote(query)}'])
        return LiveQuery(live_query_uri)

    def list_tables(self) -> list:
        """Lists the names of every table on the server.

        Returns:
            - A list of names (strings)
        """
        return list(self.client.get_tables())

    def get_buffered_publisher(self, table_name: str, publish_interval: float = 1) -> BufferedLVPublisher:
        """Create a buffered publisher for a table.

        Args:
            table_name: name of table for publishing to
            publish_interval: every `publish_interval` seconds, the publisher will publish any tuples in its buffer

        Returns:
            A publisher which maintains a buffer that it will try to flush to an LV server at a specified interval.
        """
        return BufferedLVPublisher(self.client, self.uri, table_name, publish_interval)

    def delete_tuples_by_query(self, query: Query):
        """Deletes tuples retrieved by a query.

        Args:
            query (Query): A query whose results should be deleted.

        Returns:
            The HTTP response object of the delete request.

        Raises:
            LiveViewException: if the request fails or does not return status 200
        """
        return self.delete_tuples_by_query_s(query.table_name, query.to_s())

    def delete_tuples_by_query_s(self, table_name: str, query_string: str):
        """Deletes tuples given a table name and a query string.

        Args:
            table_name (str): A table from which to delete tuples.
            query_string (str): A query over table_name whose results should be deleted.

        Returns:
            The HTTP response object of the delete request.

        Raises:
            LiveViewException: if the request fails or does not return status 200
        """
        sanitized_query = _urlquote(query_string)
        url = f'{self.uri}/tables/{table_name}/tuples?query={sanitized_query}'

        # This request bypasses the generated client, so the configured
        # credentials have to be supplied explicitly.
        username = self.configuration.username
        password = self.configuration.password
        auth = (username, password) if username or password else None

        try:
            res = requests.delete(url, auth=auth)
        except Exception as err:
            raise LiveViewException(f'Failed to delete tuples with error: {err}') from err

        # Checked outside the try block so this error is not caught and
        # wrapped a second time.
        if res.status_code != 200:
            try:
                detail = res.json()
            except ValueError:
                # Error body is not JSON; report the raw text so the status
                # code is still part of the message.
                detail = res.text
            raise LiveViewException(f'Delete tuples return {res.status_code}: {detail}')
        return res

    def delete_tuples(self, table_name: str, tuples: List) -> None:
        """Deletes tuples given a table name and a list of tuples.

        Args:
            table_name (str): The table from which to delete tuples.
            tuples (list): A list of tuple primary keys which point to tuples to delete

        Raises:
            LiveViewException: if the delete request fails
        """
        try:
            self.client.issue_delete(table_name, tuples=tuples)
        except Exception as err:
            raise LiveViewException(f'Failed to delete tuples: {err}') from err
def _urlquote(s):
return urllib.parse.quote(s)