AWS Secrets Manager automatic rotation is a criminally underused feature.
It has built-in integrations with other AWS services such as RDS for zero-effort RDS credentials rotation.
However, did you know you can build a custom rotation Lambda to automatically rotate anything?!
Let’s build a custom AWS Secrets Manager key rotation Lambda for IAM User Access Keys!
Custom rotation is a feature in Secrets Manager that allows you to use your own code to rotate secrets. Custom rotation gives you more flexibility and control over the rotation process, allowing you to integrate with your existing infrastructure and workflows.
Rotating IAM user access keys is essential for maintaining the security of your AWS infrastructure. Access keys can be compromised if they are shared or stored insecurely, and rotating them regularly helps reduce the risk of unauthorized access. Additionally, rotating access keys can help comply with security regulations and audit requirements.
At the centre is the single Lambda which handles the complete secret rotation.
If the rotation succeeds, SES sends an email to the user.
If the rotation fails and the user’s Access Key remains the same, SNS emails the administrator.
If the rotation partially succeeds (the user’s Access Key is changed, but the old key was not deleted, or some other downstream failure occurs), the user gets an email from SES and the admin receives an email from SNS.
Everything is built in AWS CDK and available as an open-source project on GitHub.
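Secrets Manager rotation has 4 stages: createSecret, setSecret, testSecret and finishSecret. When the final stage promotes the new version to ‘AWSCURRENT’, the version it replaces is relabelled ‘AWSPREVIOUS’.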
Yes that’s right, if you ever update a secret version, you can always get it again by accessing ‘AWSPREVIOUS’ version of your secret! Just don’t overwrite it twice!
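To see why overwriting twice loses the old value, here is a minimal in-memory sketch of how the staging labels move. It is a toy model and does not call AWS: the `put_new_current` helper and the version IDs `v1` to `v3` are made up for illustration.

```python
def put_new_current(versions, new_id):
    """Toy model of label movement when a new version becomes current.

    `versions` maps a version ID to the set of staging labels attached to it.
    This only imitates Secrets Manager's behaviour; no AWS call is made.
    """
    # The version that used to be 'previous' loses its label entirely.
    for labels in versions.values():
        labels.discard("AWSPREVIOUS")
    # The version that used to be 'current' becomes 'previous'.
    for labels in versions.values():
        if "AWSCURRENT" in labels:
            labels.remove("AWSCURRENT")
            labels.add("AWSPREVIOUS")
    versions[new_id] = {"AWSCURRENT"}
    return versions


versions = {"v1": {"AWSCURRENT"}}
put_new_current(versions, "v2")  # v1 is still reachable as AWSPREVIOUS
put_new_current(versions, "v3")  # v1 has no label left; only v2 is AWSPREVIOUS
print(versions)
```

After the second update, `v1` carries no staging label, so it can no longer be fetched by asking for ‘AWSPREVIOUS’.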
Let’s start easy and get more complicated.
To notify the administrator of a problem, we simply need to create an SNS topic and add the administrator email as a subscription to it.
Nice and simple.
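# Imports used by the CDK snippets in this post (CDK v2)
from aws_cdk import Duration, SecretValue
from aws_cdk import aws_iam as iam, aws_lambda as lambda_
from aws_cdk import aws_secretsmanager as secretsmanager
from aws_cdk import aws_ses as ses, aws_sns as sns

# The snippets below live inside the stack's __init__ (hence `self`)
topic = sns.Topic(
    self,
    "FailureTopic",
    topic_name="access-key-rotation-notification"
)
sns.Subscription(
    self,
    "admin",
    topic=topic,
    protocol=sns.SubscriptionProtocol.EMAIL,
    endpoint=email_source
)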
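The Lambda further down publishes to this topic in many places. As a rough sketch of that pattern, the helper below wraps the call once. The name `notify_admin` and its default subject are my own invention and not part of the project; the client is passed in so the function can be tried with a stub.

```python
def notify_admin(sns_client, topic_arn, message, subject="Access key rotation problem"):
    """Publish a failure message to the admin topic and return the SNS message ID."""
    response = sns_client.publish(
        TopicArn=topic_arn,
        Subject=subject[:100],  # SNS email subjects are limited to 100 characters
        Message=message,
    )
    return response["MessageId"]
```

Inside the Lambda this would be called along the lines of `notify_admin(sns_client, os.environ['sns_topic_arn'], "Error rotating key: ...")`.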
Next we need to verify the email identities we will be sending mail from and to.
In my example, I’m sending emails from the address held in the variable ‘email_source’, and my work email is the user in this case.
To scale this project to 5 users with automatic rotation, you would need to verify all 5 user emails, and then the source email too.
Scaling this project to 50,000 users is 100% possible, but I highly recommend you contact AWS Support to move your SES out of ‘sandbox’ mode and into ‘production’ mode.
ses.EmailIdentity(
    self,
    "UserIdentity",
    identity=ses.Identity.email("jeremy.ritchie@caylent.com"),
)
ses.EmailIdentity(
    self,
    "EmailSource",
    identity=ses.Identity.email(email_source),
)
In sandbox mode, you’re highly limited in how many emails you can send per day and per second. Also, and most importantly, every address you send from or to needs to be verified in SES as we did above. For 50,000 users, or even 20, that’s a bit too many verification emails.
In production mode, your limits on emails per day and per second are dramatically lifted, plus you can send to any address without the recipient being verified! This means you would only need to verify a single identity, and that’s the one you’re sending from.
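If you are not sure which mode your account is in, the SES v2 API reports it. Here is a small sketch, assuming a boto3 `sesv2` client is passed in; the function name `ses_sandbox_status` and the shape of the returned dict are my own choices.

```python
def ses_sandbox_status(sesv2_client):
    """Summarise the SES sending limits for the client's account and region."""
    account = sesv2_client.get_account()
    quota = account.get("SendQuota", {})
    return {
        # ProductionAccessEnabled is False while the account is in the sandbox
        "in_sandbox": not account.get("ProductionAccessEnabled", False),
        "max_per_24h": quota.get("Max24HourSend"),
        "max_per_second": quota.get("MaxSendRate"),
    }
```

Typical use would be `ses_sandbox_status(boto3.client("sesv2"))`, run in the same region the rotation Lambda sends from.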
Next we need to create the rotation Lambda in CDK. I’ll dive into the actual Lambda code further down.
This is a relatively simple Lambda resource, with straightforward permissions. In a production environment I suggest refining them further in line with the principle of least privilege. For me however, this is intended as an open-source resource for anyone to use, so the IAM Role permissions are left more open to keep deployment simple for beginners.
lambda_role = iam.Role(
    self,
    id="lambdaRole",
    assumed_by=iam.ServicePrincipal("lambda.amazonaws.com"),
    role_name="access-key-rotator-role",
    managed_policies=[
        iam.ManagedPolicy.from_aws_managed_policy_name(
            "service-role/AWSLambdaBasicExecutionRole"
        ),
        iam.ManagedPolicy.from_aws_managed_policy_name(
            "SecretsManagerReadWrite"
        ),
        iam.ManagedPolicy.from_aws_managed_policy_name(
            "IAMFullAccess"
        )
    ],
    inline_policies={
        "SES": iam.PolicyDocument(
            statements=[
                iam.PolicyStatement(
                    actions=["ses:SendEmail"],
                    effect=iam.Effect.ALLOW,
                    resources=["*"]
                )
            ]
        ),
        "SNS": iam.PolicyDocument(
            statements=[
                iam.PolicyStatement(
                    actions=["sns:Publish"],
                    effect=iam.Effect.ALLOW,
                    resources=[topic.topic_arn]
                )
            ]
        )
    }
)
function = lambda_.Function(
    self,
    "lambda",
    runtime=lambda_.Runtime.PYTHON_3_9,
    function_name="access-key-rotator",
    code=lambda_.Code.from_asset("./lambda"),
    handler="lambda_function.lambda_handler",
    role=lambda_role,
    environment={
        "sns_topic_arn": topic.topic_arn,
        "source_email": email_source,
        "email_domain": domain
    },
    timeout=Duration.seconds(30)
)
function.add_permission(
    "SecretsManagerPolicy",
    principal=iam.ServicePrincipal("secretsmanager.amazonaws.com"),
)
The SecretsManagerPolicy permission is necessary so that Secrets Manager is allowed to invoke the Lambda when it rotates a secret!
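For anyone who wants to tighten the role along least-privilege lines, here is one possible starting point: a sketch that builds a policy document limited to the API calls the rotation Lambda makes. The function name `rotation_policy`, the account ID and the region are placeholders, and it assumes IAM users without a path and secrets named under `/access-key/`.

```python
import json


def rotation_policy(account_id, region, usernames, secret_prefix="/access-key/"):
    """Build an IAM policy document scoped to the given users and their secrets."""
    # Assumes users have no IAM path; adjust the ARN format if yours do.
    user_arns = [f"arn:aws:iam::{account_id}:user/{name}" for name in usernames]
    secret_arn = f"arn:aws:secretsmanager:{region}:{account_id}:secret:{secret_prefix}*"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "iam:CreateAccessKey",
                    "iam:DeleteAccessKey",
                    "iam:ListAccessKeys",
                    "iam:UpdateAccessKey",
                ],
                "Resource": user_arns,
            },
            {
                "Effect": "Allow",
                "Action": [
                    "secretsmanager:GetSecretValue",
                    "secretsmanager:ListSecretVersionIds",
                    "secretsmanager:PutSecretValue",
                    "secretsmanager:UpdateSecretVersionStage",
                ],
                "Resource": secret_arn,
            },
        ],
    }


policy = rotation_policy("123456789012", "us-east-1", ["jeremy.ritchie"])
print(json.dumps(policy, indent=2))
```

A document like this could replace the two broad managed policies (SecretsManagerReadWrite and IAMFullAccess), while the SES and SNS inline policies stay as they are.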
Last CDK resource is the secret, or secrets.
As previously mentioned, my example uses a single user, however I’ve made it simple and clear how you can expand this to many users.
The secret values of foo and bar do not need to be replaced. They are simply there to create the keys within the secret.
The value of the keys is irrelevant as the secret will rotate the moment it has been deployed. It will then rotate again in 90 days, unless you modify the duration or manually trigger it via the Console, SDK, or CLI.
users = ['jeremy.ritchie']
# Secrets
for user in users:
    secret = secretsmanager.Secret(
        self,
        f"{user.replace('.','')}Secret",
        secret_name=f"/access-key/{user}",
        secret_object_value={
            "access_key_id": SecretValue.unsafe_plain_text("foo"),
            "secret_access_key": SecretValue.unsafe_plain_text("bar"),
        }
    )
    secret.add_rotation_schedule(
        f"{user.replace('.','')}Rotation",
        automatically_after=Duration.days(90),
        rotation_lambda=function
    )
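Triggering a rotation manually through the SDK is a single call. Here is a minimal sketch, assuming a boto3 `secretsmanager` client is passed in; the wrapper name `trigger_rotation` is mine.

```python
def trigger_rotation(secrets_client, secret_id):
    """Ask Secrets Manager to start rotating `secret_id` now.

    Returns the ID of the new secret version reported by the service.
    """
    response = secrets_client.rotate_secret(SecretId=secret_id)
    return response.get("VersionId")
```

For the secret created above, that would look like `trigger_rotation(boto3.client("secretsmanager"), "/access-key/jeremy.ritchie")`. The CLI equivalent is `aws secretsmanager rotate-secret --secret-id /access-key/jeremy.ritchie`.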
Last but not least is the Lambda function.
It’s quite large as it needs to handle 4 unique stages of secret rotation, performing various actions at each one. It also needs to handle errors safely and fix them where possible, before resorting to reporting them and stopping.
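The function expects each IAM user name to match the first part of that user’s email address: the user name is taken from the secret name, and the email address is built by appending the ‘email_domain’ environment variable to it.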
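As a small illustration of that naming convention, the sketch below derives the user name from a secret ID and builds the email address. Both helper names are hypothetical, the ARN is a placeholder, and it assumes the `email_domain` value includes the leading `@`, since the Lambda simply concatenates the two strings.

```python
def username_from_secret_id(secret_id):
    """Extract the IAM user name from a secret name or full secret ARN."""
    name = secret_id.split("/")[-1]
    if secret_id.startswith("arn:"):
        # Secret ARNs end with a hyphen and six random characters
        name = name.rsplit("-", 1)[0]
    return name


def email_for(username, domain):
    """Build the notification address, e.g. user name plus '@example.com'."""
    return username + domain


arn = "arn:aws:secretsmanager:us-east-1:123456789012:secret:/access-key/jeremy.ritchie-AbCdEf"
print(username_from_secret_id(arn))
print(email_for(username_from_secret_id(arn), "@caylent.com"))
```

Splitting from the right keeps user names that contain a hyphen intact.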
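import json
import logging
import os

import boto3
from botocore.exceptions import ClientError

# Module-level setup the functions below rely on. BODY, the HTML email
# template used by send_email, is omitted from this excerpt.
logging.getLogger().setLevel(logging.INFO)  # Lambda's root logger defaults to WARNING
iam_client = boto3.client('iam')
secrets_client = boto3.client('secretsmanager')
ses_client = boto3.client('ses')
sns_client = boto3.client('sns')

def create_key(username):
    try:
        access_key_metadata = iam_client.create_access_key(UserName=username)
        access_key = access_key_metadata['AccessKey']['AccessKeyId']
        secret_key = access_key_metadata['AccessKey']['SecretAccessKey']
        logging.info(access_key + " has been created.")
        return access_key, secret_key
    except ClientError as e:
        # IAM allows at most two access keys per user
        if e.response['Error']['Code'] in ('LimitExceeded', 'LimitExceededException'):
            logging.error("Too many Access Key Tokens!!")
        sns_client.publish(TopicArn=os.environ['sns_topic_arn'], Message=f"Error rotating key - creating new key: {username} - {str(e)}")
        raise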
def add_secret_version(secret_id, token, access_key, secret_key):
    # Store the new key pair as the AWSPENDING version of the secret
    secret = json.dumps({"access_key_id": access_key, "secret_access_key": secret_key})
    try:
        resp = secrets_client.put_secret_value(SecretId=secret_id,
                                               ClientRequestToken=token,
                                               SecretString=secret,
                                               VersionStages=['AWSPENDING'])
        logging.debug(resp)
    except secrets_client.exceptions.ResourceExistsException as e:
        sns_client.publish(TopicArn=os.environ['sns_topic_arn'], Message=f"Error rotating key - adding secret version - {str(e)}")
        raise e
def test_secret(secret_id, token, username):
    import time
    resp = secrets_client.get_secret_value(
        SecretId=secret_id,
        VersionId=token,
        VersionStage='AWSPENDING'
    )
    pending = json.loads(resp['SecretString'])
    access_key_id = pending['access_key_id']
    secret_access_key = pending['secret_access_key']
    try:
        time.sleep(10)  # Allow time for key to activate (2x factor of safety)
        # Make a harmless IAM call with the new key to prove that it works
        boto3.client('iam', aws_access_key_id=access_key_id, aws_secret_access_key=secret_access_key).list_access_keys(UserName=username)
        logging.info("tests passed")
    except Exception as e:
        logging.error("Error: " + str(e))
        sns_client.publish(TopicArn=os.environ['sns_topic_arn'], Message=f"Error rotating key - error testing key - {username} - {secret_id} - {str(e)}")
        raise e
def rotate_secret_version(secret_id, token):
    current_secret_versions = secrets_client.list_secret_version_ids(SecretId=secret_id)['Versions']
    for i in current_secret_versions:
        # A version can carry several labels, so test membership rather than position
        if 'AWSCURRENT' in i['VersionStages']:
            if i['VersionId'] == token:
                logging.info("Version is already AWSCURRENT")
                return
            previous_secret_version = i['VersionId']
            secrets_client.update_secret_version_stage(
                SecretId=secret_id,
                VersionStage='AWSCURRENT',
                RemoveFromVersionId=previous_secret_version,
                MoveToVersionId=token
            )
            logging.info("Success in rotating")
            return
    logging.error("failure in rotating")
    sns_client.publish(TopicArn=os.environ['sns_topic_arn'], Message=f"Error rotating key - {secret_id}")
    # ClientError cannot be raised without arguments, so raise a plain error instead
    raise RuntimeError(f"No AWSCURRENT version found for {secret_id}")
def revoke_old_access_keys(secret_id, token, username):
    secret_versions = secrets_client.list_secret_version_ids(SecretId=secret_id)
    for version in secret_versions['Versions']:
        if 'AWSPREVIOUS' in version['VersionStages']:
            previous_version = version['VersionId']
            resp = secrets_client.get_secret_value(
                SecretId=secret_id,
                VersionId=previous_version
            )
            access_key_id = json.loads(resp['SecretString'])['access_key_id']
            if len(access_key_id) > 16:  # Check if key has any value to it, i.e. this is not a fresh secret
                disable_key(access_key=access_key_id, username=username)
                delete_key(access_key=access_key_id, username=username)
            return
def disable_key(access_key, username):
    try:
        iam_client.update_access_key(UserName=username, AccessKeyId=access_key, Status="Inactive")
        logging.info(access_key + " has been disabled.")
    except Exception as e:
        logging.error(f"Error disabling key {access_key} - continuing")
        sns_client.publish(TopicArn=os.environ['sns_topic_arn'], Message=f"Error rotating key - error disabling key - {username} - {access_key} - {str(e)}")
def delete_key(access_key, username):
    try:
        iam_client.delete_access_key(UserName=username, AccessKeyId=access_key)
        logging.info(access_key + " has been deleted.")
    except Exception as e:
        logging.error(f"Error deleting key {access_key} - continuing")
        sns_client.publish(TopicArn=os.environ['sns_topic_arn'], Message=f"Error rotating key - error deleting key - {username} - {access_key} - {str(e)}")
def send_email(username, domain):
    try:
        dest_address = username + domain
        link = 'https://console.aws.amazon.com/secretsmanager/home'
        ses_client.send_email(Source=os.environ['source_email'],
                              Destination={
                                  'ToAddresses': [
                                      dest_address,
                                  ]
                              },
                              Message={
                                  'Subject': {
                                      'Data': 'AWS Access Key Rotation',
                                      'Charset': 'UTF-8'
                                  },
                                  'Body': {
                                      'Html': {
                                          'Data': BODY.format(username, dest_address, link),
                                          'Charset': 'UTF-8'
                                      }
                                  }
                              })
        logging.info("Email successfully sent!")
    except Exception as e:
        logging.error("Email not sent successfully.")
        logging.error(str(e))
        sns_client.publish(TopicArn=os.environ['sns_topic_arn'], Message=f"Error rotating key - error sending email - {username} - {str(e)}")
def check_current_secret(user, secret_id, secret_stage):
    secret = secrets_client.get_secret_value(SecretId=secret_id)
    secret_versions = secrets_client.list_secret_version_ids(SecretId=secret_id)['Versions']
    access_key_id = json.loads(secret['SecretString'])['access_key_id']
    user_keys = iam_client.list_access_keys(UserName=user)['AccessKeyMetadata']
    if secret_stage == 'createSecret':
        # Check number of secret versions
        if len(secret_versions) == 3:  # This secret has been previously interrupted - check if IAM matches the Secrets Manager pending version.
            for version in secret_versions:
                if 'AWSPENDING' in version['VersionStages']:
                    pending_access_key = secrets_client.get_secret_value(SecretId=secret_id, VersionStage='AWSPENDING')
                    pending_key_id = json.loads(pending_access_key['SecretString'])['access_key_id']
                    for current_iam_key in user_keys:
                        if current_iam_key['AccessKeyId'] == pending_key_id:
                            logging.info('Access key and pending secret match... skip createSecret stage')
                            return False
            logging.error('Access key and pending secret DO NOT match...')
            raise RuntimeError('Access key and pending secret DO NOT match...')
        elif len(secret_versions) <= 2:  # Normal operation
            # Check number of access keys on user
            if len(user_keys) == 1:
                logging.info("User has 1 access key currently")
            elif len(user_keys) == 2:
                # Delete key that is not in the secret
                logging.info("User has 2 access keys currently - deleting any not in the current secret")
                for key in user_keys:
                    if key['AccessKeyId'] != access_key_id:
                        delete_key(access_key=key['AccessKeyId'], username=user)
    return True
def lambda_handler(event, context):
    domain = os.environ["email_domain"]
    secret_id = event['SecretId']
    secret_stage = event['Step']
    token = event['ClientRequestToken']
    # e.g. /access-key/jeremy.ritchie-823234; split from the right so user names containing '-' survive
    username = secret_id.split('/')[-1].rsplit('-', 1)[0]
    logging.info(f"Stage: {secret_stage}, username: {username}, Token: {token}")
    if check_current_secret(username, secret_id, secret_stage):
        if secret_stage == 'createSecret':
            access_key, secret_key = create_key(username=username)
            add_secret_version(secret_id, token, access_key, secret_key)
        elif secret_stage == 'setSecret':
            pass  # Nothing to do: the key was already created in IAM during createSecret
        elif secret_stage == 'testSecret':
            test_secret(secret_id, token, username)
        elif secret_stage == 'finishSecret':
            rotate_secret_version(secret_id, token)
            revoke_old_access_keys(secret_id, token, username)
            send_email(username, domain)
    return {
        'statusCode': 200,
        'body': 'success'
    }
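Secrets Manager invokes the handler once per stage, and the handler reads the same three fields from the event each time. If you want to exercise it in a test harness with stubbed clients, a sketch like the one below builds those four events. The helper name `rotation_events` and the ARN are placeholders of mine.

```python
import uuid

ROTATION_STEPS = ("createSecret", "setSecret", "testSecret", "finishSecret")


def rotation_events(secret_arn, token=None):
    """Build one event per rotation stage, all sharing a single request token."""
    token = token or str(uuid.uuid4())
    return [
        {"SecretId": secret_arn, "ClientRequestToken": token, "Step": step}
        for step in ROTATION_STEPS
    ]


events = rotation_events(
    "arn:aws:secretsmanager:us-east-1:123456789012:secret:/access-key/jeremy.ritchie-AbCdEf"
)
for event in events:
    print(event["Step"], event["ClientRequestToken"])
```

Each event could then be passed to `lambda_handler(event, None)` in order. The shared token matters because it becomes the version ID of the pending secret.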
Upon deployment a secret is created and the rotation is immediately triggered. Secret rotation will always trigger right away, and then again after the interval specified (90 days for me).
Let’s check out the Lambda logs to see what’s going on!
Fantastic!
We can see the Lambda moving through the various stages of the rotation, logging the relevant information at each one.
And finally the successful result!
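Besides reading the logs, you can confirm the outcome by looking at which version holds which staging label. Here is a small sketch that works on the response of `list_secret_version_ids`; the function name `stages_by_label` is my own.

```python
def stages_by_label(list_versions_response):
    """Map each staging label to the version ID that currently holds it."""
    labels = {}
    for version in list_versions_response.get("Versions", []):
        for label in version.get("VersionStages", []):
            labels[label] = version["VersionId"]
    return labels
```

Called as `stages_by_label(secrets_client.list_secret_version_ids(SecretId=secret_id))`, a successful rotation should show the rotation token under ‘AWSCURRENT’ and the old version under ‘AWSPREVIOUS’.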
Well, that’s been a lot of coding for a single blog post, but I think the result is worth it.
Here is a portable solution that will automatically rotate IAM User Access Keys. I hope you find this useful!
Once again, you can find all the source code for this on my GitHub.
Cheers!