arpoc package

Submodules

arpoc.base module

The two base classes OidcHandler and ServiceProxy Also the TLSOnlyDispatcher to force https

class arpoc.base.OidcHandler(cfg: arpoc.config.OIDCProxyConfig)[source]

Bases: object

A class to handle the connection to OpenID Connect Providers

auth(**kwargs: Any) → Optional[str][source]

Start an authentication request. Redirects to OIDC Provider if given

static check_scopes(request: List, response: List) → Optional[str][source]

Checks the request and response scopes and alert if the response scopes are not enough

create_client_from_secrets(name: str, provider: arpoc.config.ProviderConfig) → None[source]

Try to create an openid connect client from the secrets that are saved in the secrets file

do_userinfo_request_with_state(state: str) → Dict[source]

Perform the userinfo request with given state

get_access_token_from_code(state: str, code: str) → oic.oic.Token[source]

Takes the OIDC Authorization Code, Performs the Access Token Request

Returns: The Access Token Request Response

static get_access_token_from_headers() → Union[None, str][source]

Returns the Access Token from the authorization header. Strips the bearer part

get_secrets() → Dict[str, dict][source]

Returns the secrets (client_id, client_secret) of the OIDC Relying Partys

get_userinfo() → Tuple[Optional[str], Dict][source]

Gets the userinfo from the OIDC Provider. This works in two steps:

  1. Check if the user supplied an Access Token

  2. Otherwise, check the session management if the user is logged in

get_userinfo_access_token(access_token: str) → Tuple[int, Dict][source]

Get the user info if the user supplied an access token

static get_validity_from_token(token: oic.oic.Token) → Tuple[int, int][source]

Find the validity of the id_token, access_token and refresh_token

need_claims(claims: List[str]) → None[source]

Maps claims to scopes and checks if the scopes were already requested. Else start auth procedure to get requested scopes

redirect(**kwargs: Any) → str[source]

Handler for the redirect method (entrypoint after forwarding to OIDC Provider

refresh_access_token(hash_access_token: str) → Tuple[str, Dict][source]

Refreshes the access token. This can only be done, if we are Client (normal web interface).

register_first_time(name: str, provider: arpoc.config.ProviderConfig) → None[source]

Registers a client or reads the configuration from the registration endpoint

If registration_url is present in the configuration file, then it will try to read the configuration using the registration_token.

If configuration_url is present in the configuration file, it will try to set the configuration using the registration endpoint dynamically received with the well-known location url (configuration_url)

set_secrets(secrets)[source]

Set the secrets dict

class arpoc.base.ServiceProxy(service_name: str, oidc_handler: arpoc.base.OidcHandler, cfg: arpoc.config.ServiceConfig)[source]

Bases: object

A class to perform the actual proxying

ac = <arpoc.ac.AC_Container object>
static build_access_dict(query_dict: Optional[Dict] = None) → Dict[source]

Creates the access dict for the evaluation context

index(*args: Any, **kwargs: Any) → str[source]

Connects to the origin_URL of the proxied service. Important: If a request parameter “url” is in the REQUEST, it will override the path information. /serviceA/urlinformation?url=test will translate to: <ServiceA>/test

class arpoc.base.TLSOnlyDispatcher(tls_url: str, next_dispatcher: cherrypy._cpdispatch.Dispatcher)[source]

Bases: cherrypy._cpdispatch.Dispatcher

Dispatcher for cherrypy to force TLS

arpoc.cache module

Cache Module for OIDCPROXY

class arpoc.cache.Cache[source]

Bases: collections.UserDict

The cache. On Every action (get, put) a clean up procedure is triggered

expire() → None[source]

Deletes invalid entries from the cache

get(key: str, default: Any = None) → Any[source]

Returns the element in the cache referenced by key. If no element exists with the key, <default> is returned

put(key: str, data: Any, valid: int) → None[source]

Inserts the element <data> with <key> into the cache. Expiration time <valid>.

class arpoc.cache.CacheItem(timestamp: int, item: Any)[source]

Bases: object

CacheItem – A item in the Cache timestamp: An Unix Timestamp in seconds since beginning of the epoch. item: The Item

item: Any
timestamp: int

arpoc.config module

Configuration Module of ARPOC

After importing this file you have access to the configuration with the config.cfg variable.

class arpoc.config.ACConfig(json_dir: List[str] = <factory>)[source]

Bases: object

Configuration for the access control

Attributes:
  • json_dir: The directory where the AC Entities are stored. The files

    must end with “.json”

json_dir: List[str]
class arpoc.config.Misc(pid_file: str = '/var/run/arpoc.pid', daemonize: bool = True, log_level: str = 'INFO', access_log: str = '/var/log/arpoc/access.log', error_log: str = '/var/log/arpoc/error.log', plugin_dirs: List[str] = <factory>)[source]

Bases: object

Misc Config Class

Attributes:
  • access_log: The location to store the access log (HTTP requests)

  • error_log: The location to store the error_log

  • daemonize: If arpoc should start daemonized.

  • log_level: ARPOC’s log level. (DEBUG/INFO/ERROR/WARN). Affects also underlying libraries

  • pid_file: Where ARPOC should store the process id file. Only used when daemonized.

  • plugin_dirs: Where ARPOC should load plugins

No mandatory arguments.

access_log: str = '/var/log/arpoc/access.log'
daemonize: bool = True
error_log: str = '/var/log/arpoc/error.log'
log_level: str = 'INFO'
pid_file: str = '/var/run/arpoc.pid'
plugin_dirs: List[str]
class arpoc.config.OIDCProxyConfig(config_file: Optional[str] = None, std_config: Optional[str] = '/etc/arpoc/config.yml')[source]

Bases: object

Config Container Which for all specific configuration

add_provider(name: str, prov_cfg: arpoc.config.ProviderConfig) → None[source]

Adds the provider with key <name> to the configuration

check_config() → None[source]

Make consistency checks for the arpoc config

check_config_proxy_url() → None[source]

Checks for duplicates in the proxy_url

check_redirect_uri() → None[source]

Checks if every redirect uri in the provider config is also in the proxy list

merge_config(new_cfg: Dict) → None[source]

Merges the current configuration with a new configuration dict

print_config() → None[source]

Print the current config

static print_sample_config() → None[source]

Prints a sample config

read_file(filepath: str) → None[source]

Read the YAML file <filepath> and add the contents to the current configuration

class arpoc.config.ProviderConfig(baseuri: dataclasses.InitVar, human_readable_name: str, configuration_url: str = '', configuration_token: str = '', registration_token: str = '', registration_url: str = '', method: str = 'auto', special_claim2scope: dataclasses.InitVar = None, redirect_paths: List[str] = <factory>, do_token_introspection: bool = True)[source]

Bases: object

Configuration for a single Open ID Connect Provider

Attributes:
  • human_readable_name: A name which arpoc uses when communicating

    with the user / operator

  • configuration_url: The base url of the OIDC provider. Without the

    .well-known/ part

  • configuration_token: The token ARPOC can use to register itself with

    the OIDC provider

  • registration_token: The token issued from the OIDC provider for a

    specific client to obtain its configuration

  • registration_url: The url where arpoc can obtain its configuration

    after registration.

  • method: Either ‘auto’, ‘GET’, or ‘POST’. The HTTP method ARPOC will

    use if the OIDC / OAuth standard gives a choice.

  • special_claim2scope: A mapping from claim to scopes that will deliver

    the claims.

Mandatory arguments:
  • configuration_url

And either:
  • configuration_token

Or:
  • registration_token

  • registration_url

baseuri: dataclasses.InitVar

arpoc’s base uri

check_method()[source]
claim2scope: dict
configuration_token: str = ''
configuration_url: str = ''
do_token_introspection: bool = True
human_readable_name: str
method: str = 'auto'
redirect_paths: List[str]
registration_token: str = ''
registration_url: str = ''
special_claim2scope: dataclasses.InitVar = None
class arpoc.config.ProxyConfig(keyfile: str, certfile: str, domainname: str, contacts: List[str], address: str = '0.0.0.0', tls_port: int = 443, plain_port: int = 80, https_only: bool = True, username: str = 'www-data', groupname: str = 'www-data', secrets: str = '/var/lib/arpoc/secrets.yml', tls_redirect: str = '/TLSRedirect', auth: str = '/auth', redirect: List[str] = <factory>)[source]

Bases: object

Configuration for the Proxy Setup

Attributes:
  • keyfile: The path to the private key file of the TLS keypair

  • certfile: The path to the certificate chain file (full chain)

  • domainname: The domain name where ARPOC will be available

  • contacts: A list of mail contact adresses responsible for the ARPOC

    instance

Mandatory: keyfile, certfile, domainname, contacts

address: str = '0.0.0.0'
auth: str = '/auth'
certfile: str
contacts: List[str]
domainname: str
groupname: str = 'www-data'
https_only: bool = True
keyfile: str
plain_port: int = 80
redirect: List[str]
secrets: str = '/var/lib/arpoc/secrets.yml'
tls_port: int = 443
tls_redirect: str = '/TLSRedirect'
username: str = 'www-data'
class arpoc.config.ServiceConfig(origin_URL: str, proxy_URL: str, AC: str, objectsetters: dict = <factory>, obligations: dict = <factory>, authentication: dict = <factory>)[source]

Bases: object

Configuration for a single proxied Service

Attributes:
  • origin_URL: The URL that will be proxied, or the special page string; see Special Pages

  • proxy_URL: The path under which origin_URL will be available.

  • AC: The policy set which is evaluated to decide the access request

  • objectsetters: Configuration for the objectsetters

  • obligations: Configuration for obligations

  • authentication Authentication information which will be used to request origin_URL

Mandatory Arguments:
  • origin_URL

  • proxy_URL

  • AC

AC: str
authentication: dict
objectsetters: dict
obligations: dict
origin_URL: str
proxy_URL: str
arpoc.config.default_json_dir() → List[source]

Default json path for access control entities

arpoc.config.default_redirect() → List[source]

Default Redirect Path

arpoc.exceptions module

exception arpoc.exceptions.ACEntityMissing[source]

Bases: arpoc.exceptions.OIDCProxyException

exception arpoc.exceptions.AttributeMissing[source]

Bases: arpoc.exceptions.OIDCProxyException

exception arpoc.exceptions.BadRuleSyntax[source]

Bases: Exception

exception arpoc.exceptions.BadSemantics[source]

Bases: Exception

exception arpoc.exceptions.ConfigError[source]

Bases: arpoc.exceptions.OIDCProxyException

exception arpoc.exceptions.DuplicateKeyError[source]

Bases: arpoc.exceptions.OIDCProxyException

exception arpoc.exceptions.EnvironmentAttributeMissing[source]

Bases: arpoc.exceptions.AttributeMissing

exception arpoc.exceptions.OIDCProxyException[source]

Bases: Exception

exception arpoc.exceptions.ObjectAttributeMissing[source]

Bases: arpoc.exceptions.AttributeMissing

exception arpoc.exceptions.SubjectAttributeMissing(message: str, attr: str)[source]

Bases: arpoc.exceptions.AttributeMissing

arpoc.special_pages module

class arpoc.special_pages.Userinfo(service_name: str, oidc_handler: arpoc.base.OidcHandler, cfg: arpoc.config.ServiceConfig)[source]

Bases: arpoc.base.ServiceProxy

Class of the special page Userinfo.

Displays the userinfo of a logged-in user. If the user is not logged-in, an empty dictionary is displayed. You can force the user to be logged in by setting the right access control policies.

arpoc.utils module

ARPOC utils

Time functions that are used in ARPOC. Can be swapped for unittests

arpoc.utils.now()[source]

Return the seconds since begin of the EPOCH

arpoc.utils.now_object()[source]

Return a datetime object with current timestamp

Module contents

Main module of the ARPOC

class arpoc.App[source]

Bases: object

Class for application handling. Reads configuration files, setups the oidc client classes and the dispatcher for the services

cancel_scheduler()[source]

Cancels every event in the scheduler queue

create_secrets_dir() → None[source]

Create the secrets dir and sets permission and ownership

get_routes_dispatcher() → cherrypy._cpdispatch.RoutesDispatcher[source]

Setups the Cherry Py dispatcher This connects makes the proxied services accessible

static read_secrets(filepath: str) → Dict[source]

Reads the secrets file from the filepath

retry(function: Callable, exceptions: Tuple, *args: Any, retries: int = 5, retry_delay: int = 30) → None[source]

Retries function <retries> times, as long as <exceptions> are thrown

run() → None[source]

Starts the application

save_secrets() → None[source]

Saves the oidc rp secrets into the secrets file

setup_loggers() → None[source]

Read the loggers configuration and configure the loggers

setup_oidc_provider() → None[source]

Setup the connection to all oidc providers in the config

tls_redirect(*args: Any, **kwargs: Any) → None[source]

Rewrites the url so that we use https. May alter the hostname (localhost -> domainname)

arpoc.get_argparse_instance()[source]