Source code for arpoc.config

""" Configuration Module of ARPOC

After importing this file you have access to
the configuration with the `config.cfg` variable.
"""

import inspect
import logging
import os
from dataclasses import InitVar, asdict, dataclass, field, replace
from typing import Any, Dict, List, Optional

import yaml

from arpoc.exceptions import ConfigError

LOGGING = logging.getLogger()


[docs]@dataclass class ProviderConfig: """ Configuration for a single Open ID Connect Provider Attributes: - **human_readable_name**: A name which arpoc uses when communicating with the user / operator - **configuration_url**: The base url of the OIDC provider. Without the .well-known/ part - **configuration_token**: The token ARPOC can use to register itself with the OIDC provider - **registration_token**: The token issued from the OIDC provider for a specific client to obtain its configuration - **registration_url**: The url where arpoc can obtain its configuration after registration. - **method**: Either 'auto', 'GET', or 'POST'. The HTTP method ARPOC will use if the OIDC / OAuth standard gives a choice. - **special_claim2scope**: A mapping from claim to scopes that will deliver the claims. **Mandatory arguments**: - configuration_url And either: - configuration_token Or: - registration_token - registration_url """ baseuri: InitVar[str] """ arpoc's base uri :meta private: """ human_readable_name: str configuration_url: str = "" configuration_token: str = "" registration_token: str = "" registration_url: str = "" method: str = "auto" special_claim2scope: InitVar[dict] = None claim2scope: dict = field(init=False) redirect_paths: List[str] = field(default_factory=list) do_token_introspection: bool = True def __post_init__(self, baseuri: str, special_claim2scope: Dict) -> None: self.claim2scope = { "name": ['profile'], "family_name": ['profile'], "given_name": ['profile'], "middle_name": ['profile'], "nickname": ['profile'], "preferred_username": ['profile'], "profile": ['profile'], "picture": ['profile'], "website": ['profile'], "gender": ['profile'], "birthdate": ['profile'], "zoneinfo": ['profile'], "locale": ['profile'], "updated_at": ['profile'], "email": ["email"], "email_verified": ["email"], "address": ["address"], "phone": ["phone"], "phone_number_verified": ["phone"] } if special_claim2scope: for key, val in special_claim2scope.items(): self.claim2scope[key] = val self.redirect_uris = [] for redirect_path in self.redirect_paths: self.redirect_uris.append("{}{}".format(baseuri, redirect_path))
[docs] def check_method(self): if self.method not in ["auto", "POST", "GET"]: raise ConfigError(f"Method of Provider {self.human_readable_name} is not valid; must be auto, POST or GET")
def __getitem__(self, key: str) -> Any: return getattr(self, key)
[docs]def default_redirect() -> List: """ Default Redirect Path""" return ["/secure/redirect_uris"]
[docs]def default_json_dir() -> List: """ Default json path for access control entities """ return ["/etc/arpoc/acl"]
#pylint: disable=too-many-instance-attributes
[docs]@dataclass class ProxyConfig: """ Configuration for the Proxy Setup Attributes: - **keyfile**: The path to the private key file of the TLS keypair - **certfile**: The path to the certificate chain file (full chain) - **domainname**: The domain name where ARPOC will be available - **contacts**: A list of mail contact adresses responsible for the ARPOC instance Mandatory: **keyfile**, **certfile**, **domainname**, **contacts** """ keyfile: str certfile: str domainname: str contacts: List[str] address: str = "0.0.0.0" tls_port: int = 443 plain_port: int = 80 https_only: bool = True username: str = "www-data" groupname: str = "www-data" secrets: str = "/var/lib/arpoc/secrets.yml" tls_redirect: str = "/TLSRedirect" auth: str = "/auth" redirect: List[str] = field(default_factory=default_redirect) def __getitem__(self, key: str) -> Any: return getattr(self, key) def __post_init__(self) -> None: assert isinstance(self.redirect, List) self.baseuri = "https://{}/".format(self.domainname) self.redirect_uris = [] for redirect_path in self.redirect: # skip first slash if redirect_path has one rp = redirect_path[1:] if redirect_path.startswith('/') else redirect_path self.redirect_uris.append(f"{self.baseuri}{rp}")
[docs]@dataclass class ServiceConfig: """ Configuration for a single proxied Service Attributes: - **origin_URL**: The URL that will be proxied, or the special page string; see :ref:`Special Pages <specialpagessection>` - **proxy_URL**: The *path* under which *origin_URL* will be available. - **AC**: The policy set which is evaluated to decide the access request - **objectsetters**: Configuration for the objectsetters - **obligations**: Configuration for obligations - **authentication** Authentication information which will be used to request *origin_URL* Mandatory Arguments: - *origin_URL* - *proxy_URL* - *AC* """ origin_URL: str proxy_URL: str AC: str objectsetters: dict = field(default_factory=dict) obligations: dict = field(default_factory=dict) authentication: dict = field(default_factory=dict) def __getitem__(self, key: str) -> Any: return getattr(self, key)
[docs]@dataclass class ACConfig: """ Configuration for the access control Attributes: - **json_dir**: The directory where the AC Entities are stored. The files must end with ".json" """ json_dir: List[str] = field(default_factory=default_json_dir) def __getitem__(self, key: str) -> Any: return getattr(self, key)
[docs]@dataclass class Misc: """ Misc Config Class Attributes: - **access_log**: The location to store the access log (HTTP requests) - **error_log**: The location to store the error_log - **daemonize**: If arpoc should start daemonized. - **log_level**: ARPOC's log level. (DEBUG/INFO/ERROR/WARN). Affects also underlying libraries - **pid_file**: Where ARPOC should store the process id file. Only used when daemonized. - **plugin_dirs**: Where ARPOC should load plugins No mandatory arguments. """ pid_file: str = "/var/run/arpoc.pid" daemonize: bool = True log_level: str = "INFO" access_log: str = "/var/log/arpoc/access.log" error_log: str = "/var/log/arpoc/error.log" plugin_dirs: List[str] = field(default_factory=list)
[docs]class OIDCProxyConfig: """ Config Container Which for all specific configuration """ def __init__(self, config_file: Optional[str] = None, std_config: Optional[str] = '/etc/arpoc/config.yml'): self.openid_providers: Dict[str, ProviderConfig] = {} self.proxy: ProxyConfig = ProxyConfig("", "", "", [""]) self.services: Dict[str, ServiceConfig] = {} self.access_control = ACConfig() self.misc: Misc = Misc() default_paths = [std_config] if 'OIDC_PROXY_CONFIG' in os.environ: default_paths.append(os.environ['OIDC_PROXY_CONFIG']) if config_file: default_paths.append(config_file) for filepath in default_paths: if filepath: try: self.read_file(filepath) except IOError: pass self.check_config()
[docs] def add_provider(self, name: str, prov_cfg: ProviderConfig) -> None: """ Adds the provider with key <name> to the configuration """ self.openid_providers[name] = prov_cfg
[docs] def check_redirect_uri(self) -> None: """ Checks if every redirect uri in the provider config is also in the proxy list """ for _, provider_obj in self.openid_providers.items(): for redirect_url in provider_obj.redirect_uris: if redirect_url not in self.proxy.redirect: raise ConfigError(f"{provider_obj.human_readable_name} has an invalid redirect_path")
[docs] def check_config_proxy_url(self) -> None: """ Checks for duplicates in the proxy_url """ proxy_urls: List[str] = [] for key, service in self.services.items(): if service.proxy_URL in proxy_urls: raise ConfigError("Bound different services to the same URL") proxy_urls.append(service.proxy_URL) assert self.proxy is not None
[docs] def check_config(self) -> None: """ Make consistency checks for the arpoc config """ LOGGING.debug("checking config consistency") for provider in self.openid_providers.values(): attrs = (getattr(provider, name) for name in dir(provider) if name.startswith("check_")) methods = filter(inspect.ismethod, attrs) for method in methods: method() attrs = (getattr(self, name) for name in dir(self) if name.startswith("check_") and name != "check_config") methods = filter(inspect.ismethod, attrs) for method in methods: method()
[docs] def merge_config(self, new_cfg: Dict) -> None: """Merges the current configuration with a new configuration dict """ if 'proxy' in new_cfg: if self.proxy: self.proxy = replace(self.proxy, **new_cfg['proxy']) else: self.proxy = ProxyConfig(**new_cfg['proxy']) if 'services' in new_cfg: for key, val in new_cfg['services'].items(): service_cfg = ServiceConfig(**val) self.services[key] = service_cfg if 'openid_providers' in new_cfg: for key, val in new_cfg['openid_providers'].items(): provider_cfg = ProviderConfig(self.proxy.baseuri, **val) self.openid_providers[key] = provider_cfg if 'access_control' in new_cfg: self.access_control = ACConfig(**new_cfg['access_control']) if 'misc' in new_cfg: if self.misc: self.misc = replace(self.misc, **new_cfg['misc']) else: self.misc = Misc(**new_cfg['misc'])
[docs] def read_file(self, filepath: str) -> None: """ Read the YAML file <filepath> and add the contents to the current configuration """ with open(filepath, 'r') as ymlfile: new_cfg = yaml.safe_load(ymlfile) self.merge_config(new_cfg)
[docs] def print_config(self) -> None: """ Print the current config """ cfg: Dict[str, Dict] = dict() cfg['services'] = dict() cfg['openid_providers'] = dict() for services_key, services_obj in self.services.items(): cfg['services'][services_key] = asdict(services_obj) for providers_key, providers_obj in self.openid_providers.items(): cfg['openid_providers'][providers_key] = asdict(providers_obj) cfg['proxy'] = asdict(self.proxy) cfg['access_control'] = asdict(self.access_control) print(yaml.dump(cfg, sort_keys=False))
[docs] @staticmethod def print_sample_config() -> None: """ Prints a sample config """ provider = ProviderConfig("", "", "", "", "", "") proxy = ProxyConfig("", "", "", [""], "") service = ServiceConfig("", "", "", {}, {}) ac_config = ACConfig() misc = Misc() # delete the default values of claim2scope provider_dict = asdict(provider) del provider_dict['claim2scope'] del provider_dict['do_token_introspection'] cfg = { "openid_providers": { "example": provider_dict }, "proxy": asdict(proxy), "services": { "example": asdict(service) }, "access_control": asdict(ac_config), "misc": asdict(misc) } print(yaml.dump(cfg, sort_keys=False))
cfg: Optional[OIDCProxyConfig] = None if __name__ == "__main__": cfg = OIDCProxyConfig() cfg.print_sample_config()