Source code for arpoc.base

"""
The two base classes OidcHandler and ServiceProxy
Also the TLSOnlyDispatcher to  force https
"""

# Python imports
import logging
import logging.config
import copy


import os

import hashlib

import urllib.parse

#from http.client import HTTPConnection
#HTTPConnection.debuglevel = 1
from typing import List, Dict, Union, Tuple, Callable, Iterable, Optional, Any

# side packages

##oic
import oic.oic
from oic.utils.authn.client import CLIENT_AUTHN_METHOD
from oic.oic.message import RegistrationResponse, AuthorizationResponse

from oic import rndstr
#from oic.utils.http_util import Redirect
import oic.extension.client

import oic.exception

import requests

import cherrypy
from cherrypy._cpdispatch import Dispatcher
#from cherrypy.process.plugins import DropPrivileges, Daemonizer, PIDFile

from jinja2 import Environment, FileSystemLoader

from jwkest import jwt, BadSyntax

#### Own Imports

import arpoc.ac as ac
import arpoc.exceptions
import arpoc.config as config
import arpoc.cache
import arpoc.utils
from arpoc.plugins import EnvironmentDict, ObjectDict, ObligationsDict

#logging.basicConfig(level=logging.DEBUG)

LOGGING = logging.getLogger(__name__)

env = Environment(loader=FileSystemLoader(
    os.path.join(os.path.dirname(__file__), 'resources', 'templates')))

[docs]class OidcHandler: """ A class to handle the connection to OpenID Connect Providers """ def __init__(self, cfg: config.OIDCProxyConfig): self.__oidc_provider: Dict[str, oic.oic.Client] = dict() self.cfg = cfg self._secrets: Dict[str, dict] = dict() self._cache = arpoc.cache.Cache() # assert self.cfg.proxy is not None
[docs] def get_secrets(self) -> Dict[str, dict]: """ Returns the secrets (client_id, client_secret) of the OIDC Relying Partys""" return self._secrets
[docs] def set_secrets(self, secrets): """ Set the secrets dict """ self._secrets = secrets
[docs] def register_first_time(self, name: str, provider: config.ProviderConfig) -> None: """ Registers a client or reads the configuration from the registration endpoint If registration_url is present in the configuration file, then it will try to read the configuration using the registration_token. If configuration_url is present in the configuration file, it will try to set the configuration using the registration endpoint dynamically received with the well-known location url (configuration_url) """ client = oic.oic.Client(client_authn_method=CLIENT_AUTHN_METHOD) registration_response: RegistrationResponse try: if provider.registration_url and provider.registration_token: provider_info = client.provider_config( provider['configuration_url']) # Only read configuration registration_response = client.registration_read( url=provider['registration_url'], registration_access_token=provider['registration_token']) args = dict() args['redirect_uris'] = registration_response['redirect_uris'] elif provider.configuration_url and provider.configuration_token: assert self.cfg.proxy is not None provider_info = client.provider_config( provider['configuration_url']) redirect_uris = (provider.redirect_uris if provider.redirect_uris else self.cfg.proxy['redirect_uris']) args = { "redirect_uris": redirect_uris, "contacts": self.cfg.proxy['contacts'] } registration_response = client.register( provider_info["registration_endpoint"], registration_token=provider['configuration_token'], **args) else: raise arpoc.exceptions.OIDCProxyException( "Error in the configuration file") except oic.exception.RegistrationError: LOGGING.warning("Provider %s returned an error on registration", name) LOGGING.debug("Seems to be permament, so not retrying") return except (requests.exceptions.MissingSchema, requests.exceptions.InvalidSchema, requests.exceptions.InvalidURL): raise arpoc.exceptions.OIDCProxyException( "Error in the configuration file") self.__oidc_provider[name] = client self.__oidc_provider[name].redirect_uris = args["redirect_uris"] self._secrets[name] = registration_response.to_dict()
[docs] def create_client_from_secrets(self, name: str, provider: config.ProviderConfig) -> None: """ Try to create an openid connect client from the secrets that are saved in the secrets file""" client_secrets = self._secrets[name] client = oic.oic.Client(client_authn_method=CLIENT_AUTHN_METHOD) client.provider_config(provider.configuration_url) client_reg = RegistrationResponse(**client_secrets) client.store_registration_info(client_reg) client.redirect_uris = client_secrets[ 'redirect_uris'] self.__oidc_provider[name] = client self._secrets[name] = client_reg.to_dict()
[docs] def get_userinfo_access_token(self, access_token: str) -> Tuple[int, Dict]: """ Get the user info if the user supplied an access token""" # TODO: error handling (no jwt) # TODO: allow issuer parameter in header here userinfo = {} LOGGING.debug(access_token) try: access_token_obj = jwt.JWT() access_token_obj.unpack(access_token) LOGGING.debug(access_token_obj.payload()) issuer = access_token_obj.payload()['iss'] except BadSyntax: LOGGING.debug("Decoding Access Token failed") if 'x-arpoc-issuer' in cherrypy.request.headers: LOGGING.debug("issuer hint found") issuer = cherrypy.request.headers['x-arpoc-issuer'] else: raise Exception("400 - Bad Request") # TODO # check if issuer is in provider list client = None for provider_name, obj in self.__oidc_provider.items(): LOGGING.debug(obj) if obj.issuer == issuer: client = obj client_name = provider_name valid_until = 0 if client: if self.cfg.openid_providers[client_name].do_token_introspection: # do userinfo with provided AT # we need here the oauth extension client args = ["client_id", "client_authn_method", "keyjar", "config"] kwargs = {x: client.__getattribute__(x) for x in args} oauth_client = oic.extension.client.Client(**kwargs) for key, val in client.__dict__.items(): if key.endswith("_endpoint"): oauth_client.__setattr__(key, val) oauth_client.client_secret = client.client_secret introspection_kwargs = {'authn_method' : 'client_secret_basic'} if self.cfg.openid_providers[client_name].method != "auto": introspection_kwargs['method'] = self.cfg.openid_providers[client_name].method introspection_res = oauth_client.do_token_introspection( request_args={ 'token': access_token, 'state': rndstr() }, **introspection_kwargs) if introspection_res['active']: if 'exp' in introspection_res: valid_until = introspection_res['exp'] else: valid_until = arpoc.utils.now() + 30 else: valid_until = arpoc.utils.now() + 30 userinfo_kwargs = {'token' : access_token} if self.cfg.openid_providers[client_name].method != "auto": userinfo_kwargs['method'] = self.cfg.openid_providers[client_name].method userinfo = client.do_user_info_request(**userinfo_kwargs) else: LOGGING.info( "Access token received, but no suitable provider in configuration" ) LOGGING.info("Access token issuer %s", issuer) return valid_until, dict(userinfo)
@staticmethod def _check_session_refresh() -> bool: """ checks if the session must be refreshed. If there is no session, then False is returned""" if 'refresh' in cherrypy.session: now = arpoc.utils.now() LOGGING.debug("refresh necessary: %s, now: %s", cherrypy.session['refresh'], now) return cherrypy.session['refresh'] < now return False
[docs] def need_claims(self, claims: List[str]) -> None: """ Maps claims to scopes and checks if the scopes were already requested. Else start auth procedure to get requested scopes""" cherrypy.session["url"] = cherrypy.url() if 'provider' in cherrypy.session: provider = cherrypy.session['provider'] scopes = set(["openid"]) for claim in claims: LOGGING.debug("Need claim %s", claim) scopes |= set( self.cfg.openid_providers[provider].claim2scope[claim]) LOGGING.debug("Need scopes %s", scopes) self._auth(scopes) else: raise cherrypy.HTTPRedirect(self.cfg.proxy.auth)
[docs] @staticmethod def get_access_token_from_headers() -> Union[None, str]: """ Returns the Access Token from the authorization header. Strips the bearer part """ if 'authorization' in cherrypy.request.headers: auth_header = cherrypy.request.headers['authorization'] len_bearer = len("bearer") if len(auth_header) > len_bearer: auth_header_start = auth_header[0:len_bearer] if auth_header_start.lower() == 'bearer': access_token = auth_header[len_bearer + 1:] return access_token return None
[docs] def refresh_access_token(self, hash_access_token: str) -> Tuple[str, Dict]: """ Refreshes the access token. This can only be done, if we are Client (normal web interface). """ client_name = cherrypy.session['provider'] client = self._get_oidc_client(client_name) cache_entry = self._cache[hash_access_token] state = cache_entry['state'] try: del self._cache[hash_access_token] userinfo_kwargs = {'state' : state} if self.cfg.openid_providers[client_name].method != "auto": userinfo_kwargs['method'] = self.cfg.openid_providers[client_name].method userinfo = dict(client.do_user_info_request(**userinfo_kwargs)) new_token = client.get_token(state=state) LOGGING.debug("New token: %s", new_token) hash_access_token = hashlib.sha256( str(new_token.access_token).encode()).hexdigest() cherrypy.session['hash_at'] = hash_access_token valid_until, refresh_valid = self.get_validity_from_token( new_token) self._cache.put( hash_access_token, { "state": state, "valid_until": valid_until, "userinfo": userinfo, "scopes": new_token.scope }, refresh_valid) return hash_access_token, userinfo except Exception as excep: LOGGING.debug(excep.__class__) raise
[docs] def get_userinfo(self) -> Tuple[Optional[str], Dict]: """ Gets the userinfo from the OIDC Provider. This works in two steps: 1. Check if the user supplied an Access Token 2. Otherwise, check the session management if the user is logged in """ access_token_header = self.get_access_token_from_headers() if access_token_header: hash_access_token = hashlib.sha256( access_token_header.encode()).hexdigest() try: return hash_access_token, self._cache[hash_access_token][ 'userinfo'] except KeyError: pass # how long is the token valid? valid_until, userinfo = self.get_userinfo_access_token( access_token_header) self._cache.put(hash_access_token, {"userinfo": userinfo}, valid_until) return hash_access_token, userinfo # check if refresh is needed if 'hash_at' in cherrypy.session: hash_access_token = cherrypy.session['hash_at'] now = arpoc.utils.now() # is the access token still valid? try: cache_entry = self._cache[hash_access_token] except KeyError: # hash_at is not in cache! LOGGING.debug('Hash at not in cache!') LOGGING.debug("Cache %s", self._cache.keys()) return (None, {}) # the entry valid_until is the validity of the refresh token, not of the cache entry if cache_entry['valid_until'] > now: return hash_access_token, cache_entry['userinfo'] return self.refresh_access_token(hash_access_token) return None, {}
def _get_oidc_client(self, name: str) -> oic.oic.Client: return self.__oidc_provider[name]
[docs] @staticmethod def get_validity_from_token(token: oic.oic.Token) -> Tuple[int, int]: """Find the validity of the id_token, access_token and refresh_token """ # how long is the information valid? # oauth has the expires_in (but only RECOMMENDED) # oidc has exp and iat required. # so if: iat + expires_in < exp -> weird stuff (but standard compliant) (iat, exp) = (token.id_token['iat'], token.id_token['exp']) at_exp = exp try: at_exp = token.expires_in + iat except AttributeError: pass valid_until = min(at_exp, exp) refresh_valid = valid_until try: refresh_valid = arpoc.utils.now() + token.refresh_expires_in except AttributeError: try: if token.refresh_token: refresh_valid = valid_until # TODO: add token introspection for the refresh_token (if jwt) except AttributeError: # we don't have a refresh token pass return (valid_until, refresh_valid)
[docs] def do_userinfo_request_with_state(self, state: str) -> Dict: """ Perform the userinfo request with given state """ client_name = cherrypy.session['provider'] client = self._get_oidc_client(client_name) try: userinfo_kwargs = {'state' : state} if self.cfg.openid_providers[client_name].method != "auto": userinfo_kwargs['method'] = self.cfg.openid_providers[client_name].method userinfo = client.do_user_info_request(**userinfo_kwargs) except oic.exception.CommunicationError as excep: exception_args = excep.args LOGGING.debug(exception_args) if exception_args[ 0] == "Server responded with HTTP Error Code 405": # allowed methods in [1] if exception_args[1][0] in ["GET", "POST"]: userinfo = client.do_user_info_request( state=state, method=exception_args[1][0]) else: raise except oic.exception.RequestError as excep: LOGGING.debug(excep.args) raise return userinfo
[docs] def get_access_token_from_code(self, state: str, code: str) -> oic.oic.Token: """ Takes the OIDC Authorization Code, Performs the Access Token Request Returns: The Access Token Request Response""" # Get Access Token qry = {'state': state, 'code': code} client_name = cherrypy.session['provider'] client = self._get_oidc_client(client_name) aresp = client.parse_response(AuthorizationResponse, info=qry, sformat="dict") if state != aresp['state']: raise RuntimeError LOGGING.debug("Authorization Response %s", dict(aresp)) # just code and state args = {"code": aresp["code"]} at_request_kwargs = { 'state' : aresp['state'], 'authn_method' : 'client_secret_basic' } # this is forbidden in the oauth standard (3.2) #if self.cfg.openid_providers[client_name].method != "auto": # at_request_kwargs['method'] = self.cfg.openid_providers[client_name].method resp = client.do_access_token_request( request_args=args, **at_request_kwargs) LOGGING.debug("Access Token Request %s", resp) token = client.get_token(state=aresp["state"]) assert isinstance(token, oic.oic.Token) return token
[docs] @staticmethod def check_scopes(request: List, response: List) -> Optional[str]: """ Checks the request and response scopes and alert if the response scopes are not enough""" requested_scopes = set(request) response_scopes = set(response) # Did we get the requested scopes? if not requested_scopes.issubset(response_scopes): tmpl = env.get_template('500.html') info = { "error": "The openid provider did not respond with the requested scopes", "requested scopes": request, "scopes in answer": response } return tmpl.render(info=info) return None
[docs] def redirect(self, **kwargs: Any) -> str: """Handler for the redirect method (entrypoint after forwarding to OIDC Provider """ # We are trying to get the user info here from the provider LOGGING.debug(cherrypy.session) LOGGING.debug('kwargs is %s', kwargs) # Errors? if 'error' in kwargs: tmpl = env.get_template('500.html') return tmpl.render(info=kwargs) # TODO: Here we should check that state has not been altered! token = self.get_access_token_from_code(kwargs['state'], kwargs['code']) hash_at = hashlib.sha256(str(token).encode()).hexdigest() cherrypy.session['hash_at'] = hash_at # check for scopes: response_check = self.check_scopes(cherrypy.session["scopes"], token.scope) if response_check: return response_check cherrypy.session["scopes"] = token.scope valid_until, refresh_valid = self.get_validity_from_token(token) userinfo = self.do_userinfo_request_with_state(state=kwargs["state"]) self._cache.put( hash_at, { "state": kwargs['state'], "valid_until": valid_until, "userinfo": dict(userinfo), "scopes": token.scope }, refresh_valid) # There should be an url in the session so we can redirect if "url" in cherrypy.session: raise cherrypy.HTTPRedirect(cherrypy.session["url"]) raise RuntimeError
def _auth(self, scopes: Optional[Iterable[str]] = None) -> None: if not scopes: scopes = ["openid"] if "hash_at" in cherrypy.session: hash_at = cherrypy.session["hash_at"] try: scopes_set = set(scopes) scopes_set_session = set(self._cache[hash_at]["scopes"]) if scopes_set.issubset(scopes_set_session): return None except KeyError: pass if "state" in cherrypy.session: LOGGING.debug("state is already present") cherrypy.session["state"] = rndstr() cherrypy.session["nonce"] = rndstr() # we need to test the scopes later cherrypy.session["scopes"] = list(scopes) client = self._get_oidc_client(cherrypy.session['provider']) args = { "client_id": client.client_id, "response_type": "code", "scope": cherrypy.session["scopes"], "nonce": cherrypy.session["nonce"], "redirect_uri": client.redirect_uris[0], "state": cherrypy.session['state'] } auth_req = client.construct_AuthorizationRequest(request_args=args) login_url = auth_req.request(client.authorization_endpoint) raise cherrypy.HTTPRedirect(login_url)
[docs] def auth(self, **kwargs: Any) -> Optional[str]: """ Start an authentication request. Redirects to OIDC Provider if given""" # Do we have only one openid provider? -> use this if len(self.__oidc_provider) == 1: cherrypy.session['provider'] = self.__oidc_provider.keys( ).__iter__().__next__() else: if 'name' in kwargs and kwargs['name'] in self.__oidc_provider: cherrypy.session['provider'] = kwargs['name'] else: LOGGING.debug(self.__oidc_provider) tmpl = env.get_template('auth.html') provider = dict() for key in self.__oidc_provider: provider[key] = self.cfg.openid_providers[key][ 'human_readable_name'] return tmpl.render(auth_page=self.cfg.proxy.auth, provider=provider) self._auth() return None # we won't get here
[docs]class ServiceProxy: """ A class to perform the actual proxying """ ac = ac.container def __init__(self, service_name: str, oidc_handler: OidcHandler, cfg: config.ServiceConfig): self.service_name = service_name self.cfg = cfg self._oidc_handler = oidc_handler def _proxy(self, url: str, access: Dict) -> str: """ Actually perform the proxying. 1. Setup request 2. Setup authentication 3. Get library method to use 4. Perform outgoing request 5. Answer the request """ # Copy request headers access['headers'].pop('Authorization', None) # Setup authentication (bearer/cert) cert = None if self.cfg.authentication: # bearer? if self.cfg['authentication']['type'] == "Bearer": access['headers']['Authorization'] = "Bearer {}".format( self.cfg['authentication']['token']) if self.cfg['authentication']['type'] == "Certificate": cert = (self.cfg['authentication']['certfile'], self.cfg['authentication']['keyfile']) # Get requests method method_switcher: Dict[str, Callable] = { "GET": requests.get, "PUT": requests.put, "POST": requests.post, "DELETE": requests.delete } method = method_switcher.get(access['method'], None) if not method: raise NotImplementedError # Outgoing request kwargs = {"headers": access['headers'], "data": access['body']} if cert: kwargs['cert'] = cert resp = method(url, **kwargs) # Answer the request for header in resp.headers.items(): if header[0].lower() == 'transfer-encoding': continue logging.debug("Proxy Request Header: %s", header) cherrypy.response.headers[header[0]] = header[1] cherrypy.response.status = resp.status_code return resp def _build_url(self, url: str, kwargs: Any) -> str: url = "{}/{}".format(self.cfg['origin_URL'], url) if kwargs: url = "{}?{}".format(url, urllib.parse.urlencode(kwargs)) return url def _build_proxy_url(self, path: str = '', kwargs: Any = None) -> str: kwargs = {} if kwargs is None else kwargs this_url = "{}{}/{}".format(self._oidc_handler.cfg.proxy['baseuri'], self.cfg['proxy_URL'][1:], path) if kwargs: this_url = "{}?{}".format(this_url, urllib.parse.urlencode(kwargs)) return this_url @staticmethod def _send_403(message: str = '') -> str: cherrypy.response.status = 403 return "<h1>Forbidden</h1><br>%s" % message
[docs] @staticmethod def build_access_dict(query_dict: Optional[Dict] = None) -> Dict: """Creates the access dict for the evaluation context """ query_dict = query_dict if query_dict is not None else {} method = cherrypy.request.method headers = copy.copy(cherrypy.request.headers) headers.pop('host', None) headers.pop('Content-Length', None) headers['connection'] = "close" # Read request body request_body = "" if cherrypy.request.method in cherrypy.request.methods_with_bodies: request_body = cherrypy.request.body.read() return {"method": method, "body": request_body, "headers": headers, "query_dict" : query_dict}
# pylint: disable=W0613 # disable unused arguments
[docs] @cherrypy.expose def index(self, *args: Any, **kwargs: Any) -> str: """ Connects to the origin_URL of the proxied service. Important: If a request parameter "url" is in the REQUEST, it will override the path information. /serviceA/urlinformation?url=test will translate to: <ServiceA>/test """ try: del kwargs['_'] except KeyError: pass _, userinfo = self._oidc_handler.get_userinfo() #called_url = cherrypy.url(qs=cherrypy.request.query_string) called_url_wo_qs = cherrypy.url() path = called_url_wo_qs[len(self._build_proxy_url()):] #LOGGING.debug("Called url was %s ", called_url) target_url = self._build_url(path, kwargs) object_dict = ObjectDict(objsetter=self.cfg['objectsetters'], initialdata={ "path": path, "target_url": target_url, "service": self.service_name, }) access = self.build_access_dict(query_dict=kwargs) context = { "subject": userinfo, "object": object_dict, "environment": EnvironmentDict(), "access": access } LOGGING.debug("Container is %s", self.ac) evaluation_result = self.ac.evaluate_by_entity_id( self.cfg['AC'], context) (effect, missing, obligations) = (evaluation_result.results[self.cfg['AC']], evaluation_result.missing_attr, evaluation_result.obligations) LOGGING.debug("Obligations are: %s", obligations) obligations_dict = ObligationsDict() obligations_result = obligations_dict.run_all(obligations, effect, context, self.cfg.obligations) if effect == ac.Effects.GRANT and all(obligations_result): return self._proxy(target_url, access) if len(missing) > 0: # -> Are we logged in? attr = set(missing) self._oidc_handler.need_claims(list(attr)) warn = ("Failed to get the claims even we requested the " + "right scopes.<br>Missing claims are:<br>") warn += "<br>".join(attr) return self._send_403(warn) return self._send_403("")
[docs]class TLSOnlyDispatcher(Dispatcher): """ Dispatcher for cherrypy to force TLS """ def __init__(self, tls_url: str, next_dispatcher: Dispatcher): super().__init__() self._tls_url = tls_url self._next_dispatcher = next_dispatcher def __call__(self, path_info: str) -> Dispatcher: if cherrypy.request.scheme == 'https': return self._next_dispatcher(path_info) return self._next_dispatcher(self._tls_url + path_info)