arpoc package


arpoc.base module

The two base classes OidcHandler and ServiceProxy Also the TLSOnlyDispatcher to force https

class arpoc.base.OidcHandler(cfg: arpoc.config.OIDCProxyConfig)[source]

Bases: object

A class to handle the connection to OpenID Connect Providers

auth(**kwargs: Any) → Optional[str][source]

Start an authentication request. Redirects to OIDC Provider if given

static check_scopes(request: List, response: List) → Optional[str][source]

Checks the request and response scopes and alert if the response scopes are not enough

create_client_from_secrets(name: str, provider: arpoc.config.ProviderConfig) → None[source]

Try to create an openid connect client from the secrets that are saved in the secrets file

do_userinfo_request_with_state(state: str) → Dict[source]

Perform the userinfo request with given state

get_access_token_from_code(state: str, code: str) → oic.oic.Token[source]

Takes the OIDC Authorization Code, Performs the Access Token Request

Returns: The Access Token Request Response

static get_access_token_from_headers() → Union[None, str][source]

Returns the Access Token from the authorization header. Strips the bearer part

get_secrets() → Dict[str, dict][source]

Returns the secrets (client_id, client_secret) of the OIDC Relying Partys

get_userinfo() → Tuple[Optional[str], Dict][source]

Gets the userinfo from the OIDC Provider. This works in two steps:

  1. Check if the user supplied an Access Token

  2. Otherwise, check the session management if the user is logged in

get_userinfo_access_token(access_token: str) → Tuple[int, Dict][source]

Get the user info if the user supplied an access token

static get_validity_from_token(token: oic.oic.Token) → Tuple[int, int][source]

Find the validity of the id_token, access_token and refresh_token

need_claims(claims: List[str]) → None[source]

Maps claims to scopes and checks if the scopes were already requested. Else start auth procedure to get requested scopes

redirect(**kwargs: Any) → str[source]

Handler for the redirect method (entrypoint after forwarding to OIDC Provider

refresh_access_token(hash_access_token: str) → Tuple[str, Dict][source]

Refreshes the access token. This can only be done, if we are Client (normal web interface).

register_first_time(name: str, provider: arpoc.config.ProviderConfig) → None[source]

Registers a client or reads the configuration from the registration endpoint

If registration_url is present in the configuration file, then it will try to read the configuration using the registration_token.

If configuration_url is present in the configuration file, it will try to set the configuration using the registration endpoint dynamically received with the well-known location url (configuration_url)


Set the secrets dict

class arpoc.base.ServiceProxy(service_name: str, oidc_handler: arpoc.base.OidcHandler, cfg: arpoc.config.ServiceConfig)[source]

Bases: object

A class to perform the actual proxying

ac = < object>
static build_access_dict(query_dict: Optional[Dict] = None) → Dict[source]

Creates the access dict for the evaluation context

index(*args: Any, **kwargs: Any) → str[source]

Connects to the origin_URL of the proxied service. Important: If a request parameter “url” is in the REQUEST, it will override the path information. /serviceA/urlinformation?url=test will translate to: <ServiceA>/test

class arpoc.base.TLSOnlyDispatcher(tls_url: str, next_dispatcher: cherrypy._cpdispatch.Dispatcher)[source]

Bases: cherrypy._cpdispatch.Dispatcher

Dispatcher for cherrypy to force TLS

arpoc.cache module

Cache Module for OIDCPROXY

class arpoc.cache.Cache[source]

Bases: collections.UserDict

The cache. On Every action (get, put) a clean up procedure is triggered

expire() → None[source]

Deletes invalid entries from the cache

get(key: str, default: Any = None) → Any[source]

Returns the element in the cache referenced by key. If no element exists with the key, <default> is returned

put(key: str, data: Any, valid: int) → None[source]

Inserts the element <data> with <key> into the cache. Expiration time <valid>.

class arpoc.cache.CacheItem(timestamp: int, item: Any)[source]

Bases: object

CacheItem – A item in the Cache timestamp: An Unix Timestamp in seconds since beginning of the epoch. item: The Item

item: Any
timestamp: int

arpoc.config module

Configuration Module of ARPOC

After importing this file you have access to the configuration with the config.cfg variable.

class arpoc.config.ACConfig(json_dir: List[str] = <factory>)[source]

Bases: object

Configuration for the access control

  • json_dir: The directory where the AC Entities are stored. The files

    must end with “.json”

json_dir: List[str]
class arpoc.config.Misc(pid_file: str = '/var/run/', daemonize: bool = True, log_level: str = 'INFO', access_log: str = '/var/log/arpoc/access.log', error_log: str = '/var/log/arpoc/error.log')[source]

Bases: object

Misc Config Class

  • access_log: The location to store the access log (HTTP requests)

  • error_log: The location to store the error_log

  • daemonize: If arpoc should start daemonized.

  • log_level: ARPOC’s log level. (DEBUG/INFO/ERROR/WARN). Affects also underlying libraries

  • pid_file: Where ARPOC should store the process id file. Only used when daemonized.

No mandatory arguments.

access_log: str = '/var/log/arpoc/access.log'
daemonize: bool = True
error_log: str = '/var/log/arpoc/error.log'
log_level: str = 'INFO'
pid_file: str = '/var/run/'
class arpoc.config.OIDCProxyConfig(config_file: Optional[str] = None, std_config: Optional[str] = '/etc/arpoc/config.yml')[source]

Bases: object

Config Container Which for all specific configuration

add_provider(name: str, prov_cfg: arpoc.config.ProviderConfig) → None[source]

Adds the provider with key <name> to the configuration

check_config() → None[source]

Make consistency checks for the arpoc config

check_config_proxy_url() → None[source]

Checks for duplicates in the proxy_url

check_redirect_uri() → None[source]

Checks if every redirect uri in the provider config is also in the proxy list

merge_config(new_cfg: Dict) → None[source]

Merges the current configuration with a new configuration dict

print_config() → None[source]

Print the current config

static print_sample_config() → None[source]

Prints a sample config

read_file(filepath: str) → None[source]

Read the YAML file <filepath> and add the contents to the current configuration

class arpoc.config.ProviderConfig(baseuri: dataclasses.InitVar, human_readable_name: str, configuration_url: str = '', configuration_token: str = '', registration_token: str = '', registration_url: str = '', method: str = 'auto', special_claim2scope: dataclasses.InitVar = None, redirect_paths: List[str] = <factory>, do_token_introspection: bool = True)[source]

Bases: object

Configuration for a single Open ID Connect Provider

  • human_readable_name: A name which arpoc uses when communicating

    with the user / operator

  • configuration_url: The base url of the OIDC provider. Without the

    .well-known/ part

  • configuration_token: The token ARPOC can use to register itself with

    the OIDC provider

  • registration_token: The token issued from the OIDC provider for a

    specific client to obtain its configuration

  • registration_url: The url where arpoc can obtain its configuration

    after registration.

  • method: Either ‘auto’, ‘GET’, or ‘POST’. The HTTP method ARPOC will

    use if the OIDC / OAuth standard gives a choice.

  • special_claim2scope: A mapping from claim to scopes that will deliver

    the claims.

Mandatory arguments:
  • configuration_url

And either:
  • configuration_token

  • registration_token

  • registration_url

baseuri: dataclasses.InitVar

arpoc’s base uri

claim2scope: dict
configuration_token: str = ''
configuration_url: str = ''
do_token_introspection: bool = True
human_readable_name: str
method: str = 'auto'
redirect_paths: List[str]
registration_token: str = ''
registration_url: str = ''
special_claim2scope: dataclasses.InitVar = None
class arpoc.config.ProxyConfig(keyfile: str, certfile: str, domainname: str, contacts: List[str], address: str = '', tls_port: int = 443, plain_port: int = 80, https_only: bool = True, username: str = 'www-data', groupname: str = 'www-data', secrets: str = '/var/lib/arpoc/secrets.yml', tls_redirect: str = '/TLSRedirect', auth: str = '/auth', redirect: List[str] = <factory>, plugin_dirs: List[str] = <factory>)[source]

Bases: object

Configuration for the Proxy Setup

  • keyfile: The path to the private key file of the TLS keypair

  • certfile: The path to the certificate chain file (full chain)

  • domainname: The domain name where ARPOC will be available

  • contacts: A list of mail contact adresses responsible for the ARPOC


Mandatory: keyfile, certfile, domainname, contacts

address: str = ''
auth: str = '/auth'
certfile: str
contacts: List[str]
domainname: str
groupname: str = 'www-data'
https_only: bool = True
keyfile: str
plain_port: int = 80
plugin_dirs: List[str]
redirect: List[str]
secrets: str = '/var/lib/arpoc/secrets.yml'
tls_port: int = 443
tls_redirect: str = '/TLSRedirect'
username: str = 'www-data'
class arpoc.config.ServiceConfig(origin_URL: str, proxy_URL: str, AC: str, objectsetters: dict = <factory>, obligations: dict = <factory>, authentication: dict = <factory>)[source]

Bases: object

Configuration for a single proxied Service

  • origin_URL: The URL that will be proxied, or the special page string; see Special Pages

  • proxy_URL: The path under which origin_URL will be available.

  • AC: The policy set which is evaluated to decide the access request

  • objectsetters: Configuration for the objectsetters

  • obligations: Configuration for obligations

  • authentication Authentication information which will be used to request origin_URL

Mandatory Arguments:
  • origin_URL

  • proxy_URL

  • AC

AC: str
authentication: dict
objectsetters: dict
obligations: dict
origin_URL: str
proxy_URL: str
arpoc.config.default_json_dir() → List[source]

Default json path for access control entities

arpoc.config.default_redirect() → List[source]

Default Redirect Path

arpoc.exceptions module

exception arpoc.exceptions.ACEntityMissing[source]

Bases: arpoc.exceptions.OIDCProxyException

exception arpoc.exceptions.AttributeMissing[source]

Bases: arpoc.exceptions.OIDCProxyException

exception arpoc.exceptions.BadRuleSyntax[source]

Bases: Exception

exception arpoc.exceptions.BadSemantics[source]

Bases: Exception

exception arpoc.exceptions.ConfigError[source]

Bases: arpoc.exceptions.OIDCProxyException

exception arpoc.exceptions.DuplicateKeyError[source]

Bases: arpoc.exceptions.OIDCProxyException

exception arpoc.exceptions.EnvironmentAttributeMissing[source]

Bases: arpoc.exceptions.AttributeMissing

exception arpoc.exceptions.OIDCProxyException[source]

Bases: Exception

exception arpoc.exceptions.ObjectAttributeMissing[source]

Bases: arpoc.exceptions.AttributeMissing

exception arpoc.exceptions.SubjectAttributeMissing(message: str, attr: str)[source]

Bases: arpoc.exceptions.AttributeMissing

arpoc.special_pages module

class arpoc.special_pages.Userinfo(service_name: str, oidc_handler: arpoc.base.OidcHandler, cfg: arpoc.config.ServiceConfig)[source]

Bases: arpoc.base.ServiceProxy

arpoc.utils module

ARPOC utils

Time functions that are used in ARPOC. Can be swapped for unittests[source]

Return the seconds since begin of the EPOCH


Return a datetime object with current timestamp

Module contents

Main module of the ARPOC

class arpoc.App[source]

Bases: object

Class for application handling. Reads configuration files, setups the oidc client classes and the dispatcher for the services


Cancels every event in the scheduler queue

create_secrets_dir() → None[source]

Create the secrets dir and sets permission and ownership

get_routes_dispatcher() → cherrypy._cpdispatch.RoutesDispatcher[source]

Setups the Cherry Py dispatcher This connects makes the proxied services accessible

static read_secrets(filepath: str) → Dict[source]

Reads the secrets file from the filepath

retry(function: Callable, exceptions: Tuple, *args: Any, retries: int = 5, retry_delay: int = 30) → None[source]

Retries function <retries> times, as long as <exceptions> are thrown

run() → None[source]

Starts the application

save_secrets() → None[source]

Saves the oidc rp secrets into the secrets file

setup_loggers() → None[source]

Read the loggers configuration and configure the loggers

setup_oidc_provider() → None[source]

Setup the connection to all oidc providers in the config

tls_redirect(*args: Any, **kwargs: Any) → None[source]

Rewrites the url so that we use https. May alter the hostname (localhost -> domainname)