Source code for arpoc.ac.parser

# pylint: disable=R0903
# disable too few public methods

import importlib.resources
import logging
import re
from abc import abstractmethod
from functools import reduce
from typing import Any, Dict, List, TypeVar, Union, Callable

from ast import literal_eval

from collections.abc import Mapping
import lark.exceptions
from lark import Lark, Tree
from arpoc.ac.lark_adapter import MyTransformer

from arpoc.exceptions import (BadSemantics,
                              SubjectAttributeMissing,
                              ObjectAttributeMissing,
                              EnvironmentAttributeMissing)

with importlib.resources.path(
        'arpoc.resources',
        'grammar.lark') as grammar_path, open(grammar_path) as fp:
    grammar = fp.read()
lark_condition = Lark(grammar, start="condition")
lark_target = Lark(grammar, start="target")

LOGGER = logging.getLogger(__name__)

TNum = TypeVar('TNum', int, float)


[docs]class BinaryOperator:
[docs] @classmethod @abstractmethod def eval(cls, op1: Any, op2: Any) -> Any: pass
@classmethod def __str__(cls): return cls.__class__.__name__ @classmethod def __call__(cls, *args): cls.eval(*args)
[docs]class BinaryOperatorAnd(BinaryOperator):
[docs] @classmethod def eval(cls, op1: Any, op2: Any) -> bool: return op1 and op2
[docs]class BinaryOperatorOr(BinaryOperator):
[docs] @classmethod def eval(cls, op1: Any, op2: Any) -> bool: return op1 or op2
[docs]class BinarySameTypeOperator(BinaryOperator):
[docs] @classmethod @abstractmethod def eval(cls, op1: Any, op2: Any) -> bool: if type(op1) != type(op2): raise BadSemantics( "op1 '{}' and op2 '{}' need to have the same type, found {}, {}" .format(op1, op2, type(op1), type(op2))) return False
[docs]class BinaryStringOperator(BinaryOperator):
[docs] @classmethod def eval(cls, op1: str, op2: str) -> bool: if not isinstance(op1, str): raise BadSemantics("op1 '{}' is not a string".format(op1)) if not isinstance(op2, str): raise BadSemantics("op2 '{}' is not a string".format(op2)) return False
[docs]class BinaryNumeralOperator(BinaryOperator):
[docs] @classmethod def eval(cls, op1: TNum, op2: TNum) -> bool: NumberTypes = (int, float) if not isinstance(op1, NumberTypes): raise BadSemantics("op1 '{}' is not a number".format(op1)) if not isinstance(op2, NumberTypes): raise BadSemantics("op1 '{}' is not a number".format(op2)) return False
[docs]class BinaryOperatorIn(BinaryOperator):
[docs] @classmethod def eval(cls, op1: Any, op2: Union[list, dict]) -> bool: if not isinstance(op2, list): if isinstance(op2, dict): return op1 in op2.keys() raise BadSemantics("op2 '{}' is not a list".format(op2)) return op1 in op2
[docs]class Lesser(BinarySameTypeOperator):
[docs] @classmethod def eval(cls, op1: Any, op2: Any) -> bool: super().eval(op1, op2) return op1 < op2
[docs]class Greater(BinarySameTypeOperator):
[docs] @classmethod def eval(cls, op1: Any, op2: Any) -> bool: super().eval(op1, op2) return op1 > op2
[docs]class Equal(BinaryOperator):
[docs] @classmethod def eval(cls, op1: Any, op2: Any) -> bool: return op1 == op2
[docs]class NotEqual(BinaryOperator):
[docs] @classmethod def eval(cls, op1: Any, op2: Any) -> bool: return op1 != op2
[docs]class startswith(BinaryStringOperator):
[docs] @classmethod def eval(cls, op1: str, op2: str) -> bool: super().eval(op1, op2) return op1.startswith(op2)
[docs]class matches(BinaryStringOperator):
[docs] @classmethod def eval(cls, op1: str, op2: str) -> bool: super().eval(op1, op2) LOGGER.debug("regex matching op1: '%s', op2: '%s'", op1, op2) LOGGER.debug("result is %s", re.fullmatch(op2, op1)) return re.fullmatch(op2, op1) is not None
binary_operators = { "startswith": startswith, "matches": matches, "in": BinaryOperatorIn, "<": Lesser, ">": Greater, "==": Equal, "!=": NotEqual }
[docs]class UOP:
[docs] @staticmethod def exists(elem: Any) -> bool: return elem is not None
[docs]class TransformAttr(MyTransformer): def __init__(self, data: Dict): super().__init__(self) self.data = data
[docs] def subject_attr(self, args: List) -> Any: LOGGER.debug("data is %s", self.data['subject']) LOGGER.debug("args are %s", str(args[0])) attr_str = str(args[0]) attr = reduce( lambda d, key: d.get(key, None) if isinstance(d, Mapping) else None, attr_str.split("."), self.data["subject"]) if attr is None: raise SubjectAttributeMissing("No subject_attr %s" % str(args[0]), args[0]) return attr
[docs] def access_attr(self, args: List) -> Any: attr = reduce( lambda d, key: d.get(key, None) if isinstance(d, Mapping) else None, args[0].split("."), self.data["access"]) # attr = self.data["access"].get(str(args[0]), None) return attr
[docs] def object_attr(self, args: List) -> Any: LOGGER.debug("data is %s", self.data['object']) LOGGER.debug("args are %s", args) attr_str = str(args[0]) attr = reduce( lambda d, key: d.get(key, None) if isinstance(d, Mapping) else None, attr_str.split("."), self.data["object"]) if attr is None: raise ObjectAttributeMissing("No object attr %s" % attr_str, args[0]) return attr
[docs] def environment_attr(self, args: List) -> Any: attr = reduce( lambda d, key: d.get(key, None) if isinstance(d, Mapping) else None, args[0].split("."), self.data["environment"]) if attr is None: raise EnvironmentAttributeMissing( "No object attr %s" % str(args[0]), args[0]) # warnings.warn("No environment_attr %s" % str(args[0]), # EnvironmentAttributeMissingWarning) return attr
[docs] def list_inner(self, args: List) -> Any: # either we have two children (one list, one literal) or one child (literal) if len(args) == 1: return [args[0]] return [args[0]] + args[1]
[docs] def lit(self, args: List) -> Union[Dict, List, str, int, float]: if isinstance(args[0], (list, )): return args[0] if args[0].type in ["SINGLE_QUOTED_STRING", "DOUBLE_QUOTED_STRING", "RAW_STRING"]: return str(literal_eval(args[0])) if args[0].type == "BOOL": return args[0] == "True" return int(args[0])
[docs]class ExistsTransformer(MyTransformer): """ The exists Transformer must run before the normal transformers in order to catch exceptions """ def __init__(self, attr_transformer: TransformAttr): super().__init__(self) self.attr_transformer = attr_transformer def _exists(self, args: List) -> bool: try: getattr(self.attr_transformer, args[0].data)(args[0].children) return True except AttributeMissing: return False
[docs] def single(self, args: List) -> Any: if args[0] == "exists": return self._exists(args[1:]) return Tree("single", args)
[docs] def uop(self, args: List) -> Any: if args[0] == "exists": return "exists" return Tree("uop", args)
#return getattr(UOP, str(args[0]))
[docs]class TopLevelTransformer(MyTransformer):
[docs] def condition(self, args: List) -> Any: if isinstance(args[0], Tree): return Tree("condition", args) if len(args) == 1: return bool(args[0]) raise ValueError
#return Tree("condition", args)
[docs] def target(self, args: List) -> Any: if isinstance(args[0], Tree): return Tree("target", args) if len(args) == 1: return bool(args[0]) raise ValueError
[docs] def statement(self, args: List) -> Any: if isinstance(args[0], Tree): return Tree("statement", args) if len(args) == 1: return bool(args[0]) raise ValueError
[docs]class OperatorTransformer(MyTransformer):
[docs] def cbop(self, args: List) -> Callable: LOGGER.debug("cbop got called") str_op = str(args[0]) op = binary_operators.get(str_op, None) if op is None: raise NotImplementedError() return op
[docs] def lbop(self, args: List) -> Callable: str_op = str(args[0]) if str_op == 'and': return BinaryOperatorAnd elif str_op == 'or': return BinaryOperatorOr else: raise NotImplementedError
[docs] def uop(self, args: List) -> Any: return getattr(UOP, str(args[0]))
[docs]class MiddleLevelTransformer(MyTransformer):
[docs] def comparison(self, args: List) -> bool: # xor check for none attributes if bool(args[0] is None) ^ bool(args[2] is None): return False # assert op is not None # for mypy LOGGER.debug("{} {} {}".format(args[0], args[1], args[2])) return args[1].eval(args[0], args[2])
[docs] def linked(self, args: List) -> bool: if isinstance(args[0], Tree) or isinstance(args[2], Tree): return Tree("linked", args) allowed_types = (bool, dict, list, str, float, int) if args[0] is None: args[0] = False assert issubclass(args[1], BinaryOperator) if isinstance(args[0], allowed_types) and isinstance( args[2], allowed_types): return args[1].eval(args[0], args[2]) LOGGER.debug("Types are %s and %s", type(args[0]), type(args[2])) raise ValueError
# return Tree("linked", args)
[docs] def single(self, args: List) -> Any: if len(args) == 2: return args[0](args[1]) if len(args) == 1: return args[0] raise ValueError
[docs]def parseable(lark_handle: Lark, rule: str) -> bool: try: lark_handle.parse(rule) return True except (lark.exceptions.UnexpectedCharacters, lark.exceptions.UnexpectedEOF): return False
[docs]def parse_and_transform(lark_handle: Lark, rule: str, data: Dict) -> bool: try: ast = lark_handle.parse(rule) except (lark.exceptions.UnexpectedCharacters, lark.exceptions.UnexpectedEOF): raise BadRuleSyntax('Rule has a bad syntax %s' % rule) # Eval exists attr_transformer = TransformAttr(data) new_ast = ExistsTransformer(attr_transformer).transform(ast) T = attr_transformer + OperatorTransformer() + MiddleLevelTransformer() + TopLevelTransformer() return T.transform(new_ast)
[docs]def check_condition(condition: str, data: Dict) -> bool: global lark_condition LOGGER.debug("Check condition %s with data %s", condition, data) ret_value = parse_and_transform(lark_condition, condition, data) LOGGER.debug("Condition %s evaluated to %s", condition, ret_value) return ret_value
[docs]def check_target(rule: str, data: Dict) -> bool: global lark_target LOGGER.debug("Check target rule %s with data %s", rule, data) try: ret_value = parse_and_transform(lark_target, rule, data) except Exception as e: LOGGER.debug(e) raise LOGGER.debug("Target Rule %s evaluated to %s", rule, ret_value) return ret_value
if __name__ == "__main__": # l = Lark(grammar, start="condition") data = {"subject": {"email": "hello"}} attr_transformer = TransformAttr(data) ast = l.parse("exists subject.email") new_ast = ExistsTransformer(attr_transformer).transform(ast) print(new_ast) T = attr_transformer * OperatorTransformer() * MiddleLevelTransformer() * TopLevelTransformer() print(T.transform(new_ast)) ast = l.parse("exists subject.notexisting") new_ast = ExistsTransformer(attr_transformer).transform(ast) print(new_ast) T = attr_transformer * OperatorTransformer() * MiddleLevelTransformer() * TopLevelTransformer() print(T.transform(new_ast)) print(ast) new_ast = ExistsTransformer(attr_transformer).transform(ast) print(new_ast) #print(new_ast) # # ast = l.parse("[5, '4', True]") # print(ast) # #data = {} # ast = TransformAttr(data).transform(ast) # tree.pydot__tree_to_png(ast, "graph.png") # ast = l.parse("subject.email != object.email") # #print(ast) # # data = { # "subject": { # "email": "blub" # }, # "object": { # "email": "blab" # }, # "environment": { # "time": 2 # } # } # transformed = TransformAttr(data).transform(ast) # transformed = EvalTree().transform(transformed) # tree.pydot__tree_to_png(ast, "graph.png") # tree.pydot__tree_to_png(transformed, "graph02.png") # ast = l.parse("exists environment.time") # tree.pydot__tree_to_png(ast, "graph01.png") # T = TransformAttr(data) * EvalTree() * EvalComplete() # t1 = TransformAttr(data).transform(ast) # t2 = EvalTree().transform(t1) # t3 = EvalComplete().transform(t2) # print(T.transform(ast)) # print(t3) # ast = l.parse("True") # tree.pydot__tree_to_png(ast, "graph04.png") # ast = l.parse("environment.time < 3") # tree.pydot__tree_to_png(ast, "graph03_orig.png") # ast = TransformAttr(data).transform(ast) # tree.pydot__tree_to_png(ast, "graph03_attr.png") # ast = EvalTree().transform(ast) # tree.pydot__tree_to_png(ast, "graph03.png")