Source code for arpoc.plugins.obl_loggers

import re
from collections.abc import Mapping
from functools import reduce
from copy import deepcopy
from typing import Dict
import logging

from arpoc.plugins._lib import Obligation, Optional, deep_dict_update
from arpoc.ac.common import Effects



"""
Obligation Log Module.
The classes here will either
  - log every access (Log),
  - every granted access (LogSuccessful)
  - every denied access (LogFailed)

The log can be configured via a dict cfg:
    cfg['logger_cfg'] -- a logging dictConfig dictionary that is merged into the default logger_cfg
    cfg['log_format'] -- A format string for the message generation
The log will be created with INFO level
"""

# Default ``logging.config.dictConfig`` schema used by the obligation loggers.
# One rotating file handler writes to "obligation.log"; the logger named
# "obligation_logger" is attached to it at INFO level.
# NOTE(review): maxBytes=1024 rotates the file after roughly 1 KiB -- confirm
# that such a small size is intended.
logger_cfg = dict(
    version=1,
    disable_existing_loggers=False,
    handlers=dict(
        # "class" is a Python keyword, so this entry stays a literal dict.
        obligation_file={
            "class": "logging.handlers.RotatingFileHandler",
            "filename": "obligation.log",
            "maxBytes": 1024,
            "backupCount": 3,
            "level": "INFO",
        },
    ),
    loggers=dict(
        obligation_logger=dict(
            level="INFO",
            handlers=["obligation_file"],
        ),
    ),
)


class Log(Obligation):
    """Log the access request.

    Name: obl_log

    Configuration (keys of the ``cfg`` dict passed to :meth:`run`):
        - logger_cfg -- a ``logging.config.dictConfig`` dictionary; it is
          merged (via ``deep_dict_update``) into a copy of the module level
          default ``logger_cfg``
        - log_format -- A format string for the message generation
          default: `{} subject.email accessed object.service [object.path] -- object.target_url`

    The format string is first passed through ``str.format`` with the effect
    as its only argument. Afterwards every ``subject.<path>``,
    ``object.<path>``, ``environment.<path>`` and ``access.<path>`` token is
    replaced by the matching value of the request context, where ``<path>``
    is a dot separated list of keys.

    The log will be created with INFO level
    """

    name = "obl_log"

    # A dotted attribute path: word characters separated by single dots.
    # A trailing dot (e.g. the full stop of a sentence) is not part of it.
    _ATTR_PATH = r"\w+(?:\.\w+)*"

    # Matches a token of any of the four categories in one pass.
    _ANY_ATTR_RE = re.compile(
        r"(?P<category>subject|object|environment|access)\.(?P<path>%s)"
        % _ATTR_PATH)

    @staticmethod
    def _lookup(info, dotted_key):
        """Resolve `dotted_key` inside the nested mapping `info` as text.

        Returns the empty string if a key is missing, an intermediate value
        is not a mapping, or the value is None. Any other value is converted
        with ``str`` so that ``re.sub`` always receives a string.
        """
        value = info
        for key in dotted_key.split("."):
            if not isinstance(value, Mapping):
                return ""
            value = value.get(key)
        return "" if value is None else str(value)

    @staticmethod
    def _replace_prefixed(logtext, prefix, info):
        """Replace every ``<prefix>.<path>`` token in `logtext` using `info`."""
        regex = r"%s\.(%s)" % (re.escape(prefix), Log._ATTR_PATH)
        return re.sub(regex,
                      lambda match: Log._lookup(info, match.group(1)),
                      logtext)

    @staticmethod
    def replace_subjectattr(logtext, subject_info):
        """Replace all ``subject.<path>`` tokens with values of `subject_info`."""
        return Log._replace_prefixed(logtext, "subject", subject_info)

    @staticmethod
    def replace_objectattr(logtext, object_info):
        """Replace all ``object.<path>`` tokens with values of `object_info`."""
        return Log._replace_prefixed(logtext, "object", object_info)

    @staticmethod
    def replace_envattr(logtext, env_info):
        """Replace all ``environment.<path>`` tokens with values of `env_info`."""
        return Log._replace_prefixed(logtext, "environment", env_info)

    @staticmethod
    def replace_accessattr(logtext, access_info):
        """Replace all ``access.<path>`` tokens with values of `access_info`."""
        return Log._replace_prefixed(logtext, "access", access_info)

    @staticmethod
    def replace_attr(logtext, context):
        """Replace all attribute tokens in `logtext` with values of `context`.

        `context` must contain the keys 'subject', 'object', 'environment'
        and 'access'; a KeyError is raised otherwise.

        The substitution is done in a single pass, so text inserted for one
        token (e.g. a path containing ``access.log``) is never interpreted as
        another token.
        """
        sources = {
            "subject": context['subject'],
            "object": context['object'],
            "environment": context['environment'],
            "access": context['access'],
        }
        return Log._ANY_ATTR_RE.sub(
            lambda match: Log._lookup(sources[match.group("category")],
                                      match.group("path")),
            logtext)

    @staticmethod
    def run(effect: Optional[Effects], context: Dict, cfg: Dict) -> bool:
        """Write one log line for the access request and return True.

        effect  -- the decision; ``str(effect)`` fills the ``{}`` placeholder
                   of the format string
        context -- dict with the keys 'subject', 'object', 'environment' and
                   'access' used to resolve the attribute tokens
        cfg     -- obligation configuration, see the class docstring
        """
        # ``import logging`` alone does not guarantee that the ``config``
        # submodule is loaded, so import it explicitly.
        from logging.config import dictConfig

        merged_cfg = deepcopy(logger_cfg)
        if 'logger_cfg' in cfg:
            merged_cfg = deep_dict_update(merged_cfg, cfg['logger_cfg'])

        logger = logging.getLogger("obligation_logger")
        # NOTE(review): the logging configuration is applied on every call.
        dictConfig(merged_cfg)

        log_format = cfg.get(
            'log_format',
            "{} subject.email accessed object.service "
            "[object.path] -- object.target_url")
        log_format = log_format.format(str(effect))
        logger.info(Log.replace_attr(log_format, context))
        return True
class LogFailed(Obligation):
    """Log failed access requests.

    Name: obl_log_failed

    Configuration: Same as obl_log
    """

    name = "obl_log_failed"

    @staticmethod
    def run(effect: Optional[Effects], context: Dict, cfg: Dict) -> bool:
        """Delegate to ``Log.run`` if `effect` is ``Effects.DENY``.

        Always returns True, whether or not a log entry was written.
        """
        if effect != Effects.DENY:
            # Not a denied request: nothing to log.
            return True
        Log.run(effect, context, cfg)
        return True
class LogSuccessful(Obligation):
    """Log successful access requests.

    Name: obl_log_successful

    Configuration: Same as obl_log
    """

    name = "obl_log_successful"

    @staticmethod
    def run(effect: Optional[Effects], context: Dict, cfg: Dict) -> bool:
        """Delegate to ``Log.run`` if `effect` is ``Effects.GRANT``.

        Always returns True, whether or not a log entry was written.
        """
        if effect != Effects.GRANT:
            # Not a granted request: nothing to log.
            return True
        Log.run(effect, context, cfg)
        return True