Source code for VeraGridEngine.Utils.Filtering.filtering

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.  
# SPDX-License-Identifier: MPL-2.0
from typing import List, Union
from enum import Enum
import re
import numpy as np


[docs] def is_odd(number: int): """ Check if number is odd :param number: :return: """ return number % 2 != 0
[docs] def is_numeric(obj: np.ndarray) -> bool: """ Checks if the numpy array is numeric :param obj: :return: """ attrs = ['__add__', '__sub__', '__mul__', '__truediv__', '__pow__'] return all(hasattr(obj, attr) for attr in attrs)
[docs] class CompOps(Enum): """ Enumeration of filter oprations """ __slots__ = () GT = ">" LT = "<" GEQ = ">=" LEQ = "<=" NOT_EQ = "!=" EQ = "=" LIKE = "like" NOT_LIKE = "notlike" STARTS = "starts" ENDS = "ends" def __str__(self): return str(self.value) def __repr__(self): return str(self)
[docs] @staticmethod def argparse(s): """ :param s: :return: """ try: return CompOps[s] except KeyError: return s
[docs] @classmethod def list(cls): """ :return: """ return list(map(lambda c: c.value, cls))
[docs] class FilterOps(Enum): """ Enumeration of filter operations """ __slots__ = () AND = "and" OR = "or" def __str__(self): return str(self.value) def __repr__(self): return str(self)
[docs] @staticmethod def argparse(s): """ :param s: :return: """ try: return FilterOps[s] except KeyError: return s
[docs] @classmethod def list(cls): """ :return: """ return list(map(lambda c: c.value, cls))
[docs] class FilterSubject(Enum): """ Enumeration of filter operations """ __slots__ = () COL = "col" IDX = "idx" VAL = "val" COL_OBJECT = "colobj" IDX_OBJECT = "idxobj" def __str__(self): return str(self.value) def __repr__(self): return str(self)
[docs] @staticmethod def argparse(s): """ :param s: :return: """ try: return FilterSubject[s] except KeyError: return s
[docs] @classmethod def list(cls): """ :return: """ return list(map(lambda c: c.value, cls))
PRIMARY_TYPES = Union[float, bool, int, str]
[docs] class Filter: """ Filter """ __slots__ = ( "element", "element_args", "op", "value", ) def __init__(self, element: FilterSubject, element_args: List[str], op: CompOps, value: Union[PRIMARY_TYPES, List[PRIMARY_TYPES]]): """ Filter constructor :param element: FilterSubject :param element_args: further search elements :param op: CompOps :param value: Comparison value """ self.element = element self.element_args: List[str] = element_args self.op = op self.value = value def __str__(self): return f"{self.element} {self.op} {self.value}" def __repr__(self): return str(self)
[docs] def is_negative(self) -> bool: """ Is the filter operation negative? :return: is negative? """ if self.op == CompOps.GT: return False elif self.op == CompOps.LT: return False elif self.op == CompOps.GEQ: return False elif self.op == CompOps.LEQ: return False elif self.op == CompOps.NOT_EQ: return True elif self.op == CompOps.EQ: return False elif self.op == CompOps.LIKE: return False elif self.op == CompOps.NOT_LIKE: return True elif self.op == CompOps.STARTS: return False elif self.op == CompOps.ENDS: return False else: raise Exception(f"Unknown op: {self.op}")
[docs] def get_list_of_values(self) -> List[str]: """ Get a list of values to compare to :return: list of strings """ if "[" in self.value: val = self.value.replace("[", "").replace("]", "").strip() lst = [a.strip() for a in val.split(",")] else: lst = [self.value] return lst
[docs] @staticmethod def try_numeric(value): """ Try to convert a value to a numeric type :param value: :return: float """ try: float(value) return True except ValueError: return False except TypeError: return False
[docs] def apply_filter_op(self, obj_val: Union[float, str], val: Union[float, str]) -> bool: """ Apply the filter operation :param obj_val: value of the object :param val: value to compare :return: passes the filter? """ if self.op == CompOps.GT: if self.try_numeric(obj_val) and self.try_numeric(val): obj_val = float(obj_val) val = float(val) ok = obj_val > val else: ok = False elif self.op == CompOps.LT: if self.try_numeric(obj_val) and self.try_numeric(val): obj_val = float(obj_val) val = float(val) ok = obj_val < val else: ok = False elif self.op == CompOps.GEQ: if self.try_numeric(obj_val) and self.try_numeric(val): obj_val = float(obj_val) val = float(val) ok = obj_val >= val else: ok = False elif self.op == CompOps.LEQ: if self.try_numeric(obj_val) and self.try_numeric(val): obj_val = float(obj_val) val = float(val) ok = obj_val <= val else: ok = False elif self.op == CompOps.NOT_EQ: obj_val = str(obj_val).lower() val = str(val).lower() ok = obj_val != val elif self.op == CompOps.EQ: if self.try_numeric(obj_val) and self.try_numeric(val): obj_val = float(obj_val) val = float(val) ok = obj_val == val else: obj_val = str(obj_val).lower() val = str(val).lower() ok = obj_val == val elif self.op == CompOps.LIKE: obj_val = str(obj_val).lower() val = str(val).lower() ok = str(val) in str(obj_val) elif self.op == CompOps.NOT_LIKE: obj_val = str(obj_val).lower() val = str(val).lower() ok = val not in str(obj_val) elif self.op == CompOps.STARTS: obj_val = str(obj_val).lower() val = str(val).lower() ok = str(obj_val).startswith(val) elif self.op == CompOps.ENDS: obj_val = str(obj_val).lower() val = str(val).lower() ok = str(obj_val).endswith(val) else: ok = False return ok
[docs] class MasterFilter: """ MasterFilter """ __slots__ = ("stack",) def __init__(self) -> None: """ """ self.stack: List[Union[Filter, FilterOps]] = list()
[docs] def add(self, elm: Union[Filter, FilterOps]) -> None: """ Add filter or filter operation to the stack :param elm: filter or filter operation """ self.stack.append(elm)
[docs] def size(self) -> int: """ Get the size of the stack :return: int """ return len(self.stack)
[docs] def is_correct_size(self) -> bool: """ Returns if the stack has the right size: an odd number :return: """ return is_odd(self.size()) and self.size() > 0
[docs] def parse_single(token: str) -> Union[Filter, None]: """ Parse single token, these are tokens that are composed on 3 parts: element, operation, comparison value :param token: Token :return: Filter or None if the token is not valid """ # old: r'(?<=\s)([<>=!]=?|in|starts|ends|like|notlike)(?=\s)' # elms = re.split(r'(?<!\S)([<>=!]=?|in|starts|ends|like|notlike)(?!\S)', token) TOKEN_SPLIT_RE = re.compile( r'\s*(' # optional whitespace to discard r'\b(?:in|starts|ends|like|notlike)\b' # alphabetic operators – whole words r'|!=|>=|<=|[<>]|=' # symbolic operators r')\s*' # optional whitespace to discard ) elms = TOKEN_SPLIT_RE.split(token) if len(elms) == 3: if "." in elms[0]: coms = elms[0].strip().split(".") element = coms[0] coms.pop(0) element_args = coms else: element = elms[0].strip() element_args = list() try: filter_subject = FilterSubject(element) except ValueError as e: try: filter_subject = FilterSubject("idxobj") element_args = [element] except ValueError as e: return None return Filter(element=filter_subject, element_args=element_args, op=CompOps(elms[1].strip()), value=elms[2].strip()) else: # wrong filter return None
[docs] def has_operators(token: str) -> bool: """ Check if there are operators and , or in the token :param token: any token :return: has operators? """ words = token.split() return "and" in words or "or" in words
[docs] def parse_expression(expression: str) -> MasterFilter: """ Parses the query expression :param expression: :return: MasterFilter """ mst_flt = MasterFilter() master_tokens = re.split(r'(?<=\s)(and|or)(?=\s)', expression) for token in master_tokens: if has_operators(token): elm = FilterOps(token.strip()) mst_flt.add(elm=elm) else: flt = parse_single(token=token) if flt is not None: mst_flt.add(elm=flt) return mst_flt