RETROSPECTIVE

February 18th, 2022

Building an API for Authentication with AWS Lambda and API Gateway

AWS Lambda

API Gateway

AWS

Terraform

Python

This is part of a series of articles on SaintsXCTF Version 2.0. The first article in the series provides an overview of the application. You DO NOT need to read prior articles in the series to fully understand this article.

My SaintsXCTF application has multiple APIs. One of these APIs, auth.saintsxctf.com, is hosted on AWS API Gateway and AWS Lambda. Its main purpose is to provide an authentication mechanism for the application. In this article, I discuss endpoints within this API and how they provide application authentication using JWTs.

The authentication API consists of two REST API endpoints backed by AWS Lambda functions and two standalone AWS Lambda functions.

AWS infrastructure for the API is created using Terraform. This Terraform configuration exists in my saints-xctf-infrastructure repository. Infrastructure is created and destroyed using Jenkins jobs. In this article I focus on the functionality of the authentication API instead of the infrastructure, so I will not discuss infrastructure or CI/CD processes in any more depth.

The idea behind this API is to separate authentication logic from main application logic. You can think of this API as a microservice for user authentication. The application authenticates users with JSON Web Tokens (JWTs). Users pass their username and password in exchange for a JWT, which is sent with subsequent API requests. Behind the scenes, the application API checks whether the JWT passed in an HTTP request is valid. If the token is valid, the API request is processed and a response is returned. If the token is invalid, an error message is returned. JWTs expire four weeks (2,419,200 seconds) after they are issued, and the RSA keys used to sign and verify JWTs are rotated weekly for additional security.

The authentication API exposes two endpoints to users: /token and /authenticate. Both endpoints are backed by AWS Lambda functions. The API has two additional AWS Lambda functions designed for internal usage: SaintsXCTFAuthorizer and SaintsXCTFRotate. Let's take a look at all these AWS Lambda functions in more detail.

The /token API endpoint is used to exchange user credentials for a JWT. /token accepts a JSON request body containing a username (clientId) and a password (clientSecret). The following curl request shows how to use the endpoint.

RequestBody='{"clientId":"my_username","clientSecret":"my_password"}'
curl -X POST \
    -d "${RequestBody}" \
    -H "Content-Type: application/json" \
    https://auth.saintsxctf.com/token

In the request body, my_username and my_password are replaced with a user's username and password, respectively. The response body will look something like the following, with j.w.t replaced with a valid JWT.

{ "result": "j.w.t" }
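For callers that use Python rather than curl, the same request can be sketched with the standard library. The helper name build_token_request is hypothetical and not part of the saints-xctf-auth repository. The snippet only constructs the request; sending it requires network access and real credentials, so that part is left as a comment.

```python
import json
import urllib.request

TOKEN_URL = "https://auth.saintsxctf.com/token"


def build_token_request(username: str, password: str) -> urllib.request.Request:
    """Build (but do not send) the POST request for the /token endpoint."""
    body = json.dumps({"clientId": username, "clientSecret": password}).encode("utf-8")
    return urllib.request.Request(
        TOKEN_URL,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_token_request("my_username", "my_password")

# To actually call the API (needs real credentials and network access):
# with urllib.request.urlopen(request) as response:
#     token = json.loads(response.read())["result"]
```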

As previously mentioned, each endpoint is backed by an AWS Lambda function. The function for the /token endpoint is shown below, and exists in a function.py file within my saints-xctf-auth repository.

import os
import boto3
import json
import re
from datetime import datetime, timezone
from typing import Any

import jwt
import bcrypt
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from boto3_type_annotations.secretsmanager import Client as SecretsManagerClient
from boto3_type_annotations.rds import Client as RDSClient

from User import User


def lambda_handler(event, context):
    client_id = event['clientId']
    client_secret = event['clientSecret']
    env = os.environ['ENV']
    print(f"Client: {client_id}, Environment: {env}")

    secretsmanager: SecretsManagerClient = boto3.client('secretsmanager', region_name='us-east-1')
    rds: RDSClient = boto3.client('rds', region_name='us-east-1')

    private_key = get_jwt_private_key(secretsmanager, env)
    db_secret = get_rds_credentials(secretsmanager, env)
    session = create_database_session(rds, db_secret, env)
    print("Database Session Initialized and Secrets Retrieved")

    email_pattern = re.compile('^(([a-zA-Z0-9_.-])+@([a-zA-Z0-9_.-])+\\.([a-zA-Z])+([a-zA-Z])+)?$')

    if email_pattern.match(client_id):
        print("Retrieving User by Email")
        user = session.query(User).filter_by(email=client_id).first()
    else:
        print("Retrieving User by Username")
        user = session.query(User).filter_by(username=client_id).first()

    if user is None:
        print(f"No user exists with username/email: {client_id}")
        return None
    else:
        print("A user exists with given client ID.")

    if bcrypt.checkpw(client_secret.encode('utf-8'), user.password.encode('utf-8')):
        print("User credentials valid.")
        # Use an aware UTC datetime; a naive utcnow() is interpreted as local time by timestamp().
        iat = int(datetime.now(timezone.utc).timestamp())
        # Tokens expire 2419200 seconds (28 days) after they are issued.
        exp = iat + 2419200

        return jwt.encode(
            payload={
                'iat': iat,
                'exp': exp,
                'iss': 'auth.saintsxctf.com',
                'sub': user.username,
                'email': user.email,
                'name': f'{user.first} {user.last}'
            },
            key=private_key,
            algorithm='RS256'
        )
    else:
        print(f"Invalid password for user with username/email: {client_id}")
        return None


def get_jwt_private_key(secretsmanager: SecretsManagerClient, env: str) -> str:
    """
    Get the RSA encrypted private key used to create JWT tokens.
    :param secretsmanager: boto3 client for working with SecretsManager.
    :param env: Environment of the SaintsXCTF authentication private key.
    :return: A string representing the RSA encrypted private key.
    """
    print("Getting JWT Private Key")
    secret = secretsmanager.get_secret_value(SecretId=f"saints-xctf-auth-{env}")
    secret_string = secret.get('SecretString')
    secret_dict: dict = json.loads(secret_string)
    return secret_dict["PrivateKey"]


def get_rds_credentials(secretsmanager: SecretsManagerClient, env: str) -> dict:
    """
    Get the RDS username and password.
    :param secretsmanager: boto3 client for working with SecretsManager.
    :param env: Environment of the SaintsXCTF database.
    :return: A dictionary containing username and password keys.
    """
    print("Getting RDS Credentials")
    response = secretsmanager.get_secret_value(SecretId=f'saints-xctf-rds-{env}-secret')
    secret_string = response.get("SecretString")
    return json.loads(secret_string)


def create_database_session(rds: RDSClient, db_secret: dict, env: str) -> Any:
    """
    Create a database session with RDS in a given environment.
    :param rds: boto3 client for working with RDS.
    :param db_secret: Dictionary containing the username and password for the SaintsXCTF RDS/MySQL database.
    :param env: Environment of the SaintsXCTF database.
    :return: A session with the database.
    """
    print("Creating Database Session")
    rds_instances = rds.describe_db_instances(DBInstanceIdentifier=f'saints-xctf-mysql-database-{env}')
    instance = rds_instances.get('DBInstances')[0]
    hostname = instance.get('Endpoint').get('Address')

    username = db_secret.get("username")
    password = db_secret.get("password")
    database = 'saintsxctf'

    db_url = f'mysql+pymysql://{username}:{password}@{hostname}/{database}'

    engine = create_engine(db_url)
    Session = sessionmaker()
    Session.configure(bind=engine)

    return Session()

lambda_handler() is the entrypoint to the AWS Lambda function. Within lambda_handler(), the first two lines extract the JSON request body fields into client_id and client_secret variables. The function works in both development and production environments, as determined by an ENV environment variable. The environment is retrieved with the line env = os.environ['ENV']. The environment is important because different databases and RSA keys are used in different environments.

Next comes a series of helper function invocations. First, get_jwt_private_key(secretsmanager, env) retrieves an RSA private key from AWS Secrets Manager. This private key is used to create JWTs. Second, get_rds_credentials(secretsmanager, env) retrieves MySQL database credentials. The database is queried to determine if the username and password sent to the API are valid. The third and final helper function, create_database_session(rds, db_secret, env), uses the database credentials to start a session with the application's MySQL database.
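Because the helpers receive the boto3 client as a parameter, they can be exercised without touching AWS. The following is a minimal sketch of that idea, not code from the repository: FakeSecretsManager is a hypothetical stand-in that mimics only the SecretString field of the real response, and the "dev" environment name and dummy key value are assumptions made for illustration.

```python
import json


class FakeSecretsManager:
    """Hypothetical stand-in for the boto3 Secrets Manager client (tests only)."""

    def __init__(self, secrets: dict):
        self._secrets = secrets

    def get_secret_value(self, SecretId: str) -> dict:
        # The real client returns a string secret as JSON text under 'SecretString'.
        return {"SecretString": json.dumps(self._secrets[SecretId])}


def get_jwt_private_key(secretsmanager, env: str) -> str:
    """Same lookup as the helper in the /token function, minus the logging."""
    secret = secretsmanager.get_secret_value(SecretId=f"saints-xctf-auth-{env}")
    secret_dict = json.loads(secret["SecretString"])
    return secret_dict["PrivateKey"]


# 'dev' and the key value are placeholders, not real configuration.
fake = FakeSecretsManager({"saints-xctf-auth-dev": {"PrivateKey": "dummy-private-key"}})
private_key = get_jwt_private_key(fake, "dev")
```

Passing clients in as arguments is what makes this substitution possible; a helper that created its own boto3 client internally would be harder to test in isolation.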

Optionally, the /token endpoint takes an email instead of a username. This option is expressed by an if statement with an email_pattern.match(client_id) condition. If an email is used, the user in the database is retrieved with session.query(User).filter_by(email=client_id).first(). Otherwise, the user is retrieved with session.query(User).filter_by(username=client_id).first(). In both cases, a SQLAlchemy model class User is utilized to query the database. I wrote about SQLAlchemy and model classes in an article on my SaintsXCTF API. The User model class exists in a User.py file and is shown below.

from sqlalchemy import Column, VARCHAR
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class User(Base):
    __tablename__ = 'users'

    username = Column(VARCHAR(20), primary_key=True)
    first = Column(VARCHAR(30), nullable=False, index=True)
    last = Column(VARCHAR(30), nullable=False, index=True)
    password = Column(VARCHAR(255), nullable=False)
    email = Column(VARCHAR(50), index=True)
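The email-or-username branch from the /token function can be tried in isolation. This sketch reuses the same regular expression, written as a raw string; lookup_column is a hypothetical helper that only reports which User column the handler would filter on.

```python
import re

# Same pattern as in the /token function, written as a raw string.
EMAIL_PATTERN = re.compile(r'^(([a-zA-Z0-9_.-])+@([a-zA-Z0-9_.-])+\.([a-zA-Z])+([a-zA-Z])+)?$')


def lookup_column(client_id: str) -> str:
    """Return the User column the handler would query for this client ID."""
    return "email" if EMAIL_PATTERN.match(client_id) else "username"


by_email = lookup_column("andy@example.com")
by_username = lookup_column("andy")
by_empty = lookup_column("")
```

One detail worth knowing: the whole pattern is wrapped in an optional group, so an empty client ID also matches and is treated as an email lookup.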

If the database query for a user with a certain username or email returns no records, the AWS Lambda function returns None. Otherwise, it checks if the user's password matches the one in the API request body (client_secret). Passwords are hashed in my database using Bcrypt, so the bcrypt.checkpw() function is used to verify the submitted password against the stored hash.

If the password submitted in the API request is invalid, None is returned. Otherwise, a JWT is created and returned using the jwt.encode() function.
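To make the claims concrete, here is a small standard-library sketch of the payload that is handed to jwt.encode(). The function name build_claims and the sample user values are hypothetical; the claim names, the issuer, and the 2419200 second lifetime come from the function above.

```python
from datetime import datetime, timezone

# 28 days * 24 hours * 60 minutes * 60 seconds = 2419200 seconds
TOKEN_LIFETIME_SECONDS = 28 * 24 * 60 * 60


def build_claims(username: str, email: str, first: str, last: str, now: datetime) -> dict:
    """Build the JWT payload used by the /token function for a given issue time."""
    iat = int(now.timestamp())
    return {
        "iat": iat,
        "exp": iat + TOKEN_LIFETIME_SECONDS,
        "iss": "auth.saintsxctf.com",
        "sub": username,
        "email": email,
        "name": f"{first} {last}",
    }


# Sample values are placeholders, not real user data.
claims = build_claims(
    "andy", "andy@example.com", "Andy", "Example",
    now=datetime(2022, 2, 18, tzinfo=timezone.utc),
)
```

The exp claim sits 28 days after iat. Since the signing keys are rotated weekly, a token will likely stop validating at the next rotation, well before its exp claim is reached.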

The /authenticate API endpoint is used to validate a JWT and accepts a JSON request body containing a JWT. The following curl request shows how to use the endpoint.

RequestBody='{"jwt":"my_jwt"}'
curl -X POST \
    -d "${RequestBody}" \
    -H "Content-Type: application/json" \
    https://auth.saintsxctf.com/authenticate

In the request body, my_jwt is replaced with the user's JWT. The response body looks like the following JSON. In this example, true is returned due to a valid JWT.

{ "result": true }

The function for the /authenticate endpoint is shown below, and exists in a function.py file within my saints-xctf-auth repository.

import os
import traceback
import boto3
import json

import jwt
from boto3_type_annotations.secretsmanager import Client


def lambda_handler(event, context):
    token = event['token']
    env = os.environ['ENV']
    print(f"Token: {token}, Environment: {env}")

    secretsmanager: Client = boto3.client('secretsmanager')
    secret = secretsmanager.get_secret_value(SecretId=f"saints-xctf-auth-{env}")
    secret_string = secret['SecretString']
    secret_dict: dict = json.loads(secret_string)

    try:
        # Verify the signature with the public key and require the exp, iat and iss claims.
        jwt.decode(
            jwt=token,
            key=secret_dict["PublicKey"],
            verify=True,
            algorithms=['RS256'],
            options={'require': ['exp', 'iat', 'iss']}
        )
        return True
    except jwt.ExpiredSignatureError:
        print("This token has expired.")
        return False
    except Exception as e:
        print("The token is invalid.")
        traceback.print_exception(type(e), e, e.__traceback__)
        return False

The AWS Lambda function for /authenticate is a bit simpler than the one for /token. Again, lambda_handler() is the entrypoint to the AWS Lambda function. The first line in lambda_handler() reads the JWT from the event and places it in a token variable. As before, AWS Secrets Manager supplies the RSA keys, which are stored in a secret_dict variable. Only the public key is needed to validate the JWT.

The remainder of the code attempts to decode the JWT token using a jwt.decode() function. If the JWT token is deemed valid, the function returns True. Otherwise, there are two possible scenarios, both of which cause the function to return False. The first scenario is when the token expired, resulting in a jwt.ExpiredSignatureError exception. The second is when the token is invalid, which can result in any number of exceptions.
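When debugging a rejected token, it can help to look at its claims directly. The sketch below reads the payload segment with the standard library only. It does NOT verify the signature, so it is no substitute for jwt.decode(); the helper names and the hand-built sample token are hypothetical.

```python
import base64
import json


def b64url_decode(segment: str) -> bytes:
    """Decode a base64url segment, restoring the padding that JWTs strip off."""
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def b64url_encode(data: dict) -> str:
    """Encode a dictionary as an unpadded base64url JSON segment."""
    raw = json.dumps(data).encode("utf-8")
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def peek_claims(token: str) -> dict:
    """Read the payload of a JWT WITHOUT verifying its signature (debugging only)."""
    _header, payload, _signature = token.split(".")
    return json.loads(b64url_decode(payload))


def is_expired(claims: dict, now: float) -> bool:
    """Report whether the 'exp' claim is at or before the given Unix time."""
    return claims["exp"] <= now


# A hand-built token with a placeholder signature, only for demonstration.
sample_token = ".".join([
    b64url_encode({"alg": "RS256", "typ": "JWT"}),
    b64url_encode({"iss": "auth.saintsxctf.com", "sub": "andy", "iat": 1000, "exp": 2000}),
    "signature",
])
sample_claims = peek_claims(sample_token)
```

A token that fails is_expired() corresponds to the jwt.ExpiredSignatureError branch; any other problem, such as a bad signature or a missing required claim, falls into the generic exception branch.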

The "Rotate" AWS Lambda function is used internally by the authentication API. Its purpose is to rotate the RSA public and private keys, which are used to create and decode JWTs. In my application, key rotation occurs every seven days. RSA key rotation limits the damage malicious users can do if they gain access to the keys. With the keys in hand, they can create their own JWTs. These JWTs would be valid to my application, even though my application did not create them. By rotating the RSA keys, the damage is limited because the malicious user will only have access to valid RSA keys until the next rotation. After a key rotation occurs, the previous RSA keys are no longer able to create or validate JWTs, and the malicious user can no longer cause harm to the application.

The "Rotate" AWS Lambda function is written in Python and exists in a function.py file within my saints-xctf-auth repository. It is based on a sample AWS Lambda function for rotating secrets [1]. More information on rotating secrets using AWS Secrets Manager and AWS Lambda can be found in the AWS documentation [2].

The "Authorizer" AWS Lambda function is used as an API Gateway authorizer [3]. API Gateway authorizers enable custom authentication for API Gateway endpoints, such as the ones in my fn.saintsxctf.com API.

This authorizer checks if a JWT token passed in an API request is valid for my application. If the token is valid, requests are forwarded to API endpoints in AWS API Gateway. If the token is invalid, requests are denied. The function is very similar to the one behind my /authenticate endpoint, except this time it is for internal use by API Gateway, not publicly accessible over HTTP.

The "Authorizer" function is shown below, and exists in a function.py file within my saints-xctf-auth repository.

import os
import json
import traceback

import boto3
import jwt
from boto3_type_annotations.secretsmanager import Client


def lambda_handler(event, context):
    token = event['authorizationToken']
    method_arn = event['methodArn']
    env = os.environ['ENV']
    print(f"Token: {token}, Environment: {env}")

    secretsmanager: Client = boto3.client('secretsmanager')
    secret = secretsmanager.get_secret_value(SecretId=f"saints-xctf-auth-{env}")
    secret_string = secret['SecretString']
    secret_dict: dict = json.loads(secret_string)

    try:
        jwt.decode(
            jwt=token,
            key=secret_dict["PublicKey"],
            verify=True,
            algorithms=['RS256'],
            options={'require': ['exp', 'iat', 'iss']}
        )
        return allow_policy(method_arn)
    except jwt.ExpiredSignatureError:
        # The date of the 'exp' claim is in the past, meaning the token is expired
        print("This token has expired.")
        return deny_policy()
    except Exception as e:
        print("Unknown error occurred.")
        traceback.print_exception(type(e), e, e.__traceback__)
        return deny_policy()


def allow_policy(method_arn: str) -> dict:
    return {
        "principalId": "apigateway.amazonaws.com",
        "policyDocument": {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Action": "execute-api:Invoke",
                    "Effect": "Allow",
                    "Resource": method_arn
                }
            ]
        }
    }


def deny_policy() -> dict:
    return {
        "principalId": "*",
        "policyDocument": {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Action": "*",
                    "Effect": "Deny",
                    "Resource": "*"
                }
            ]
        }
    }

Similar to the /authenticate endpoint, the result of jwt.decode() determines the output of the AWS Lambda function. However, instead of returning a boolean to indicate whether a JWT is valid, the authorizer returns a Python dictionary. This dictionary represents a JSON IAM policy that API Gateway evaluates to decide whether the caller may invoke the requested endpoint. If the JWT is valid, the IAM policy defined in allow_policy() is returned. If the JWT is invalid, the IAM policy defined in deny_policy() is returned. The policy in allow_policy() allows access to the requested API endpoint while the policy in deny_policy() denies access.
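The allow and deny paths can be checked locally by feeding the decision logic a sample token authorizer event. In this sketch, authorize and the is_valid callable are hypothetical; is_valid stands in for the jwt.decode() call so that no keys or AWS access are needed. The account ID, API ID, stage, and path in the method ARN are made-up placeholders.

```python
from typing import Callable


def policy(principal_id: str, action: str, effect: str, resource: str) -> dict:
    """Build an authorizer response with a single-statement IAM policy."""
    return {
        "principalId": principal_id,
        "policyDocument": {
            "Version": "2012-10-17",
            "Statement": [{"Action": action, "Effect": effect, "Resource": resource}],
        },
    }


def authorize(event: dict, is_valid: Callable[[str], bool]) -> dict:
    """Return the allow policy for a valid token and the deny policy otherwise."""
    if is_valid(event["authorizationToken"]):
        # Same shape as allow_policy(): permit invoking only the requested method.
        return policy("apigateway.amazonaws.com", "execute-api:Invoke", "Allow", event["methodArn"])
    # Same shape as deny_policy(): deny everything.
    return policy("*", "*", "Deny", "*")


# Shape of the event API Gateway sends to a token-based authorizer; values are placeholders.
sample_event = {
    "type": "TOKEN",
    "authorizationToken": "j.w.t",
    "methodArn": "arn:aws:execute-api:us-east-1:123456789012:abcdef1234/production/GET/users",
}

allowed = authorize(sample_event, is_valid=lambda token: True)
denied = authorize(sample_event, is_valid=lambda token: False)
```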

These four functions fulfill all the authentication needs of my application! Potential enhancements to this approach include making the JWTs compliant with the OAuth 2.0 or OpenID Connect protocols [3].

Using AWS API Gateway and AWS Lambda is an effective way to create a basic authentication API for an application. The main drawback of this approach is AWS Lambda cold starts and the expenses involved with mitigating them. Otherwise, it's a great way to separate authentication logic into its own serverless microservice. All the code discussed in this article is available in my saints-xctf-auth repository.