Source code for tibco.liveview.lv_client

import urllib.parse
import requests
from typing import List

from .exceptions import LiveViewException
from .lv_query import Query
from .liveview_generated_client import api
from .models import LiveQuery, SnapshotResult, TableMetadata, \
    BufferedLVPublisher


[docs]class LiveViewClient: """Provides interface for all LiveView Python client functionality. Parameters: uri: The URI string for the client to connect to, for example: ``'lv://localhost:11080'`` username (optional): LiveView server username password (optional): LiveView server password """ def __init__(self, uri: str, username: str = '', password: str = '') -> None: self.client = api.V11Api() self.uri = uri self.configuration = self.client.api_client.configuration self.configuration.username = username self.configuration.password = password self._connect(uri) def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self._close_api() def _close_api(self): # Clear out any connections which may be hanging around self.client.api_client.rest_client.pool_manager.clear() def _connect(self, uri) -> None: # Sanitize the lv:// and lvs:// if uri.startswith('lv://'): uri = uri.replace('lv://', 'http://') if uri.startswith('lvs://'): uri = uri.replace('lvs://', 'https://') # Otherwise sanitize to ensure string can be formatted later. if uri.endswith('/lv/api/v1'): self.configuration.host = uri self.uri = uri elif uri.endswith('/'): self._connect(uri.rstrip('/')) else: self._connect(uri + '/lv/api/v1')
[docs] def ensure_connection(self) -> None: """Raises a LiveViewException if not connected. Raises: LiveViewException: if not connected """ # If connected, returns nothing. If not, throws error. try: self.get_table_metadata('LVTables') except Exception: raise LiveViewException('Not connected')
[docs] def snapshot_query(self, query: Query) -> SnapshotResult: """Issues a snapshot query to the server and returns the results. Args: query (Query): The query to run. Returns: SnapshotResult: Snapshot result. """ return self.snapshot_query_from_s(query.table_name, query.to_s())
[docs] def snapshot_query_from_s(self, table_name: str, query: str) -> SnapshotResult: """Issues a snapshot query to the server and returns the results. Parameters: table_name (str): The name of the table to query. query (str) : The full LiveQL query to send. Returns: SnapshotResult: Snapshot result. """ snap = self.client.issue_snapshot_query(table_name, query=query) return SnapshotResult(snap)
[docs] def live_query(self, query: Query): """Prepare a live query from a ``Query`` object. Usage:: import liveview from liveview import Query client = liveview.get_client('lv://localhost:11080') live_query = client.live_query(Query('ItemsSales').select('*')) with live_query: next_ten_results = live_query.take(10) Parameters: query (Query): A query object """ return self.live_query_from_s(query.table_name, query.to_s())
[docs] def live_query_from_s(self, table_name: str, query: str) -> LiveQuery: """Prepare a live query from a string. Usage:: import liveview client = liveview.get_client('lv://localhost:11080') query = client.live_query_from_s('ItemsSales', 'SELECT * FROM ItemsSales') with live_query: next_ten_results = live_query.take(10) Parameters: table_name (str) : The name of the table to query. query (str) : The full LiveQL query to send. Returns: LiveQuery: Live query which may be listened to. """ uri = self.uri # If credentials are used, set up basic auth in the URI string if self.configuration.username or self.configuration.password: (protocol, address, *_) = self.uri.split('//') uri = f'{protocol}' \ f'//{_urlquote(self.configuration.username)}:{_urlquote(self.configuration.password)}' \ f'@{address}' # Assemble a URI to an event stream live_query_uri = '/'.join([uri, 'tables', table_name, f'tuples/live?query={_urlquote(query)}']) return LiveQuery(live_query_uri)
[docs] def get_table_metadata(self, table_name, include_internal=False) -> TableMetadata: """Get metadata for a table Parameters: table_name (str): The name of the table. include_internal (bool): Whether to include internal LiveView fields Returns: A TableInfo object containing the tables metadata. """ table = self.client.get_table(table_name, include_internal=include_internal) return TableMetadata(table)
[docs] def list_tables(self) -> list: """ Lists the names of every table on the server. Returns: - A list of names (strings) """ return list(self.client.get_tables())
[docs] def get_buffered_publisher(self, table_name: str, publish_interval: float = 1) -> BufferedLVPublisher: """ Args: table_name: name of table for publishing to publish_interval: every `publish_interval` seconds, the publisher will publish any tuples in its buffer Returns: A publisher which maintains a buffer that it will try to flush to an LV server at a specified interval. """ return BufferedLVPublisher(self.client, self.uri, table_name, publish_interval)
[docs] def delete_tuples_by_query(self, query: Query): """Deletes tuples retrieved by a query. Args: query (Query): A query whose results should be deleted. """ return self.delete_tuples_by_query_s(query.table_name, query.to_s())
[docs] def delete_tuples_by_query_s(self, table_name: str, query_string: str): """Deletes tuples given a table name and a query string. Args: table_name (str): A table from which to delete tuples. query_string (str): A query over table_name whose results should be deleted. """ sanitized_query = _urlquote(query_string) url = f'{self.uri}/tables/{table_name}/tuples?query={sanitized_query}' try: res = requests.delete(url) if res.status_code != 200: raise LiveViewException(f'Delete tuples return {res.status_code}: {res.json()}') except Exception as e: raise LiveViewException(f'Failed to delete tuples with error: {e}') return res
[docs] def delete_tuples(self, table_name: str, tuples: List) -> None: """Deletes tuples given a table name and a list of tuples. Args: table_name (str): The table from which to delete tuples. tuples (list): A list of tuple primary keys which point to tuples to delete """ try: self.client.issue_delete(table_name, tuples=tuples) except Exception as e: raise LiveViewException(f'Failed to delete tuples: {e}')
def _urlquote(s): return urllib.parse.quote(s)