from typing import Optional, List, Union
class Query:
    """A Query allows you to build LiveQL expressions.

    See `LiveQL Reference <https://docs.tibco.com/pub/str/latest/doc/html/lv-reference/lvqueryreference.html>`_
    for details.

    Note that this class allows you to create invalid queries. It's up to you to create valid queries.

    Every builder method returns the Query itself, so calls can be chained.

    Usage::

        q = Query('ItemsSales').select('category', 'Item').where(category='electronics', Item='Surge Protector')
        live_query = client.live_query(q)
        # Or
        # live_query = client.live_query_from_s(q.table_name, q.to_s())
    """

    # Tokens that `_sanitize_token` wraps as #"token". Lookups are done on the
    # lower-cased string form of the argument, so matching is case-insensitive.
    _reserved_tokens = frozenset({
        'advance', 'declare', 'from', 'into', 'offset', 'primary', 'stream',
        'using', 'always', 'default', 'gather', 'join', 'on', 'private',
        'table', 'valid', 'and', 'delete', 'group', 'key', 'or',
        'public', 'then', 'vjoin', 'apply',
        'desc', 'having', 'limit', 'order', 'replace', 'time',
        'where', 'as', 'duplicate', 'heartbeat',
        'lock', 'outer', 'returning', 'timeout', 'window', 'asc',
        'error', 'if', 'lockset', 'output',
        'schema', 'true', 'with', 'between', 'every', 'implements',
        'materialized', 'parallel', 'secondary',
        'truncate', 'within', 'bsort', 'except', 'import',
        'merge', 'parameters', 'select', 'tuples',
        'by', 'extends', 'index', 'metronome', 'partition',
        'set', 'union', 'constant', 'false', 'input', 'not', 'pattern', 'size', 'unlock', 'create',
        'foreach', 'insert', 'null', 'predicate', 'slack', 'update', 'cacheable', 'else', 'define',
        'function', 'command', 'old', 'publishersn', 'for',
        'pivot', 'values', 'publisherid',
    })

    def __init__(self, table_name: str):
        """Create a query over ``table_name`` with the default projection '*'.

        Args:
            table_name (str): The name of the table this query selects from
        """
        self._table_name: str = table_name
        self._projection: Optional[str] = '*'
        self._predicate: Optional[str] = None
        self._for: Optional[int] = None
        self._colname_timestamp: Optional[str] = None
        # Pair (timestamp1, timestamp2) as given to between(); stored unmodified.
        self._between: Optional[tuple] = None
        self._group_by_cols: Optional[str] = None
        self._order_by_cols: Optional[str] = None
        self._pivot_expr_list: Optional[str] = None
        self._pivot_for_col: Optional[str] = None
        self._pivot_values: Optional[str] = None
        self._pivot_group_by: Optional[str] = None
        self._limit: Optional[int] = None
        self._having_expr: Optional[str] = None

    @property
    def table_name(self) -> str:
        """The name of this query's table"""
        return self._table_name

    def select(self, *projection: str) -> 'Query':
        """Define a projection over this Query's table.

        If no arguments are given, the projection will default to '*'.
        Arguments that match a reserved token are escaped as ``#"token"``.

        Args:
            *projection: one or more columns from the table

        Returns:
            This Query object

        Usage::

            query = Query('ItemsSales').select('Item', 'category', 'transactionTime')
        """
        if not projection:
            projection = ('*',)
        self._projection = ', '.join(self._sanitize_token_list(projection))
        return self

    def where(self, str_predicate: str = '', **conj_predicates) -> 'Query':
        """Define a predicate to filter rows from this Query.

        Calling this method replaces any predicate set by an earlier call.

        Args:
            str_predicate (str):
                Optional; a string predicate like:
                ::

                    "Item = 'This' OR Item = 'That' AND NOT category = 'automotive'"

                It is used verbatim. When keyword predicates are given as well,
                it is wrapped in parentheses and ANDed with them.
            **conj_predicates:
                Keyword arg conjunctive predicates that will be ANDed together.
                Field names that match a reserved token are escaped as
                ``#"token"``. String values are wrapped in single quotes,
                booleans are written as ``true``/``false``, and any other value
                is written using ``str()``.
                For example:
                ::

                    Query('CarSales').select('*').where(year=2021, country='France').to_s()

                will give you the string:
                ::

                    "SELECT * FROM CarSales WHERE year = 2021 AND country = 'France'"

        Returns:
            This Query object

        Usage::

            query1 = Query('ItemsSales').select('Item').where(category='Automotive')
            # Or
            query2 = Query('ItemsSales').select('Item').where("NOT category = 'Automotive'")
        """
        clauses = [
            f'{self._sanitize_token(field)} = {self._format_literal(value)}'
            for field, value in conj_predicates.items()
        ]
        if str_predicate:
            # Parenthesize only when combining, so that an OR inside the string
            # predicate cannot capture the keyword predicates ANDed after it.
            clauses.insert(0, f'({str_predicate})' if clauses else str_predicate)
        self._predicate = ' AND '.join(clauses)
        return self

    def for_ms(self, milliseconds: int) -> 'Query':
        """Define a time-delay modifier.

        See the Time-Based Data Modifiers section of the `LiveQL Reference <https://docs.tibco.com/pub/str/latest/doc/html/lv-reference/lvqueryreference.html>`_
        for details.

        Args:
            milliseconds (int): The number of milliseconds for which this Query's predicate should hold

        Returns:
            This Query object
        """
        self._for = milliseconds
        return self

    def when(self, colname_timestamp: str) -> 'Query':
        """Define a column for this query to provide time-windowed results around. To be used with `between()`.

        Args:
            colname_timestamp (str): The name of a column around which a time-window will be defined

        Returns:
            This Query object

        Usage::

            from liveview.util import now_timestamp
            time1 = now_timestamp()
            # ... Some logic during which time passes
            time2 = now_timestamp()
            query = Query('ItemsSales').select('Item').when('transactionTime').between(time1, time2)
        """
        self._colname_timestamp = colname_timestamp
        return self

    def between(self, timestamp1, timestamp2) -> 'Query':
        """To be used with `when()`. Give two timestamps to define the bounds of this query's time-windowed results.

        Args:
            timestamp1: A timestamp which defines the beginning of a time-window
            timestamp2: A timestamp which defines the end of a time-window

        Returns:
            This Query object

        Usage::

            from liveview.util import now_timestamp
            time1 = now_timestamp()
            # ... Some logic during which time passes
            time2 = now_timestamp()
            query = Query('ItemsSales').select('Item').when('transactionTime').between(time1, time2)
        """
        self._between = (timestamp1, timestamp2)
        return self

    def group_by(self, *cols) -> 'Query':
        """Group-by columns.

        Args:
            *cols: One or more columns to group by.

        Returns:
            This Query object

        Usage::

            query = Query('ItemsSales').select('category', 'COUNT(Item) AS itemCount').group_by('category')
            # query.to_s() == "SELECT category, COUNT(Item) AS itemCount FROM ItemsSales GROUP BY category"
        """
        self._group_by_cols = ', '.join(self._sanitize_token_list(cols))
        return self

    def order_by(self, *cols: str, **col_orders) -> 'Query':
        """Define this query's ordering

        Positional columns come first in the ORDER BY clause, followed by the
        keyword columns in the order they were passed.

        Usage::

            # Default to descending order
            # ORDER BY item DESC, transactionTime DESC
            query.order_by('item', 'transactionTime')
            # Or order by ascending order
            # ORDER BY transactionTime ASC
            query.order_by(transactionTime='ASC')

        Args:
            *cols (str): column names which will default to DESC ordering
            **col_orders: column name keyword args which allow you to specify the ordering
                explicitly, e.g. ``transactionTime='ASC'``. The ordering value is used verbatim.

        Returns:
            This Query object
        """
        col_pairs = [(self._sanitize_token(col), 'DESC') for col in cols]
        col_pairs.extend((self._sanitize_token(col), order) for col, order in col_orders.items())
        self._order_by_cols = ', '.join(f'{col} {order}' for col, order in col_pairs)
        return self

    def limit(self, limit: int) -> 'Query':
        """Limit the number of rows that this query will return.

        Args:
            limit (int): the max number of rows that this query should return

        Returns:
            This Query object
        """
        self._limit = limit
        return self

    def pivot(self, expr_list: List[str], values: List, for_col: Optional[str] = None,
              group_by: Optional[Union[str, List[str]]] = None) -> 'Query':
        """Have this query return results from a pivot table.

        See the Pivot Modifier section of the `LiveQL Reference <https://docs.tibco.com/pub/str/latest/doc/html/lv-reference/lvqueryreference.html>`_
        for details.

        Args:
            expr_list (List[str]): A list of expressions which will fill the rows of the pivot table.
            values (List): A list of values found in `for_col`, these values will be the column titles of the pivot table.
                Each value is written using ``str()``; no quotes are added.
            for_col (str): A column which contains values in the `values` list.
            group_by (Union[str, List[str]]): a string or list of strings which are column names to group by.
                Omit it (or pass an empty list) for no pivot GROUP BY.

        Returns:
            This Query object
        """
        self._pivot_expr_list = ', '.join(self._sanitize_token_list(expr_list))
        self._pivot_for_col = self._sanitize_token(for_col)
        self._pivot_values = f'[{", ".join(map(str, self._sanitize_token_list(values)))}]'
        # group_by can be a single string or a list of strings, so store it accordingly
        if group_by is None:
            group_by = []
        if isinstance(group_by, str):
            self._pivot_group_by = self._sanitize_token(group_by)
        else:
            self._pivot_group_by = ', '.join(self._sanitize_token_list(group_by))
        return self

    def having(self, having_expr: str) -> 'Query':
        """Provide a HAVING clause to this query.

        Usage::

            query = Query('ItemsSales').select('Item', 'category').order_by('category', Item='ASC').having('AVG(lastSoldPrice) > 5')
            # query.to_s() == 'SELECT Item, category FROM ItemsSales ORDER BY category DESC, Item ASC HAVING AVG(lastSoldPrice) > 5'

        Args:
            having_expr (str): an aggregate HAVING expression

        Returns:
            This Query object
        """
        self._having_expr = self._sanitize_token(having_expr)
        return self

    def to_s(self) -> str:
        """Return LiveQL string built by this Query

        Clauses are emitted in a fixed order: SELECT/FROM, WHERE, FOR, WHEN,
        BETWEEN, GROUP BY, ORDER BY, PIVOT, LIMIT, HAVING. A clause whose value
        is unset or falsy (None, empty string, 0) is left out.

        Returns:
            This Query object as a string which LiveView can parse, if valid.
        """
        # SELECT columns FROM table_name
        query = f'SELECT {self._projection} FROM {self._table_name}'
        # Where predicate
        if self._predicate:
            query += f' WHERE {self._predicate}'
        # Time-delay modifier
        if self._for:
            query += f' FOR {self._for} MILLISECONDS'
        # Time-window modifier
        if self._colname_timestamp:
            query += f' WHEN {self._colname_timestamp}'
        # Between clause
        if self._between:
            query += f' BETWEEN {self._between[0]} AND {self._between[1]}'
        # Group by clause
        if self._group_by_cols:
            query += f' GROUP BY {self._group_by_cols}'
        # Order by clause
        if self._order_by_cols:
            query += f' ORDER BY {self._order_by_cols}'
        # Pivot clause; FOR / VALUES / GROUP BY here belong to the pivot and are
        # only emitted when pivot expressions were given.
        if self._pivot_expr_list:
            query += f' PIVOT {self._pivot_expr_list}'
            if self._pivot_for_col:
                query += f' FOR {self._pivot_for_col}'
            query += f' VALUES {self._pivot_values}'
            if self._pivot_group_by:
                query += f' GROUP BY {self._pivot_group_by}'
        # Row result limit
        if self._limit:
            query += f' LIMIT {self._limit}'
        # Having clause
        if self._having_expr:
            query += f' HAVING {self._having_expr}'
        return query

    def __str__(self):
        return self.to_s()

    @staticmethod
    def _format_literal(value):
        """Format a Python value as the right-hand side of a ``field = value`` predicate."""
        # bool is checked explicitly so it is written as true/false rather than
        # Python's capitalized True/False.
        if isinstance(value, bool):
            return 'true' if value else 'false'
        if isinstance(value, str):
            return f"'{value}'"
        return str(value)

    def _sanitize_token(self, arg):
        """Return ``arg`` wrapped as ``#"arg"`` if it is a reserved token, otherwise ``arg`` unchanged."""
        if str(arg).lower() in self._reserved_tokens:
            return f'#"{arg}"'
        return arg

    def _sanitize_token_list(self, arg_list):
        """Apply `_sanitize_token` to every element of ``arg_list`` and return the results as a list."""
        return [self._sanitize_token(arg) for arg in arg_list]