202 lines
6.2 KiB
Python
202 lines
6.2 KiB
Python
################################################################################
|
|
# Copyright (C) 2025 GFZ Helmholtz Centre for Geosciences
|
|
#
|
|
# JWT-based authentication
|
|
################################################################################
|
|
|
|
import json
|
|
import jwt
|
|
import time
|
|
from twisted.cred.checkers import ICredentialsChecker
|
|
from twisted.cred.credentials import ICredentials, IAnonymous
|
|
from twisted.cred.error import LoginFailed
|
|
from twisted.cred.portal import IRealm, Portal
|
|
from twisted.web.guard import HTTPAuthSessionWrapper
|
|
from twisted.web.iweb import ICredentialFactory
|
|
from twisted.web.resource import IResource
|
|
from zope.interface import implementer
|
|
from urllib.request import urlopen
|
|
from seiscomp import logging
|
|
|
|
|
|
class IBearerToken(ICredentials):
|
|
"""
|
|
IBearerToken credentials encapsulate a valid bearer token.
|
|
|
|
Parameters:
|
|
token The bearer token str
|
|
payload The decoded and valid token payload
|
|
"""
|
|
|
|
|
|
@implementer(IBearerToken)
|
|
class JSONWebToken(object):
|
|
"""The BearerToken credential object that will be passed to a Checker"""
|
|
|
|
def __init__(self, token, payload):
|
|
self.token = token
|
|
self.payload = payload
|
|
|
|
|
|
@implementer(ICredentialFactory)
|
|
class _CredentialFactory(object):
|
|
|
|
scheme = b"bearer"
|
|
|
|
def __init__(self, issuers, audience, algorithms, update_min, update_max):
|
|
"""
|
|
issuers list. List of valid issuer URLs
|
|
audience list. List of valid audiences
|
|
algorithms list. List of algorithms to check
|
|
"""
|
|
self.__issuers = issuers
|
|
self.__audience = audience
|
|
self.__algorithms = algorithms
|
|
self.__update_min = update_min
|
|
self.__update_max = update_max
|
|
self.__keys = {}
|
|
|
|
for iss in issuers:
|
|
self.__keys[iss] = (0, {})
|
|
|
|
def __update_keys(self, iss):
|
|
isscfg = json.loads(urlopen(iss.rstrip('/') + '/.well-known/openid-configuration').read())
|
|
aux = json.loads(urlopen(isscfg["jwks_uri"]).read())
|
|
keys = {}
|
|
|
|
for k in aux['keys']:
|
|
logging.notice(f"received JWT signing key {k['kid']} from issuer {iss}")
|
|
keys[k['kid']] = jwt.PyJWK.from_dict(k)
|
|
|
|
self.__keys[iss] = (time.time(), keys)
|
|
return keys
|
|
|
|
def __get_signing_key_from_jwt(self, tok):
|
|
rawtoken = jwt.api_jwt.decode_complete(tok, options={"verify_signature": False})
|
|
iss = rawtoken["payload"]["iss"]
|
|
|
|
try:
|
|
t, keys = self.__keys[iss]
|
|
|
|
except KeyError:
|
|
raise Exception("No matching keys")
|
|
|
|
kid = rawtoken["header"]["kid"]
|
|
|
|
if t < time.time() - self.__update_max or \
|
|
(kid not in keys and t < time.time() - self.__update_min):
|
|
keys = self.__update_keys(iss)
|
|
|
|
try:
|
|
return keys[kid]
|
|
|
|
except KeyError:
|
|
raise Exception("No matching keys")
|
|
|
|
@staticmethod
|
|
def getChallenge(request):
|
|
"""
|
|
Generate the challenge for use in the WWW-Authenticate header
|
|
|
|
@param request: The L{twisted.web.http.Request}
|
|
|
|
@return: The C{dict} that can be used to generate a WWW-Authenticate
|
|
header.
|
|
"""
|
|
if hasattr(request, "_jwt_error"):
|
|
return {"error": "invalid_token", "error_description": request._jwt_error}
|
|
|
|
return {}
|
|
|
|
def decode(self, response, request):
|
|
"""Returns JSONWebToken (Credentials)
|
|
|
|
Create a JSONWebToken object from the given authentication_str.
|
|
|
|
response str. The bearer str minus the scheme
|
|
request obj. The request being processed
|
|
"""
|
|
try:
|
|
signing_key = self.__get_signing_key_from_jwt(response)
|
|
payload = jwt.decode(
|
|
response,
|
|
signing_key.key,
|
|
audience=self.__audience,
|
|
algorithms=self.__algorithms,
|
|
)
|
|
|
|
except Exception as e:
|
|
request._jwt_error = str(e)
|
|
raise LoginFailed(request._jwt_error)
|
|
|
|
if not payload:
|
|
request._jwt_error = "No matching keys"
|
|
raise LoginFailed(request._jwt_error)
|
|
|
|
# compatibility with legacy EIDA token
|
|
if "email" in payload and not "mail" in payload:
|
|
payload["mail"] = payload["email"]
|
|
|
|
payload["blacklisted"] = False
|
|
|
|
return JSONWebToken(token=response, payload=payload)
|
|
|
|
|
|
@implementer(ICredentialsChecker)
|
|
class _CredentialsChecker(object):
|
|
|
|
credentialInterfaces = [IBearerToken, IAnonymous]
|
|
|
|
def requestAvatarId(self, credentials):
|
|
"""
|
|
Portal will call this method if a BearerToken credential is provided.
|
|
|
|
Check the given credentials and return a suitable user identifier if
|
|
they are valid.
|
|
|
|
credentials JSONWebToken. A IBearerToken object containing a decoded token
|
|
"""
|
|
if IBearerToken.providedBy(credentials):
|
|
# Avatar ID is the full token payload
|
|
return credentials.payload
|
|
|
|
if IAnonymous.providedBy(credentials):
|
|
# Anonymous user
|
|
return None
|
|
|
|
raise NotImplementedError()
|
|
|
|
|
|
@implementer(IRealm)
|
|
class _Realm(object):
|
|
# ---------------------------------------------------------------------------
|
|
def __init__(self, service, args, kwargs):
|
|
self.__service = service
|
|
self.__args = args
|
|
self.__kwargs = kwargs
|
|
|
|
# ---------------------------------------------------------------------------
|
|
def requestAvatar(self, avatarId, mind, *interfaces):
|
|
if IResource in interfaces:
|
|
return (
|
|
IResource,
|
|
self.__service(
|
|
*self.__args,
|
|
**self.__kwargs,
|
|
user=avatarId,
|
|
),
|
|
lambda: None,
|
|
)
|
|
|
|
raise NotImplementedError()
|
|
|
|
|
|
class JWT(object):
|
|
def __init__(self, issuers, audience, algorithms, update_min, update_max):
|
|
self.__cf = _CredentialFactory(issuers, audience, algorithms, update_min, update_max)
|
|
|
|
def getAuthSessionWrapper(self, service, *args, **kwargs):
|
|
realm = _Realm(service, args, kwargs)
|
|
portal = Portal(realm, [_CredentialsChecker()])
|
|
return HTTPAuthSessionWrapper(portal, [self.__cf])
|