Source code for arpoc.pap

"""
Module for the Policy Administration Point (special page)

The PAP allows you to display the currently loaded access control entities.
It also allows to test specific rules and visualizes the result of the evaluation.


"""

from dataclasses import dataclass, field
from typing import List, Tuple, Union, Optional, Dict

import ast

from collections.abc import Mapping

import os

import itertools

from jinja2 import Environment, FileSystemLoader

from arpoc.base import ServiceProxy, ObjectDict, EnvironmentDict
import arpoc.ac as ac

env = Environment(loader=FileSystemLoader(
    os.path.join(os.path.dirname(__file__), 'resources', 'templates')))


[docs]@dataclass class PAPNode: ID: str node_type: str resolver: str target: str effect: str condition: str policy_sets: Optional[List['PAPNode']] policies: Optional[List['PAPNode']] rules: Optional[List['PAPNode']]
[docs]def create_PAPNode_Rule(rule: ac.Rule) -> PAPNode: return PAPNode(rule.entity_id, "rule", "", rule.target, str(rule.effect), rule.condition, None, None, None)
[docs]def create_PAPNode_Policy(policy: ac.Policy) -> PAPNode: rules = [create_PAPNode_Rule(ac.container.rules[x]) for x in policy.rules] return PAPNode(policy.entity_id, "policy", policy.conflict_resolution, policy.target, "", "", None, None, rules)
[docs]def create_PAPNode_Policy_Set(policy_set: ac.Policy_Set) -> PAPNode: policies = [ create_PAPNode_Policy(ac.container.policies[x]) for x in policy_set.policies ] policy_sets = [ create_PAPNode_Policy_Set(ac.container.policy_sets[x]) for x in policy_set.policy_sets ] return PAPNode(policy_set.entity_id, "policy set", policy_set.conflict_resolution, policy_set.target, "", "", policy_sets, policies, None)
[docs]class PolicyAdministrationPoint(ServiceProxy): # def __init__(self): # pass def _proxy(self, url: str, access: Dict) -> str: context = {} if url.startswith('pap/testbed'): services = self._oidc_handler.cfg.services.keys() s = [] result: Optional[ac.EvaluationResult] = None obj_setters = False env_setters = False try: # access.query_dict entity_id, sub, obj, acc, env_attr, service = ( access['query_dict'][x].strip() for x in ['entity', 'subject', 'object', 'access', 'environment', 'service']) if "object_setters" in access['query_dict']: obj_setters = True if "environment_setters" in access['query_dict']: env_setters = True except KeyError: pass else: if service not in services: return "" entity_obj: ac.AC_Entity # get the entity_id object if entity_id in ac.container.policy_sets: typ = "policy_set" entity_obj = ac.container.policy_sets[entity_id] s.append(create_PAPNode_Policy_Set(entity_obj)) elif entity_id in ac.container.policies: typ = "policy" entity_obj = ac.container.policies[entity_id] s.append(create_PAPNode_Policy(entity_obj)) elif entity_id in ac.container.rules: typ = "rule" entity_obj = ac.container.rules[entity_id] s.append(create_PAPNode_Rule(entity_obj)) else: return "" try: sub_dict = ast.literal_eval(sub) except SyntaxError: sub_dict = {} try: obj_dict = ast.literal_eval(obj) except SyntaxError: obj_dict = {} try: env_dict = ast.literal_eval(env_attr) except SyntaxError: env_dict = {} try: acc_dict = ast.literal_eval(acc) except SyntaxError: acc_dict = {} obj_dict['service'] = service if obj_setters: obj_dict = ObjectDict(self._oidc_handler.cfg.services[service].objectsetters) if env_setters: env_dict = EnvironmentDict() context = { 'subject': sub_dict, 'object': obj_dict, 'environment': env_dict, 'access': acc_dict } result = entity_obj.evaluate(context) entity_ids = itertools.chain(ac.container.policy_sets.keys(), ac.container.policies.keys(), ac.container.rules.keys()) tmpl = env.get_template('testbed.html') return tmpl.render(pap_nodes=s, entity_ids=entity_ids, result=result, services=services, **context) tmpl = env.get_template('pap.html') s = [] for ps in ac.container.policy_sets: s.append(create_PAPNode_Policy_Set(ac.container.policy_sets[ps])) #url.startswith('pap/view') return tmpl.render(pap_nodes=s)