Automatically rotate IAM user access keys
Access keys are long-term credentials for an AWS Identity and Access Management (IAM) user or the Amazon Web Services (AWS) account root user. Regularly rotating your IAM credentials helps prevent a compromised set of IAM access keys from accessing components in your AWS account. Rotating IAM credentials is also an important part of security best practices in IAM.
The setup checks the age of each IAM user's access keys and does the following:
- New IAM access keys are generated when existing access keys are 180 days old.
- The new access keys are stored as a secret in AWS Secrets Manager. A resource-based policy allows only the specified IAM principal to access and retrieve the secret.
- The account owner of the new access keys receives a notification via e-mail.
- The previous access keys are deactivated at 210 days old.
- The previous access keys are then deleted after 220 days.
Note: All three thresholds (180, 210 and 220 days) are configurable.
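The schedule above can be sketched as a small function that maps the age of a key to the action taken on it. This is only an illustration under the default thresholds; the function name `action_for_key` and the action labels are hypothetical and are not part of the scripts in this article.

```python
from datetime import date, timedelta

# Default thresholds from the list above; all three are configurable.
ROTATION_DAYS = 180
INACTIVATION_DAYS = 210
DELETION_DAYS = 220


def action_for_key(created: date, today: date) -> str:
    """Return the action that applies to a key created on `created` (hypothetical helper)."""
    age = (today - created).days
    if age < ROTATION_DAYS:
        return "keep"        # key is still new
    if age < INACTIVATION_DAYS:
        return "rotate"      # create a replacement key
    if age < DELETION_DAYS:
        return "deactivate"  # old key is switched to Inactive
    return "delete"          # old key is removed


if __name__ == "__main__":
    start = date(2024, 1, 1)
    for days in (100, 180, 210, 220):
        print(days, action_for_key(start, start + timedelta(days=days)))
```

A key therefore stays usable for 30 days after its replacement is created, and remains recoverable as an inactive key for another 10 days.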
An AWS Lambda function, triggered on a schedule by Amazon EventBridge, performs these actions automatically. A user can then retrieve the new pair of access keys and replace the old ones in their code or programs.
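For the retrieval step, the main.py script in this article stores the new key pair as two lines in the style of an AWS credentials file (`aws_access_key_id = ...` and `aws_secret_access_key = ...`). A minimal sketch of how a user could turn that secret string into a dictionary is shown here; the helper name `parse_credentials` is an assumption made for illustration.

```python
def parse_credentials(secret_string: str) -> dict:
    """Parse 'name = value' lines from the stored secret into a dict (hypothetical helper)."""
    credentials = {}
    for line in secret_string.splitlines():
        # Split on the first '=' only, so the value is kept intact.
        name, separator, value = line.partition("=")
        if separator:
            credentials[name.strip()] = value.strip()
    return credentials


if __name__ == "__main__":
    example = "aws_access_key_id = AKIAEXAMPLE\naws_secret_access_key = abc/def+ghi"
    print(parse_credentials(example))
```

The secret string itself would typically come from `boto3.client("secretsmanager").get_secret_value(SecretId=...)["SecretString"]`, using the secret name or ARN from the notification email.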
Technology stack:
- Amazon DynamoDB
- S3
- EventBridge
- IAM
- Lambda
- Secrets Manager
- Amazon SES
Setup Process - CloudFormation:
-
In the administrator account,
- Create an IAM role called AWSCloudFormationStackSetAdministrationRole
- with the following policy:
AWSCloudFormationStackSetAdministrationPolicy
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": [
        "sts:AssumeRole"
      ],
      "Resource": [
        "arn:aws:iam::*:role/AWSCloudFormationStackSetExecutionRole"
      ],
      "Effect": "Allow"
    }
  ]
}
- which has:
Trust relationship
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "cloudformation.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
- Create a service role called AWSCloudFormationStackSetExecutionRole that trusts the administrator account
- with the following policy:
AWSCloudFormationStackSetExecutionPolicy
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "VisualEditor0",
      "Effect": "Allow",
      "Action": [
        "sns:*",
        "lambda:CreateFunction",
        "iam:GetRole",
        "s3:*",
        "lambda:GetFunction",
        "iam:CreateRole",
        "iam:DeleteRole",
        "cloudformation:*",
        "dynamodb:*",
        "iam:AttachRolePolicy",
        "iam:PutRolePolicy",
        "iam:DetachRolePolicy",
        "iam:DeleteRolePolicy",
        "lambda:DeleteFunction",
        "iam:PassRole",
        "lambda:AddPermission",
        "events:PutRule",
        "events:DescribeRule",
        "lambda:RemovePermission",
        "events:RemoveTargets",
        "events:PutTargets",
        "events:DeleteRule"
      ],
      "Resource": "*"
    }
  ]
}
- which has:
Trust relationship (replace ACCOUNT_ID with the ID of the administrator account)
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::ACCOUNT_ID:root"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
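The trust relationship of the execution role differs between setups only in the administrator account ID. As a small sketch, the document can be generated instead of edited by hand; the function name `execution_role_trust_policy` and the example account ID are assumptions made for illustration.

```python
import json


def execution_role_trust_policy(admin_account_id: str) -> str:
    """Build the trust policy JSON for AWSCloudFormationStackSetExecutionRole (hypothetical helper)."""
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                # The administrator account is allowed to assume the execution role.
                "Principal": {"AWS": f"arn:aws:iam::{admin_account_id}:root"},
                "Action": "sts:AssumeRole",
            }
        ],
    }
    return json.dumps(policy, indent=2)


if __name__ == "__main__":
    # 123456789012 is a placeholder account ID.
    print(execution_role_trust_policy("123456789012"))
```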
-
Go to the Amazon SES console and check that a sender address is verified in the account (follow the Amazon SES instructions for verifying an email address).
- Move the account out of the SES sandbox. While an account is in the sandbox, it can only send email to verified addresses and domains.
- This ensures that the Lambda function can send email to users whose addresses are outside the verified domain.
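Both conditions can also be checked from code. The sketch below assumes two boto3 clients are passed in: an `ses` client for `get_identity_verification_attributes` and an `sesv2` client for `get_account`. The function name `sender_ready` is hypothetical.

```python
def sender_ready(ses_client, sesv2_client, sender: str) -> bool:
    """Return True if `sender` is verified and the account is out of the SES sandbox."""
    # Verification status of the sender identity (SES API v1).
    attributes = ses_client.get_identity_verification_attributes(Identities=[sender])
    status = attributes["VerificationAttributes"].get(sender, {}).get("VerificationStatus")
    verified = status == "Success"

    # ProductionAccessEnabled is False while the account is still in the sandbox (SES API v2).
    production = bool(sesv2_client.get_account().get("ProductionAccessEnabled", False))
    return verified and production


# Possible usage (requires AWS credentials and the boto3 package):
#   import boto3
#   print(sender_ready(boto3.client("ses"), boto3.client("sesv2"), "sender@example.com"))
```

Both clients have to be created in the Region where the identity was verified, because SES identities and sandbox status are tracked per Region.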
-
Create a local folder with the following files:
main.py
"""
Main script

Automatically rotates IAM access keys for the AWS users

__author__ = "Samiul Saki Chowdhury"
"""

import botocore
import boto3
import datetime
import os
from botocore.exceptions import ClientError
from notifier import *

# Inputs from Environment Variables
# Global Variables
#sns_arn = os.environ["sns_arn"]

# Environment Variable: RotationPeriod
# The number of days after which a key should be rotated
rotationPeriod = int(os.environ['RotationPeriod'])

# Environment Variable: InactivePeriod
# The number of days after which to inactivate keys that had been rotated
# Note: This must be greater than RotationPeriod
oldKeyInactivationPeriod = int(os.environ['InactivePeriod'])

# Environment Variable: RetentionPeriod
# The number of days after which to delete keys that have been rotated and inactivated
# Note: This must be greater than InactivePeriod
oldKeyDeletionPeriod = int(os.environ['RetentionPeriod'])

# Environment Variable: TagKey
# The name of the user tag that holds the email address for notifications
# Note: Only users that carry this tag are processed
tagKeyName = str(os.environ['TagKey'])

# Environment Variable: UserType
# The value of the UserType tag that marks a user (person) whose keys are checked and rotated
# Note: This must match the value of the UserType tag on the IAM user
userType = str(os.environ['UserType'])

# Pre-calculate the rotation and retention cutoff dates
rotationDate = (datetime.datetime.now() - datetime.timedelta(days=rotationPeriod)).date()
inactivationDate = (datetime.datetime.now() - datetime.timedelta(days=oldKeyInactivationPeriod)).date()
deletionDate = (datetime.datetime.now() - datetime.timedelta(days=oldKeyDeletionPeriod)).date()

# Counting number of days between defined periods
days_to_inactivate = oldKeyInactivationPeriod - rotationPeriod  # ex: (210-180 = 30)
days_to_delete = oldKeyDeletionPeriod - rotationPeriod  # ex: (220-180 = 40)

# Format for lines in credentials.txt
akidLineFormat = 'aws_access_key_id = {}'
secretLineFormat = 'aws_secret_access_key = {}'

# Format for name of ASM secrets
secretNameFormat = 'User_{}_AccessKey'

# Format for ARN of ASM secrets
secretArnFormat = 'arn:aws:secretsmanager:{region_name}:{account_id}:secret:{secret_name}'

# Format for Secret Manager Resource Policy
secretPolicyFormat = """{{
    "Version": "2012-10-17",
    "Statement": [
        {{
            "Effect": "Allow",
            "Principal": {{
                "AWS": "arn:aws:iam::{account_id}:user/{user_name}"
            }},
            "Action": [
                "secretsmanager:GetSecretValue",
                "secretsmanager:DescribeSecret",
                "secretsmanager:ListSecretVersionIds",
                "secretsmanager:ListSecrets"
            ],
            "Resource": "*"
        }}
    ]
}}
"""

iamPolicyFormat = """{{
    "Version": "2012-10-17",
    "Statement": [
        {{
            "Sid": "RetrieveSecretValue",
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetSecretValue",
                "secretsmanager:DescribeSecret",
                "secretsmanager:ListSecretVersionIds"
            ],
            "Resource": "{secret_arn}"
        }},
        {{
            "Sid": "ListSecret",
            "Effect": "Allow",
            "Action": "secretsmanager:ListSecrets",
            "Resource": "*"
        }}
    ]
}}
"""

# IAM Client
iam = boto3.client('iam')

# Secrets Manager Client
sm = boto3.client('secretsmanager')

my_session = boto3.session.Session()
my_region = my_session.region_name

# Sub-Clients
#sns_client = boto3.client('sns')
sts_client = boto3.client('sts')


def lambda_handler(event, context):
    response = {}
    force_rotate_user_name = None
    if "ForceRotate" in event:
        force_rotate_user_name = event['ForceRotate']

    # Paginate so that every user is covered, not only the first page of results
    for page in iam.get_paginator('list_users').paginate():
        for user in page['Users']:
            user_name = user['UserName']
            user_tag = iam.list_user_tags(UserName=user_name)

            # Reset both values per user so that nothing carries over from the previous user
            user_type = ""
            user_email = ""
            for tag in user_tag.get("Tags"):
                if tag.get("Key") == "UserType":
                    user_type = tag.get("Value")
                if tag.get("Key") == tagKeyName:
                    user_email = tag.get("Value")

            # Only users with the expected UserType and an email tag are processed
            if user_type == userType and user_email != "":
                force = user_name == force_rotate_user_name
                process_user(user, user_email, response, force)

    # Build a response for debugging - doesn't change actual work done,
    # just gives output for testing in Lambda console
    response['RotationDate'] = str(rotationDate)
    return response


def get_account_id():
    """
    Gets the current account ID.

    :return The current account Id of the Lambda function.
    """
    try:
        return sts_client.get_caller_identity().get("Account")
    except ClientError as error:
        print(error)
        raise


def process_user(user, user_email, response, force=False):
    """Rotate access keys for a user.

    Inactive keys will be deleted
    Users with no active access keys will not be processed
    Users with an access key older than the rotation date will have a new key created and stored in ASM,
    deleting the oldest key if necessary.
    Users with an active access key older than the inactivation date, and an active access key newer than the
    rotation date will have the oldest key inactivated.
    Users with an inactive access key older than the deletion period, and an active access key newer than the
    rotation date will have the oldest key deleted

    On a single run of this lambda, a key will only move from active to inactive or inactive to deleted.
    """
    user_name = user['UserName']
    response[user_name] = {}
    lak = iam.list_access_keys(UserName=user_name)
    parsed_arn = parse_arn(user['Arn'])
    account = parsed_arn['account']
    num_keys = 0

    # Active Keys
    active_keys = []
    # Inactive Keys
    inactive_keys = []
    # Oldest Key
    oldest_key = None

    # Classify all access keys for the current user
    for akm in lak['AccessKeyMetadata']:
        num_keys += 1
        if oldest_key is None or oldest_key['CreateDate'] > akm['CreateDate']:
            oldest_key = akm
        if akm['Status'] == 'Active':
            active_keys.append(akm)
        else:
            inactive_keys.append(akm)

    num_active = len(active_keys)
    num_inactive = len(inactive_keys)

    if force:
        # Rotation of user is forced for testing
        if num_active == 2:
            # Two active keys. Delete oldest and rotate
            key_to_delete = oldest_key['AccessKeyId']
            iam.delete_access_key(UserName=user_name, AccessKeyId=key_to_delete)
            #print("Force: delete key")
            send_notification_update_key(user_name, key_to_delete, account, "deleted", user_email)
            response[user_name]["Deleted Old Key"] = key_to_delete
            create_access_key(user, user_email, response)
            response[user_name]["Action"] = "Key rotated."
        else:
            #print("Force: Create key")
            create_access_key(user, user_email, response)
            response[user_name]["Action"] = "Key rotated."
    elif num_active == 2:
        classification_1 = classify_date(active_keys[0])
        classification_2 = classify_date(active_keys[1])
        # Two Active Keys
        if classification_1 == "New" or classification_2 == "New":
            # At least one key is new. Handle oldest one according to inactivation/deletion dates
            handle_oldest_key(user_name, account, user_email, response, oldest_key)
        else:
            # Both keys older than rotation date. Delete oldest and create new
            key_to_delete = oldest_key['AccessKeyId']
            #print("Oldest key deleted")
            iam.delete_access_key(UserName=user_name, AccessKeyId=key_to_delete)
            send_notification_update_key(user_name, key_to_delete, account, "deleted", user_email)
            response[user_name]["Deleted Old Key"] = key_to_delete
            create_access_key(user, user_email, response)
            response[user_name]["Action"] = "Key rotated."
    elif num_active == 1 and num_inactive == 1:
        # One active and one inactive. Handle inactive key according to inactivation/deletion dates
        handle_oldest_key(user_name, account, user_email, response, inactive_keys[0])
    elif num_active == 1 and num_inactive == 0:
        # Single key that is active. Rotate if necessary.
        classification = classify_date(active_keys[0])
        if classification == "New":
            response[user_name]["Action"] = "No key rotation required."
        else:
            create_access_key(user, user_email, response)
            response[user_name]["Action"] = "Key rotated."
    elif num_active == 0 and num_inactive > 0:
        # If no active keys, delete all inactive keys
        response[user_name]["Deleted Inactive Keys"] = []
        for inactive_key in inactive_keys:
            # The IAM API expects the key ID, not the whole metadata entry
            key_to_delete = inactive_key['AccessKeyId']
            iam.delete_access_key(UserName=user_name, AccessKeyId=key_to_delete)
            send_notification_update_key(user_name, key_to_delete, account, "deleted", user_email)
            #print("Deleting one key")
            response[user_name]["Deleted Inactive Keys"].append(key_to_delete)


def classify_date(akm):
    creation_date = akm['CreateDate'].date()
    if creation_date > rotationDate:
        return "New"
    if creation_date > inactivationDate:
        return "Rotate"
    if creation_date > deletionDate:
        return "Inactivate"
    return "Delete"


def handle_oldest_key(user_name, account, user_email, response, oldest_key):
    classification = classify_date(oldest_key)
    if classification == "Inactivate":
        # Skip keys that are already inactive so the user is not notified again on every run
        if oldest_key['Status'] == 'Active':
            key_to_inactivate = oldest_key['AccessKeyId']
            iam.update_access_key(UserName=user_name, AccessKeyId=key_to_inactivate, Status='Inactive')
            send_notification_update_key(user_name, key_to_inactivate, account, "deactivated", user_email)
            #print("Key deactivated")
            response[user_name]["Inactivated Old Key"] = key_to_inactivate
    elif classification == "Delete":
        key_to_delete = oldest_key['AccessKeyId']
        iam.delete_access_key(UserName=user_name, AccessKeyId=key_to_delete)
        send_notification_update_key(user_name, key_to_delete, account, "deleted", user_email)
        #print("Key deleted")
        response[user_name]["Deleted Old Key"] = key_to_delete


def format_secret_arn(secret_name):
    account_id = get_account_id()
    return secretArnFormat.format(account_id=account_id, secret_name=secret_name, region_name=my_region)


def format_secret_policy(user):
    account_id = get_account_id()
    user_name = user['UserName']
    return secretPolicyFormat.format(account_id=account_id, user_name=user_name)


def format_iam_policy(secret_arn):
    account_id = get_account_id()
    return iamPolicyFormat.format(account_id=account_id, secret_arn=secret_arn)


def create_resource_policy(user, secret_name, secret_arn):
    resource_policy_document = format_secret_policy(user)
    sm.put_resource_policy(SecretId=secret_name, ResourcePolicy=resource_policy_document, BlockPublicPolicy=True)


def create_iam_policy_if_not_exist(user, secret_arn):
    user_name = user['UserName']
    create_policy = False
    policy_name = 'SecretsAccessPolicy'
    try:
        iam.get_user_policy(UserName=user_name, PolicyName=policy_name)
    except botocore.exceptions.ClientError as e:
        # IAM reports a missing inline policy with the error code NoSuchEntity
        if e.response['Error']['Code'] == 'NoSuchEntity':
            create_policy = True
        else:
            raise
    if create_policy:
        policy_document = format_iam_policy(secret_arn)
        iam.put_user_policy(UserName=user_name, PolicyName=policy_name, PolicyDocument=policy_document)


def create_access_key(user, user_email, response):
    user_name = user['UserName']
    secret_name = secretNameFormat.format(user_name)

    # Create new access key
    new_access_key = iam.create_access_key(UserName=user_name)
    response[user_name]["Created Access Key"] = new_access_key['AccessKey']['AccessKeyId']
    response[user_name]["ASM Secret Name"] = secret_name
    akid_line = akidLineFormat.format(new_access_key['AccessKey']['AccessKeyId'])
    secret_line = secretLineFormat.format(new_access_key['AccessKey']['SecretAccessKey'])
    cred_file_body = '{}\n{}'.format(akid_line, secret_line)

    # Create new secret, or store in existing
    create_secret = False
    try:
        # See if the secret we need already exists
        sm.describe_secret(SecretId=secret_name)
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == 'ResourceNotFoundException':
            create_secret = True
        else:
            raise
    if create_secret:
        sm.create_secret(
            Name=secret_name,
            Description='Auto-created secret',
            SecretString=cred_file_body)
    else:
        sm.put_secret_value(SecretId=secret_name, SecretString=cred_file_body)

    secret_value = sm.get_secret_value(SecretId=secret_name)
    # Use the ARN returned by Secrets Manager: it ends in a random suffix
    # that an ARN built from the secret name alone does not contain
    secret_arn = secret_value['ARN']
    parsed_arn = parse_arn(secret_arn)
    #print("Parsing with notifier:", parsed_arn)
    send_notification_create_key(user_name, parsed_arn['account'],
                                 new_access_key['AccessKey']['AccessKeyId'], secret_arn,
                                 rotationPeriod, days_to_inactivate, days_to_delete, user_email)
    create_resource_policy(user, secret_name, secret_arn)
    create_iam_policy_if_not_exist(user, secret_arn)
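The handler in main.py only processes users that carry two tags: `UserType` with the expected value and a tag holding the email address (with the CloudFormation template in this article these are `UserType=User` and `Email=<address>`). The selection rule can be sketched in isolation as follows; the function name `email_for_rotation` is hypothetical and the tag list mirrors the shape returned by `iam.list_user_tags`.

```python
def email_for_rotation(tags, tag_key="Email", user_type="User"):
    """Return the notification email if the user qualifies for rotation, else None."""
    # Collapse the list of {"Key": ..., "Value": ...} entries into a plain dict.
    tag_map = {tag["Key"]: tag["Value"] for tag in tags}
    if tag_map.get("UserType") != user_type:
        return None
    # An empty or missing email tag also disqualifies the user.
    return tag_map.get(tag_key) or None


if __name__ == "__main__":
    tags = [
        {"Key": "UserType", "Value": "User"},
        {"Key": "Email", "Value": "jane@example.com"},
    ]
    print(email_for_rotation(tags))
```

Users without these tags are skipped silently, so tagging is a prerequisite for any IAM user whose keys should be rotated.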
notifier.py
"""
Notifier script

Provides core logic for the Notifier Lambda Function.

__author__ = "Samiul Saki Chowdhury"
"""

import datetime
import logging
import botocore
import boto3
import json
import os
from botocore.exceptions import ClientError

SES_SENDER_EMAIL_ADDRESS = str(os.environ['AdminEmail'])  # 'noreply-dev@bymoslo.net'
now = str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M"))

# The character encoding for the email.
CHARSET = "UTF-8"


def parse_arn(arn):
    # http://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html
    elements = arn.split(':', 5)
    result = {
        'arn': elements[0],
        'partition': elements[1],
        'service': elements[2],
        'region': elements[3],
        'account': elements[4],
        'resource': elements[5],
        'resource_type': None
    }
    if '/' in result['resource']:
        result['resource_type'], result['resource'] = result['resource'].split('/', 1)  # NOQA
    elif ':' in result['resource']:
        result['resource_type'], result['resource'] = result['resource'].split(':', 1)  # NOQA
    return result


def send_notification_create_key(user_name, account_id, access_key, secret_value, rotationPeriod,
                                 days_to_inactivate, days_to_delete, user_email):
    ses_client = boto3.client('ses')

    # The HTML body of the email.
    BODY_HTML = """<html>
    <head></head>
    <body>
    <div>
    <h3>This notification was generated automatically by an AWS Lambda function.</h3>
    </div>
    <p>Hi {user_name},</p>
    <p>Your access key for account <strong>{account_id}</strong> is more than {rotationPeriod} days old.
    We have now created a new access key for you.</p>
    <p>Please use the following information to retrieve the new access key from Secrets Manager.</p>
    <table>
    <tbody>
    <tr style="height: 18px;">
    <td style="width: 2%; height: 18px;"></td>
    <td style="width: 10%; height: 18px;"><strong>User name:</strong></td>
    <td style="width: 40%; height: 18px;">{user_name}</td>
    </tr>
    <tr style="height: 18px;">
    <td style="width: 2%; height: 18px;"></td>
    <td style="width: 10%; height: 18px;"><strong>Account ID:</strong></td>
    <td style="width: 40%; height: 18px;">{account_id}</td>
    </tr>
    <tr style="height: 18px;">
    <td style="width: 2%; height: 18px;"></td>
    <td style="width: 10%; height: 18px;"><strong>AccessKey ID:</strong></td>
    <td style="width: 40%; height: 18px;">{access_key}</td>
    </tr>
    <tr style="height: 18px;">
    <td style="width: 2%; height: 18px;"></td>
    <td style="width: 10%; height: 18px;"><strong>SecretARN:</strong></td>
    <td style="width: 40%; height: 18px;">{secret_value}</td>
    </tr>
    <tr style="height: 18px;">
    <td style="width: 2%; height: 18px;"></td>
    <td style="width: 10%; height: 18px;"><strong>Status:</strong></td>
    <td style="width: 40%; height: 18px;">CREATED</td>
    </tr>
    <tr style="height: 18px;">
    <td style="width: 2%; height: 18px;"></td>
    <td style="width: 10%; height: 18px;"><strong>Date created:</strong></td>
    <td style="width: 40%; height: 18px;">{now}</td>
    </tr>
    </tbody>
    </table>
    <p>Your old access key is still active and can be used as normal. It will be deactivated automatically
    in {days_to_inactivate} days and then deleted in {days_to_delete} days. Please download the access key ID
    and the secret access key, and update your AWS profile on your local machine or application before the
    old key is deleted.</p>
    <p><em>You can run the following CLI command to retrieve your new secret access key:</em><br/>
    <code># aws secretsmanager get-secret-value --secret-id {secret_value} --output table [--profile Your_Profile]</code></p>
    <p>To learn more about rotating AWS access keys regularly, read the official guide at
    https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html#Using_RotateAccessKey<br/>
    If you have any other questions, please contact the AWS infrastructure team through their Slack channels.</p>
    <p>Kind regards<br/>
    AWS Infrastrukturteam<br>
    Digital Teknologi og Innsikt</p>
    </body>
    </html>
    """.format(user_name=user_name, account_id=account_id, access_key=access_key,
               secret_value=secret_value, rotationPeriod=rotationPeriod, now=now,
               days_to_inactivate=days_to_inactivate, days_to_delete=days_to_delete)

    try:
        ses_response = ses_client.send_email(
            Destination={'ToAddresses': [user_email]},
            Message={
                'Body': {
                    'Html': {
                        'Charset': CHARSET,
                        'Data': BODY_HTML,
                    }
                },
                'Subject': {'Charset': CHARSET,
                            'Data': f'Your new AWS access key was created for account {account_id}'}
            },
            Source=SES_SENDER_EMAIL_ADDRESS
        )
    except ClientError as e:
        logging.error(e.response['Error']['Message'])
        print("Email sending unsuccessful")
    else:
        logging.info(f'Notification email sent successfully to {user_email}! Message ID: {ses_response["MessageId"]}')
        print("Email sent successfully")


def send_notification_update_key(user_name, access_key, account_id, status, user_email):
    ses_client = boto3.client('ses')

    BODY_HTML = """
    <html>
    <head></head>
    <body>
    <div>
    <h3>This notification was generated automatically by an AWS Lambda function.</h3>
    </div>
    <p>Hi {user_name},</p>
    <p>Your access key ID {access_key} for account <strong>{account_id}</strong> is now
    <strong>{status}</strong>. Use the key that was created for you earlier, or contact the
    AWS infrastructure team for help.</p>
    <table>
    <tbody>
    <tr style="height: 18px;">
    <td style="width: 2%; height: 18px;"></td>
    <td style="width: 10%; height: 18px;"><strong>User name:</strong></td>
    <td style="width: 40%; height: 18px;">{user_name}</td>
    </tr>
    <tr style="height: 18px;">
    <td style="width: 2%; height: 18px;"></td>
    <td style="width: 10%; height: 18px;"><strong>Account ID:</strong></td>
    <td style="width: 40%; height: 18px;">{account_id}</td>
    </tr>
    <tr style="height: 18px;">
    <td style="width: 2%; height: 18px;"></td>
    <td style="width: 10%; height: 18px;"><strong>AccessKey ID:</strong></td>
    <td style="width: 40%; height: 18px;">{access_key}</td>
    </tr>
    <tr style="height: 18px;">
    <td style="width: 2%; height: 18px;"></td>
    <td style="width: 10%; height: 18px;"><strong>Status:</strong></td>
    <td style="width: 40%; height: 18px;">{status}</td>
    </tr>
    <tr style="height: 18px;">
    <td style="width: 2%; height: 18px;"></td>
    <td style="width: 10%; height: 18px;"><strong>Date {status}:</strong></td>
    <td style="width: 40%; height: 18px;">{now}</td>
    </tr>
    </tbody>
    </table>
    <p>To learn more about rotating AWS access keys regularly, read the official guide at
    https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html#Using_RotateAccessKey<br/>
    If you have any other questions, please contact the AWS infrastructure team through their Slack channels.</p>
    <p>Kind regards<br/>
    AWS Infrastrukturteam<br>
    Digital Teknologi og Innsikt</p>
    </body>
    </html>
    """.format(user_name=user_name, account_id=account_id, access_key=access_key,
               status=status.upper(), now=now)

    try:
        ses_response = ses_client.send_email(
            Destination={'ToAddresses': [user_email]},
            Message={
                'Body': {
                    'Html': {
                        'Charset': CHARSET,
                        'Data': BODY_HTML,
                    }
                },
                'Subject': {'Charset': CHARSET,
                            'Data': f'Your old AWS access key is now {status} for account {account_id}'}
            },
            Source=SES_SENDER_EMAIL_ADDRESS
        )
    except ClientError as e:
        logging.error(e.response['Error']['Message'])
        print("Email sending unsuccessful")
    else:
        logging.info(f'Notification email sent successfully to {user_email}! Message ID: {ses_response["MessageId"]}')
        print("Email sent successfully")
Note: The BODY_HTML variables hold the email templates. Change them as you prefer.
- Zip these two files together and name the archive access_key_auto_rotation.zip
Note: The CloudFormation template references this zip file by name, so make a note of it.
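The packaging step can be scripted with the Python standard library. The sketch below assumes both scripts sit in the same folder; the function name `build_package` is hypothetical.

```python
import zipfile
from pathlib import Path


def build_package(folder, archive_name="access_key_auto_rotation.zip"):
    """Zip main.py and notifier.py from `folder` and return the archive path."""
    folder = Path(folder)
    archive = folder / archive_name
    with zipfile.ZipFile(archive, "w", zipfile.ZIP_DEFLATED) as package:
        for name in ("main.py", "notifier.py"):
            # arcname keeps both files at the root of the archive.
            package.write(folder / name, arcname=name)
    return archive


# Possible usage, run from the folder that holds the two scripts:
#   print(build_package("."))
```

Keeping the files at the root of the archive matters because the Lambda handler is configured as `main.lambda_handler`, which Lambda resolves relative to the top level of the package.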
-
Now create a CloudFormation template in the same local folder:
iam-key-auto-rotation-and-notifier.yaml
# CloudFormation script to create lambda function to
# rotate IAM access keys for AWS users
#
# Copyright = (c) 2020 Amazon Web Services, Inc. or its affiliates. All Rights Reserved. This AWS Content
# is provided subject to the terms of the AWS Customer Agreement available at
# https://aws.amazon.com/agreement/ or other written agreement between Customer
# and Amazon Web Services, Inc.
# Version = "1.0"

AWSTemplateFormatVersion: '2010-09-09'
Description: "AWS CloudFormation template to set up Auto-rotation function for AWS IAM Access Keys."

Metadata:
  AWS::CloudFormation::Interface:
    ParameterGroups:
      - Label:
          default: Configure the Deployment
        Parameters:
          - S3BucketName
          - S3BucketPrefix
      - Label:
          default: Configure Notifier Tool
        Parameters:
          - AdminEmailAddress
          - RotationPeriodDays
          - InactivePeriodDays
          - RetentionPeriodDays
    ParameterLabels:
      # Deployment Configuration
      S3BucketName:
        default: CloudFormation S3 Bucket Name
      S3BucketPrefix:
        default: CloudFormation S3 Bucket Prefix
      # Rotation Period Settings
      RotationPeriodDays:
        default: Lambda Function Rotation Period
      InactivePeriodDays:
        default: Lambda Function Inactivation Period
      RetentionPeriodDays:
        default: Lambda Function Retention Period
      # Notifier Settings
      AdminEmailAddress:
        default: Admin Email Address

Parameters:
  S3BucketName:
    Description: S3 Bucket Name where code is located.
    Type: String
    Default: "iam-rotate-keys"
  S3BucketPrefix:
    Description: The prefix or directory where resources will be stored.
    Type: String
    Default: "iam-rotation"
  AdminEmailAddress:
    Description: Email address that will be used in the "sent from" section of the email
    Type: String
  RotationPeriodDays:
    Description: Number of days after which a key is rotated (a new key is created)
    Type: String
    Default: "180"
  InactivePeriodDays:
    Description: Number of days after which the old key is deactivated
    Type: String
    Default: "210"
  RetentionPeriodDays:
    Description: Number of days after which the old, inactive key is deleted
    Type: String
    Default: "220"

Resources:
  # IAM KEY ROTATION RESOURCES
  RotationLambdaFunctionExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Sid: AllowExecutionPermissionsOnFunction
            Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
                - events.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - !Sub "arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
      Policies:
        - PolicyName: AllowRotationFunctionPermissions
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - s3:GetObject
                Resource:
                  - !Sub "arn:${AWS::Partition}:s3:::${S3BucketName}/*"
              - Effect: Allow
                Action:
                  - iam:List*
                  - iam:CreatePolicy
                  - iam:CreateAccessKey
                  - iam:DeleteAccessKey
                  - iam:UpdateAccessKey
                  - iam:PutUserPolicy
                  - iam:GetUserPolicy
                Resource: "*"
              - Effect: Allow
                Action:
                  - iam:AttachUserPolicy
                Resource:
                  - !Sub "arn:${AWS::Partition}:iam::${AWS::AccountId}:user/*"
              - Effect: Allow
                Action:
                  - secretsmanager:PutResourcePolicy
                  - secretsmanager:PutSecretValue
                  - secretsmanager:DescribeSecret
                  - secretsmanager:CreateSecret
                  - secretsmanager:GetResourcePolicy
                  - secretsmanager:GetSecretValue
                Resource:
                  - !Sub "arn:${AWS::Partition}:secretsmanager:${AWS::Region}:${AWS::AccountId}:secret:*"
              - Effect: Allow
                Action:
                  - ses:SendEmail
                Resource:
                  - !Sub "arn:${AWS::Partition}:ses:${AWS::Region}:${AWS::AccountId}:identity/*"

  RotationAccessKeyRotateLambdaFunction:
    Type: AWS::Lambda::Function
    Properties:
      Description: Rotates IAM Access Keys on specified schedule
      FunctionName: IAM-Access-Key-Rotation-LambdaFunction
      Handler: main.lambda_handler
      Runtime: python3.8
      Role: !GetAtt RotationLambdaFunctionExecutionRole.Arn
      Timeout: 240
      Environment:
        Variables:
          RotationPeriod: !Ref RotationPeriodDays
          InactivePeriod: !Ref InactivePeriodDays
          RetentionPeriod: !Ref RetentionPeriodDays
          TagKey: Email
          UserType: User
          AdminEmail: !Ref AdminEmailAddress
      Code:
        S3Bucket: !Ref S3BucketName
        S3Key: !Sub ${S3BucketPrefix}/access_key_auto_rotation.zip

  RotationCloudWatchEventLambdaTrigger:
    Type: AWS::Events::Rule
    DependsOn:
      - RotationLambdaFunctionExecutionRole
    Properties:
      Description: CloudWatch Event to trigger Access Key auto-rotation Lambda Function daily
      ScheduleExpression: rate(24 hours)
      State: ENABLED
      Targets:
        - Arn: !GetAtt RotationAccessKeyRotateLambdaFunction.Arn
          Id: AccessKeyRotationFunction

  # Resource-based permission that allows the EventBridge rule to invoke the function
  RotationLambdaInvokePermission:
    Type: AWS::Lambda::Permission
    Properties:
      Action: lambda:InvokeFunction
      FunctionName: !GetAtt RotationAccessKeyRotateLambdaFunction.Arn
      Principal: events.amazonaws.com
      SourceArn: !GetAtt RotationCloudWatchEventLambdaTrigger.Arn
-
Upload the files to an S3 bucket:
-
Go to the S3 console
- Create bucket
- Bucket name (your choice)
- AWS region (EU Ireland: eu-west-1)
- Select the default for the rest of the options
- Create bucket
-
Open the bucket and upload the contents of the local folder. For the Python code, upload only the zip file access_key_auto_rotation.zip; there is no need to upload the individual Python scripts.
- You can use either the AWS CLI or the console
-
Create a folder named iam-rotation in your S3 bucket:
IMPORTANT: Use exactly this name for the folder. It is the prefix that the CloudFormation template expects.
-
Move all the files to the iam-rotation folder.
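The upload and the move into the prefix folder can be combined by uploading the archive directly under the `iam-rotation/` key prefix. The following is a minimal sketch that assumes a boto3 S3 client is passed in; the function names `package_key` and `upload_package` are hypothetical.

```python
def package_key(prefix: str, archive_name: str = "access_key_auto_rotation.zip") -> str:
    """Build the object key that the CloudFormation template looks up."""
    return f"{prefix.strip('/')}/{archive_name}"


def upload_package(s3_client, bucket: str, local_path: str, prefix: str = "iam-rotation") -> str:
    """Upload the Lambda package under `prefix` and return the object key."""
    key = package_key(prefix)
    # upload_file(Filename, Bucket, Key) is the boto3 S3 client transfer call.
    s3_client.upload_file(str(local_path), bucket, key)
    return key


# Possible usage (requires AWS credentials and the boto3 package):
#   import boto3
#   upload_package(boto3.client("s3"), "your-bucket-name", "access_key_auto_rotation.zip")
```

The resulting key, `iam-rotation/access_key_auto_rotation.zip`, matches the `S3Key` that the template builds from the S3BucketPrefix parameter.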
-
Roll out the entire stack with CloudFormation:
- Change the parameter defaults in the CloudFormation YAML template as you prefer.
- Go to the CloudFormation console
- Select StackSets (make sure you are in the same region as the S3 bucket you created)
- Create StackSets
- Specify template
- Upload a template file
- Select the iam-key-auto-rotation-and-notifier.yaml file from your local PC (or you can upload it via the S3 url to the file in your S3 bucket)
- Next
- StackSet name: SecurityFindingsRemediationStackSet (or a name of your choice)
- CloudFormation S3 Bucket Name: Your bucket name
- CloudFormation S3 Bucket Prefix: iam-rotation (make sure it is the same as the name (prefix) of the folder)
- Admin Email Address: This must be an email address that is verified in SES (example: noreply-dev@bymoslo.net)
- Next
- Select IAM role name from dropdown list
- Select the administration role you created earlier: AWSCloudFormationStackSetAdministrationRole
- Select the IAM execution role you created earlier: AWSCloudFormationStackSetExecutionRole
- Tags - Optional
- Next
- Deployment locations: It is best to select Deploy stacks in accounts and fill in the account numbers (separated by commas for several accounts)
- Specify regions: Select the region where the S3 bucket was created
- Deployment options: Keep the defaults
- Next
- Review and approve the CloudFormation StackSet creation
- Submit
-
The automation is now rolled out using CloudFormation.
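The console steps above correspond to two CloudFormation API calls, `create_stack_set` and `create_stack_instances`. The sketch below assumes a boto3 CloudFormation client is passed in and uses the parameter names from the template in this article; the function name `deploy_stack_set` and all example values are hypothetical.

```python
def deploy_stack_set(cfn_client, template_body, bucket, admin_email, accounts, region,
                     admin_role_arn, stack_set_name="SecurityFindingsRemediationStackSet",
                     prefix="iam-rotation"):
    """Create the StackSet and deploy stack instances to the given accounts."""
    cfn_client.create_stack_set(
        StackSetName=stack_set_name,
        TemplateBody=template_body,
        Parameters=[
            {"ParameterKey": "S3BucketName", "ParameterValue": bucket},
            {"ParameterKey": "S3BucketPrefix", "ParameterValue": prefix},
            {"ParameterKey": "AdminEmailAddress", "ParameterValue": admin_email},
        ],
        # The template creates an IAM role, so this capability must be acknowledged.
        Capabilities=["CAPABILITY_IAM"],
        AdministrationRoleARN=admin_role_arn,
        ExecutionRoleName="AWSCloudFormationStackSetExecutionRole",
    )
    # Deploy to the Region that holds the S3 bucket with the Lambda package.
    return cfn_client.create_stack_instances(
        StackSetName=stack_set_name,
        Accounts=list(accounts),
        Regions=[region],
    )


# Possible usage (requires AWS credentials and the boto3 package):
#   import boto3
#   body = open("iam-key-auto-rotation-and-notifier.yaml").read()
#   deploy_stack_set(boto3.client("cloudformation"), body, "your-bucket-name",
#                    "sender@example.com", ["123456789012"], "eu-west-1",
#                    "arn:aws:iam::123456789012:role/AWSCloudFormationStackSetAdministrationRole")
```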
After the stack has been created:
- Go to Stacks and select the new stack:
- You can follow the events on the Events tab
- Go to the Resources tab when the installation is complete. There you will find the physical IDs of all resources in the stack.
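Once the stack is in place, the function can be tested without waiting for a key to age: the handler in main.py accepts an event with a `ForceRotate` field naming the user to rotate. A minimal sketch of such a test invocation follows, assuming a boto3 Lambda client is passed in; the helper names are hypothetical. Note that a forced rotation creates a real access key and, if the user already has two active keys, deletes the oldest one.

```python
import json

# Function name as defined in the CloudFormation template of this article.
FUNCTION_NAME = "IAM-Access-Key-Rotation-LambdaFunction"


def force_rotate_payload(user_name: str) -> bytes:
    """Build the test event that forces rotation for one user."""
    return json.dumps({"ForceRotate": user_name}).encode("utf-8")


def force_rotate(lambda_client, user_name: str) -> dict:
    """Invoke the rotation function synchronously and return its response dict."""
    result = lambda_client.invoke(
        FunctionName=FUNCTION_NAME,
        InvocationType="RequestResponse",
        Payload=force_rotate_payload(user_name),
    )
    # The Payload of the response is a stream holding the handler's JSON result.
    return json.loads(result["Payload"].read())


# Possible usage (requires AWS credentials and the boto3 package):
#   import boto3
#   print(force_rotate(boto3.client("lambda"), "some-test-user"))
```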
Setup Process - Terraform:
-
As before, create a local folder with the following files:
main.py
"""
Main script

Automatically rotates IAM access keys for the AWS users

__author__ = "Samiul Saki Chowdhury"
"""

import botocore
import boto3
import datetime
import os
from botocore.exceptions import ClientError
from notifier import *

# Inputs from Environment Variables
# Global Variables
#sns_arn = os.environ["sns_arn"]

# Environment Variable: RotationPeriod
# The number of days after which a key should be rotated
rotationPeriod = int(os.environ['RotationPeriod'])

# Environment Variable: InactivePeriod
# The number of days after which to inactivate keys that had been rotated
# Note: This must be greater than RotationPeriod
oldKeyInactivationPeriod = int(os.environ['InactivePeriod'])

# Environment Variable: RetentionPeriod
# The number of days after which to delete keys that have been rotated and inactivated
# Note: This must be greater than InactivePeriod
oldKeyDeletionPeriod = int(os.environ['RetentionPeriod'])

# Environment Variable: TagKey
# The name of the user tag that holds the email address for notifications
# Note: Only users that carry this tag are processed
tagKeyName = str(os.environ['TagKey'])

# Environment Variable: UserType
# The value of the UserType tag that marks a user (person) whose keys are checked and rotated
# Note: This must match the value of the UserType tag on the IAM user
userType = str(os.environ['UserType'])

# Pre-calculate the rotation and retention cutoff dates
rotationDate = (datetime.datetime.now() - datetime.timedelta(days=rotationPeriod)).date()
inactivationDate = (datetime.datetime.now() - datetime.timedelta(days=oldKeyInactivationPeriod)).date()
deletionDate = (datetime.datetime.now() - datetime.timedelta(days=oldKeyDeletionPeriod)).date()

# Counting number of days between defined periods
days_to_inactivate = oldKeyInactivationPeriod - rotationPeriod  # ex: (210-180 = 30)
days_to_delete = oldKeyDeletionPeriod - rotationPeriod  # ex: (220-180 = 40)

# Format for lines in credentials.txt
akidLineFormat = 'aws_access_key_id = {}'
secretLineFormat = 'aws_secret_access_key = {}'

# Format for name of ASM secrets
secretNameFormat = 'User_{}_AccessKey'

# Format for ARN of ASM secrets
secretArnFormat = 'arn:aws:secretsmanager:{region_name}:{account_id}:secret:{secret_name}'

# Format for Secret Manager Resource Policy
secretPolicyFormat = """{{
    "Version": "2012-10-17",
    "Statement": [
        {{
            "Effect": "Allow",
            "Principal": {{
                "AWS": "arn:aws:iam::{account_id}:user/{user_name}"
            }},
            "Action": [
                "secretsmanager:GetSecretValue",
                "secretsmanager:DescribeSecret",
                "secretsmanager:ListSecretVersionIds",
                "secretsmanager:ListSecrets"
            ],
            "Resource": "*"
        }}
    ]
}}
"""

iamPolicyFormat = """{{
    "Version": "2012-10-17",
    "Statement": [
        {{
            "Sid": "RetrieveSecretValue",
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetSecretValue",
                "secretsmanager:DescribeSecret",
                "secretsmanager:ListSecretVersionIds"
            ],
            "Resource": "{secret_arn}"
        }},
        {{
            "Sid": "ListSecret",
            "Effect": "Allow",
            "Action": "secretsmanager:ListSecrets",
            "Resource": "*"
        }}
    ]
}}
"""

# IAM Client
iam = boto3.client('iam')

# Secrets Manager Client
sm = boto3.client('secretsmanager')

my_session = boto3.session.Session()
my_region = my_session.region_name

# Sub-Clients
#sns_client = boto3.client('sns')
sts_client = boto3.client('sts')


def lambda_handler(event, context):
    response = {}
    force_rotate_user_name = None
    if "ForceRotate" in event:
        force_rotate_user_name = event['ForceRotate']

    # Paginate so that every user is covered, not only the first page of results
    for page in iam.get_paginator('list_users').paginate():
        for user in page['Users']:
            user_name = user['UserName']
            user_tag = iam.list_user_tags(UserName=user_name)

            # Reset both values per user so that nothing carries over from the previous user
            user_type = ""
            user_email = ""
            for tag in user_tag.get("Tags"):
                if tag.get("Key") == "UserType":
                    user_type = tag.get("Value")
                if tag.get("Key") == tagKeyName:
                    user_email = tag.get("Value")

            # Only users with the expected UserType and an email tag are processed
            if user_type == userType and user_email != "":
                force = user_name == force_rotate_user_name
                process_user(user, user_email, response, force)

    # Build a response for debugging - doesn't change actual work done,
    # just gives output for testing in Lambda console
    response['RotationDate'] = str(rotationDate)
    return response


def get_account_id():
    """
    Gets the current account ID.

    :return The current account Id of the Lambda function.
    """
    try:
        return sts_client.get_caller_identity().get("Account")
    except ClientError as error:
        print(error)
        raise


def process_user(user, user_email, response, force=False):
    """Rotate access keys for a user.

    Inactive keys will be deleted
    Users with no active access keys will not be processed
    Users with an access key older than the rotation date will have a new key created and stored in ASM,
    deleting the oldest key if necessary.
    Users with an active access key older than the inactivation date, and an active access key newer than the
    rotation date will have the oldest key inactivated.
    Users with an inactive access key older than the deletion period, and an active access key newer than the
    rotation date will have the oldest key deleted

    On a single run of this lambda, a key will only move from active to inactive or inactive to deleted.
    """
    user_name = user['UserName']
    response[user_name] = {}
    lak = iam.list_access_keys(UserName=user_name)
    parsed_arn = parse_arn(user['Arn'])
    account = parsed_arn['account']
    num_keys = 0

    # Active Keys
    active_keys = []
    # Inactive Keys
    inactive_keys = []
    # Oldest Key
    oldest_key = None

    # Classify all access keys for the current user
    for akm in lak['AccessKeyMetadata']:
        num_keys += 1
        if oldest_key is None or oldest_key['CreateDate'] > akm['CreateDate']:
            oldest_key = akm
        if akm['Status'] == 'Active':
            active_keys.append(akm)
        else:
            inactive_keys.append(akm)

    num_active = len(active_keys)
    num_inactive = len(inactive_keys)

    if force:
        # Rotation of user is forced for testing
        if num_active == 2:
            # Two active keys. Delete oldest and rotate
            key_to_delete = oldest_key['AccessKeyId']
            iam.delete_access_key(UserName=user_name, AccessKeyId=key_to_delete)
            #print("Force: delete key")
            send_notification_update_key(user_name, key_to_delete, account, "deleted", user_email)
            response[user_name]["Deleted Old Key"] = key_to_delete
            create_access_key(user, user_email, response)
            response[user_name]["Action"] = "Key rotated."
        else:
            #print("Force: Create key")
            create_access_key(user, user_email, response)
            response[user_name]["Action"] = "Key rotated."
    elif num_active == 2:
        classification_1 = classify_date(active_keys[0])
        classification_2 = classify_date(active_keys[1])
        # Two Active Keys
        if classification_1 == "New" or classification_2 == "New":
            # At least one key is new. Handle oldest one according to inactivation/deletion dates
            handle_oldest_key(user_name, account, user_email, response, oldest_key)
        else:
            # Both keys older than rotation date.
Delete oldest and create new key_to_delete = oldest_key['AccessKeyId'] #print("Oldest key deleted") iam.delete_access_key(UserName=user_name, AccessKeyId=key_to_delete) send_notification_update_key(user_name, key_to_delete, account, "deleted", user_email) response[user_name]["Deleted Old Key"] = key_to_delete create_access_key(user, user_email, response) response[user_name]["Action"] = "Key rotated." elif num_active == 1 and num_inactive == 1: # One active and one inactive. Handle inactive key according to inactivation/deletion dates handle_oldest_key(user_name, account, user_email, response, inactive_keys[0]) elif num_active == 1 and num_inactive == 0: # Single key that is active. Rotate if necessary. classification = classify_date(active_keys[0]) if classification == "New": response[user_name]["Action"] = "No key rotation required." else: create_access_key(user, user_email, response) response[user_name]["Action"] = "Key rotated." elif num_active == 0 and num_inactive > 0: # If no active keys, delete all inactive keys response[user_name]["Deleted Inactive Keys"] = [] for key_to_delete in inactive_keys: iam.delete_access_key(UserName=user_name, AccessKeyId=key_to_delete) send_notification_update_key(user_name, key_to_delete, account, "deleted", user_email) #print("Deleting one key") response[user_name]["Deleted Inactive Keys"].append(key_to_delete) def classify_date(akm): creation_date = akm['CreateDate'].date() if creation_date > rotationDate: return "New" if creation_date > inactivationDate: return "Rotate" if creation_date > deletionDate: return "Inactivate" return "Delete" def handle_oldest_key(user_name, account, user_email, response, oldest_key): classification = classify_date(oldest_key) if classification == "Inactivate": key_to_inactivate = oldest_key['AccessKeyId'] iam.update_access_key(UserName=user_name, AccessKeyId=key_to_inactivate, Status='Inactive') send_notification_update_key(user_name, key_to_inactivate, account, "deaktivert", user_email) 
#print("Key deactivated") response[user_name]["Inactivated Old Key"] = key_to_inactivate elif classification == "Delete": key_to_delete = oldest_key['AccessKeyId'] iam.delete_access_key(UserName=user_name, AccessKeyId=key_to_delete) send_notification_update_key(user_name, key_to_delete, account, "slettet", user_email) #print("Key deleted") response[user_name]["Deleted Old Key"] = key_to_delete def format_secret_arn(secret_name): account_id = get_account_id() return secretArnFormat.format(account_id = account_id, secret_name = secret_name, region_name = my_region) def format_secret_policy(user): account_id = get_account_id() user_name = user['UserName'] return secretPolicyFormat.format(account_id = account_id, user_name = user_name) def format_iam_policy(secret_arn): account_id = get_account_id() return iamPolicyFormat.format(account_id = account_id, secret_arn = secret_arn) def create_resource_policy(user, secret_name, secret_arn): resource_policy_document = format_secret_policy(user) sm.put_resource_policy(SecretId=secret_name, ResourcePolicy=resource_policy_document, BlockPublicPolicy=True) def create_iam_policy_if_not_exist(user, secret_arn): user_name = user['UserName'] create_policy = False policy_name = 'SecretsAccessPolicy' try: iam.get_user_policy(UserName = user_name, PolicyName = policy_name) except botocore.exceptions.ClientError as e: # TODO - IAM uses IAM.Client.exceptions.NoSuchEntityException # Find out if it inherits from ClientError. 
If it does, this code is probably ok, # but may need to change ResourceNotFoundException to NoSucheEntityException if e.response['Error']['Code'] == 'NoSuchEntity': create_policy = True else: raise e # Go Bonk if create_policy: policy_document = format_iam_policy(secret_arn) iam.put_user_policy(UserName = user_name, PolicyName = policy_name, PolicyDocument = policy_document) def create_access_key(user, user_email, response): user_name = user['UserName'] secret_name = secretNameFormat.format(user_name) secret_arn = format_secret_arn(secret_name) # Create new access key new_access_key = iam.create_access_key(UserName=user_name) response[user_name]["Created Access Key"] = new_access_key['AccessKey']['AccessKeyId'] response[user_name]["ASM Secret Name"] = secret_name akid_line = akidLineFormat.format( new_access_key['AccessKey']['AccessKeyId']) secret_line = secretLineFormat.format( new_access_key['AccessKey']['SecretAccessKey']) cred_file_body = '{}\n{}'.format(akid_line, secret_line) # Create new secret, or store in existing create_secret = False try: # See if the secret we need already exists sm.describe_secret(SecretId=secret_name) except botocore.exceptions.ClientError as e: if e.response['Error']['Code'] == 'ResourceNotFoundException': create_secret = True else: raise e # Go Bonk if create_secret: sm.create_secret( Name=secret_name, Description='Auto-created secret', SecretString=cred_file_body) else: sm.put_secret_value(SecretId=secret_name, SecretString=cred_file_body) secret_value = sm.get_secret_value(SecretId=secret_name) parsed_arn = parse_arn(secret_value['ARN']) #print("Parsing with notifier:", parsed_arn) send_notification_create_key(user_name, parsed_arn['account'], new_access_key['AccessKey']['AccessKeyId'], secret_value['ARN'], rotationPeriod, days_to_inactivate, days_to_delete, user_email) create_resource_policy(user, secret_name, secret_arn) create_iam_policy_if_not_exist(user, secret_arn)
notifier.py
""" Notifier script Provies core logic for the Notifier Lambda Function. __author__ = "Samiul Saki Chowdhury" """ import datetime import logging import botocore import boto3 import json import os from botocore.exceptions import ClientError SES_SENDER_EMAIL_ADDRESS = str(os.environ['AdminEmail']) #'noreply-dev@bymoslo.net' now = str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M")) # The character encoding for the email. CHARSET = "UTF-8" def parse_arn(arn): # http://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html elements = arn.split(':', 5) result = { 'arn': elements[0], 'partition': elements[1], 'service': elements[2], 'region': elements[3], 'account': elements[4], 'resource': elements[5], 'resource_type': None } if '/' in result['resource']: result['resource_type'], result['resource'] = result['resource'].split('/', 1) # NOQA elif ':' in result['resource']: result['resource_type'], result['resource'] = result['resource'].split(':', 1) # NOQA return result def send_notification_create_key(user_name, account_id, access_key, secret_value, rotationPeriod, days_to_inactivate, days_to_delete, user_email): ses_client = boto3.client('ses') # The HTML body of the email. BODY_HTML = """<html> <head></head> <body> <div> <h3>Denne varslingen ble automatisk generert av AWS Lambda Funksjon.</h3> </div> <p>Hei {user_name},</p> <p>Din tilgangsnøkkel for konto:: <strong>{account_id}</strong> er over {rotationPeriod} dager gammel. 
Vi har nå opprettet en ny tilgangsnøkkel for deg.</p> <p>Vennligst bruk følgende informasjon for å hente den nye tilgangsnøkkelen fra Secrets Manager.</p> <table> <tbody> <tr style="height: 18px;"> <td style="width: 2%; height: 18px;"></td> <td style="width: 10%; height: 18px;"><strong>Brukernavn:</strong></td> <td style="width: 40%; height: 18px;">{user_name}</td> </tr> <tr style="height: 18px;"> <td style="width: 2%; height: 18px;"></td> <td style="width: 10%; height: 18px;"><strong>Konto ID:</strong></td> <td style="width: 40%; height: 18px;">{account_id}</td> </tr> <tr style="height: 18px;"> <td style="width: 2%; height: 18px;"></td> <td style="width: 10%; height: 18px;"><strong>AccessKey ID:</strong></td> <td style="width: 40%; height: 18px;">{access_key}</td> </tr> <tr style="height: 18px;"> <td style="width: 2%; height: 18px;"></td> <td style="width: 10%; height: 18px;"><strong>SecretARN:</strong></td> <td style="width: 40%; height: 18px;">{secret_value}</td> </tr> <tr style="height: 18px;"> <td style="width: 2%; height: 18px;"></td> <td style="width: 10%; height: 18px;"><strong>Status:</strong></td> <td style="width: 40%; height: 18px;">OPPRETTET</td> </tr> <tr style="height: 18px;"> <td style="width: 2%; height: 18px;"></td> <td style="width: 10%; height: 18px;"><strong>Dato Opprettet:</strong></td> <td style="width: 40%; height: 18px;">{now}</td> </tr> </tbody> </table> <p>Din gamle tilgangsnøkkel er fortsatt aktiv og kan brukes som normalt. Den vil bli deaktivert automatisk om {days_to_inactivate} dager og evt. slettet om {days_to_delete} dager. 
Vennligst last ned tilgangsnøkkel id (Access Key ID) og den hemmelige nøkkelen (Secret Key), og oppdater AWS-profilen din i lokal maskin/applikasjon før de blir slettet.</p> <p><em>Du kan kjøre følgende kommando med CLI for å hente din nye hemmelige nøkkel (Secret Key):</em><br/> <code># aws secretsmanager get-secret-value --secret-id {secret_value} --output table [--profile Your_Profile]</code></p> <p>For å finne ut mer om hvordan man roterer AWS-tilgangsnøkkelen regelmessig, kan du lese den offisielle guiden på https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html#Using_RotateAccessKey<br/> Ved noen andre spørsmål, vennligst kontakt AWS infrastrukturteam via deres respektive Slack kanaler.</p> <p>Med vennlig hilsen<br/> AWS Infrastrukturteam<br> Digital Teknologi og Innsikt</p> </body> </html> """.format(user_name=user_name, account_id=account_id, access_key=access_key, secret_value=secret_value, rotationPeriod=rotationPeriod, now=now, days_to_inactivate=days_to_inactivate, days_to_delete=days_to_delete) try: ses_response = ses_client.send_email( Destination={'ToAddresses': [user_email]}, Message={ 'Body': { 'Html': { 'Charset': CHARSET, 'Data': BODY_HTML, } }, 'Subject': {'Charset': 'UTF-8', 'Data': f'Din nye AWS-tilgangsnøkkel ble opprettet for konto {account_id}'} }, Source=SES_SENDER_EMAIL_ADDRESS ) except ClientError as e: logging.error(e.response['Error']['Message']) print("Email sending unsuccessfull") else: logging.info(f'Notification email sent successfully to {user_email}! 
Message ID: {ses_response["MessageId"]}') print("Email sent successfully") def send_notification_update_key(user_name, access_key, account_id, status, user_email): ses_client = boto3.client('ses') BODY_HTML = """ <html> <head></head> <body> <div> <h3>Denne varslingen ble automatisk generert av AWS Lambda Funksjon.</h3> </div> <p>Hei {user_name},</p> <p>Din tilgangsnøkkel-ID {access_key} for konto: <strong>{account_id}</strong> er nå <strong>{status}</strong>. Bruk nøkkelen som ble opperettet for deg tidligere, eller kontakt AWS infrastrukturteamet for å få hjelp.</p> <table> <tbody> <tr style="height: 18px;"> <td style="width: 2%; height: 18px;"></td> <td style="width: 10%; height: 18px;"><strong>Brukernavn:</strong></td> <td style="width: 40%; height: 18px;">{user_name}</td> </tr> <tr style="height: 18px;"> <td style="width: 2%; height: 18px;"></td> <td style="width: 10%; height: 18px;"><strong>Konto ID:</strong></td> <td style="width: 40%; height: 18px;">{account_id}</td> </tr> <tr style="height: 18px;"> <td style="width: 2%; height: 18px;"></td> <td style="width: 10%; height: 18px;"><strong>AccessKey ID:</strong></td> <td style="width: 40%; height: 18px;">{access_key}</td> </tr> <tr style="height: 18px;"> <td style="width: 2%; height: 18px;"></td> <td style="width: 10%; height: 18px;"><strong>Status:</strong></td> <td style="width: 40%; height: 18px;">{status}</td> </tr> <tr style="height: 18px;"> <td style="width: 2%; height: 18px;"></td> <td style="width: 10%; height: 18px;"><strong>Dato {status}:</strong></td> <td style="width: 40%; height: 18px;">{now}</td> </tr> </tbody> </table> <p>For å finne ut mer om hvordan man roterer AWS-tilgangsnøkkelen regelmessig, kan du lese den offisielle guiden på https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html#Using_RotateAccessKey<br/> Ved noen andre spørsmål, vennligst kontakt AWS infrastrukturteam via deres respektive Slack kanaler.</p> <p>Med vennlig hilsen<br/> AWS Infrastrukturteam<br> 
Digital Teknologi og Innsikt</p> </body> </html> """.format(user_name=user_name, account_id=account_id, access_key=access_key, status=status.upper(), now=now) try: ses_response = ses_client.send_email( Destination={'ToAddresses': [user_email]}, Message={ 'Body': { 'Html': { 'Charset': CHARSET, 'Data': BODY_HTML, } }, 'Subject': {'Charset': CHARSET, 'Data': f'Din gamle AWS-tilgangsnøkkel er nå {status} for konto {account_id}'} }, Source=SES_SENDER_EMAIL_ADDRESS ) except ClientError as e: logging.error(e.response['Error']['Message']) print("Email sending unsuccessfull") else: logging.info(f'Notification email sent successfully to {user_email}! Message ID: {ses_response["MessageId"]}') print("Email sent successfully")
Note: The BODY_HTML variables hold the email templates. Their content can be anything, so change them as you prefer.
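The rotation function stores the new key pair in Secrets Manager as two credentials-file lines (aws_access_key_id = ... and aws_secret_access_key = ...). As a rough sketch of how a user could fetch and parse that secret with boto3 instead of the CLI command from the email, something like the following might work. The function names and the profile_name parameter are illustrative assumptions, not part of the original scripts.

```python
def parse_credentials(secret_string):
    """Parse 'key = value' lines, as written by the rotation function, into a dict."""
    credentials = {}
    for line in secret_string.splitlines():
        if "=" not in line:
            continue
        # Split on the first '=' only, so the value is kept intact
        key, _, value = line.partition("=")
        credentials[key.strip()] = value.strip()
    return credentials


def fetch_new_access_key(secret_id, profile_name=None):
    """Read the secret (by name or ARN) and return the parsed key pair.

    Hypothetical helper: assumes the caller's profile is allowed to read the secret.
    """
    import boto3  # imported here so parse_credentials can be used without boto3

    session = boto3.Session(profile_name=profile_name)
    client = session.client("secretsmanager")
    response = client.get_secret_value(SecretId=secret_id)
    return parse_credentials(response["SecretString"])
```

Calling fetch_new_access_key with the SecretARN from the notification email would then return a dictionary with the two values needed for the local AWS profile.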
- Zip these two files together and name the archive access_key_auto_rotation.zip
Note: The Terraform module refers to this zip file by name, so make a note of it.
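The packaging step can also be scripted. The sketch below uses only the Python standard library and assumes that main.py and notifier.py sit in the same folder; the function name build_lambda_package is made up for illustration. Both files are written to the root of the archive, which is where the handler setting main.lambda_handler expects to find main.py.

```python
import zipfile
from pathlib import Path


def build_lambda_package(source_dir=".", archive_name="access_key_auto_rotation.zip"):
    """Zip main.py and notifier.py from source_dir into archive_name."""
    source = Path(source_dir)
    archive_path = source / archive_name
    with zipfile.ZipFile(archive_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for file_name in ("main.py", "notifier.py"):
            # arcname keeps the files at the archive root, without folder prefixes
            archive.write(source / file_name, arcname=file_name)
    return archive_path
```

Running build_lambda_package() in the folder that holds the two scripts produces the archive next to them, ready for the Terraform module.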
-
Now create a Terraform module in the same local folder:
-
main.tf
# Terraform module to create lambda function to
# rotate IAM access keys for AWS users
#
# Author = "Samiul Saki Chowdhury"
# Copyright = "Bymiljøetaten, Oslo Kommune"
# Version = "1.0"

data "aws_iam_policy_document" "lambda_function_assume_role_policy" {
  statement {
    actions = ["sts:AssumeRole"]
    principals {
      type        = "Service"
      identifiers = ["lambda.amazonaws.com", "events.amazonaws.com"]
    }
  }
}

resource "aws_iam_role" "lambda_function_execution_role" {
  name               = "StackSet-SecurityFindings-RotationLambdaFunction"
  assume_role_policy = data.aws_iam_policy_document.lambda_function_assume_role_policy.json
  managed_policy_arns = [
    "arn:${var.aws_partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
  ]

  inline_policy {
    name = "AllowRotationFunctionPermissions"
    policy = jsonencode({
      "Version": "2012-10-17",
      "Statement": [
        {
          "Action": ["s3:GetObject"],
          "Resource": ["arn:${var.aws_partition}:s3:::${var.s3_bucket_name}/*"],
          "Effect": "Allow"
        },
        {
          "Action": [
            "iam:List*",
            "iam:CreatePolicy",
            "iam:CreateAccessKey",
            "iam:DeleteAccessKey",
            "iam:UpdateAccessKey",
            "iam:PutUserPolicy",
            "iam:GetUserPolicy"
          ],
          "Resource": "*",
          "Effect": "Allow"
        },
        {
          "Action": ["iam:AttachUserPolicy"],
          "Resource": ["arn:${var.aws_partition}:iam::${var.aws_accountid}:user/*"],
          "Effect": "Allow"
        },
        {
          "Action": [
            "secretsmanager:PutResourcePolicy",
            "secretsmanager:PutSecretValue",
            "secretsmanager:DescribeSecret",
            "secretsmanager:CreateSecret",
            "secretsmanager:GetResourcePolicy",
            "secretsmanager:GetSecretValue"
          ],
          "Resource": ["arn:${var.aws_partition}:secretsmanager:${var.aws_region}:${var.aws_accountid}:secret:*"],
          "Effect": "Allow"
        },
        {
          "Action": ["ses:SendEmail"],
          "Resource": ["arn:${var.aws_partition}:ses:${var.aws_region}:${var.aws_accountid}:identity/*"],
          "Effect": "Allow"
        }
      ]
    })
  }

  tags = {
    Env  = var.tag_function.Env
    Name = var.tag_function.Name
  }
}

resource "aws_lambda_function" "iam_rotation_keys" {
  filename         = "access_key_auto_rotation.zip"
  function_name    = "IAM-Access-Key-Rotation-LambdaFunction"
  role             = aws_iam_role.lambda_function_execution_role.arn
  source_code_hash = filebase64sha256("access_key_auto_rotation.zip")
  runtime          = "python3.8"
  handler          = "main.lambda_handler"

  environment {
    variables = {
      RotationPeriod  = var.environment_variables.RotationPeriod
      InactivePeriod  = var.environment_variables.InactivePeriod
      RetentionPeriod = var.environment_variables.RetentionPeriod
      TagKey          = var.environment_variables.TagKey
      UserType        = var.environment_variables.UserType
      AdminEmail      = var.environment_variables.AdminEmail
    }
  }

  tags = {
    Env  = var.tag_function.Env
    Name = var.tag_function.Name
  }
}

resource "aws_cloudwatch_event_rule" "daily_invocation" {
  name                = "StackSet-SecurityFindings-RotationCloudWatchEvent"
  description         = "CloudWatch Event to trigger Access Key auto-rotation Lambda Function daily"
  schedule_expression = var.schedule
  is_enabled          = true

  tags = {
    Env  = var.tag_function.Env
    Name = var.tag_function.Name
  }
}

resource "aws_cloudwatch_event_target" "iam_rotation_daily" {
  rule      = aws_cloudwatch_event_rule.daily_invocation.name
  target_id = "lambda_function_execution_role"
  arn       = aws_lambda_function.iam_rotation_keys.arn
}

resource "aws_lambda_permission" "allow_cloudwatch_to_call_iam_rotation_keys" {
  statement_id  = "AllowExecutionFromCloudWatch"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.iam_rotation_keys.function_name
  principal     = "events.amazonaws.com"
  source_arn    = aws_cloudwatch_event_rule.daily_invocation.arn
}
-
provider.tf
# AWS Provider
provider "aws" {
  region                  = var.aws_region
  shared_credentials_file = var.shared_credentials_file
  profile                 = var.aws_profile
}
-
terraform.tfvars
# Connect to AWS infrastructure
aws_region  = "eu-west-1"
aws_profile = "[AWS_Profile]"

# Tags
tag_function = {
  Env  = "Sandbox"
  Name = "IAM-Rotate-Keys"
}

# Pre-configured Infrastructure
# In the final version we will remove the need for the S3 bucket
s3_bucket_name = "iam-rotate-keys"
S3BucketPrefix = "iam-rotation"
aws_partition  = "aws"
aws_accountid  = "[AccountID]" # Ex: 123456789012

# Environment variables for Lambda functions
environment_variables = {
  AdminEmail      = "[Sender_email_address]" # Ex: noreply@example.net
  InactivePeriod  = "210"
  RetentionPeriod = "220"
  RotationPeriod  = "180"
  TagKey          = "Email"
  UserType        = "User"
}

# Variables for EventBridge
schedule = "rate(24 hours)" # Default
-
variables.tf
# Connect to AWS infrastructure
variable "shared_credentials_file" {
  default = "$HOME/.aws/credentials"
}

variable "aws_region" {
  default = "eu-west-1"
}

variable "aws_profile" {
  default = "default"
}

# Tags
variable "tag_function" {
  type = object({
    Env  = string
    Name = string
  })
}

# Pre-configured Infrastructure
variable "s3_bucket_name" {
  type    = string
  default = "iam-rotate-keys"
}

variable "S3BucketPrefix" {
  type    = string
  default = "iam-rotation"
}

variable "aws_partition" {
  default = "aws"
}

variable "aws_accountid" {
  type = string
}

# Environment variables for Lambda functions
variable "environment_variables" {
  type = object({
    RotationPeriod  = string
    InactivePeriod  = string
    RetentionPeriod = string
    TagKey          = string
    UserType        = string
    AdminEmail      = string
  })
  description = "Environment variables for IAM-rotate-keys lambda function"
}

# Variables for EventBridge
variable "schedule" {
  type        = string
  default     = "rate(24 hours)"
  description = "Schedule to trigger lambda function IAM-rotate-keys, default once every 24 hours. See doc: https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/ScheduledEvents.html"
}
Note: Change the variables in terraform.tfvars and variables.tf as preferred
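When changing the three periods, keep in mind that the rotation script requires RotationPeriod < InactivePeriod < RetentionPeriod. The following sketch is not part of the deployed code; it restates the classify_date logic from the rotation script in terms of key age, so you can check what a given set of values would do before deploying. The function names and default values here are illustrative.

```python
import datetime


def validate_periods(rotation_days, inactive_days, retention_days):
    """Raise ValueError unless RotationPeriod < InactivePeriod < RetentionPeriod."""
    if not 0 < rotation_days < inactive_days < retention_days:
        raise ValueError(
            "expected RotationPeriod < InactivePeriod < RetentionPeriod, got "
            "{}, {}, {}".format(rotation_days, inactive_days, retention_days)
        )


def classify_key_age(created, today, rotation_days=180, inactive_days=210, retention_days=220):
    """Return the state the rotation script would assign to a key created on `created`."""
    age_days = (today - created).days
    if age_days < rotation_days:
        return "New"
    if age_days < inactive_days:
        return "Rotate"
    if age_days < retention_days:
        return "Inactivate"
    return "Delete"


if __name__ == "__main__":
    validate_periods(180, 210, 220)
    today = datetime.date.today()
    for age in (100, 180, 210, 220):
        created = today - datetime.timedelta(days=age)
        print(age, "days old:", classify_key_age(created, today))
```

With the default values, a key moves to "Rotate" on day 180, "Inactivate" on day 210 and "Delete" on day 220.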
- Deploy with Terraform v0.12:
terraform init
terraform plan
terraform apply
....
....
Plan: 5 to add, 0 to change, 0 to destroy.

Do you want to perform these actions?
  Terraform will perform the actions described above.
  Only 'yes' will be accepted to approve.

  Enter a value: yes
-
-
Automation is now rolled out using Terraform
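To test the deployment without waiting for a key to age, the handler accepts a ForceRotate entry in its event, naming the IAM user to rotate. Below is a minimal sketch of invoking the function that way with boto3. The helper names, the profile_name parameter and the default region are assumptions for illustration. Note that a forced rotation really creates a new key, and deletes the oldest one if the user already has two active keys.

```python
import json

# Function name as set in main.tf
FUNCTION_NAME = "IAM-Access-Key-Rotation-LambdaFunction"


def build_force_rotate_payload(user_name):
    """Build the event that makes the handler rotate the given user's key."""
    return json.dumps({"ForceRotate": user_name}).encode("utf-8")


def force_rotate(user_name, profile_name=None, region_name="eu-west-1"):
    """Invoke the rotation function synchronously and return its response dict."""
    import boto3  # imported here so the payload helper works without boto3

    session = boto3.Session(profile_name=profile_name, region_name=region_name)
    response = session.client("lambda").invoke(
        FunctionName=FUNCTION_NAME,
        InvocationType="RequestResponse",
        Payload=build_force_rotate_payload(user_name),
    )
    return json.loads(response["Payload"].read())
```

The returned dictionary is the debugging response built by the handler, with one entry per processed user plus RotationDate.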
An optional automation:
- We need to make sure that all users have the following tags (it is important that the tag keys match exactly as written here):
Key = Email and Value = [iam-user-email]
Key = UserType and Value = [User / Service]
We can automate this with a Python script. The script updates the Email and UserType tags of all IAM users so that the automation (IAM user access key rotation) can use them. We run it from a local PC with an AWS user profile that has the appropriate access.
- Install the following libraries for Python 3:
pip3 install pandas boto3
- To run scripts on a local machine (as a one-time process):
- First create the Python script:
main.py
""" Automatically updates IAM user tags based on information given in list in CSV files __author__ = "Samiul Saki Chowdhury" """ import csv import os import pandas import boto3 import math import argparse # Variables userType = "UserType" tagKeyName = "Email" # csv file name filename = "accounts.csv" # Takes AWS profile as arguments parser = argparse.ArgumentParser(description='Script to update IAM user tags.') parser.add_argument('--profile', action='store', type=str, help='runs the script for the given profile') args = parser.parse_args() aws_profile = str(args.profile) # IAM Client # iam = boto3.client('iam') iam = boto3.Session(profile_name=aws_profile, region_name="eu-west-1").client("iam") def lambda_handler(): users = iam.list_users() for user in users["Users"]: parsed_arn = parse_arn(user["Arn"]) user_name = user["UserName"] user_tag = iam.list_user_tags(UserName=user_name) print("\nUser>",user_name) user_type = find_tag_value(user_tag, userType) user_email = find_tag_value(user_tag, tagKeyName) try: account_id,account_name,account_email,account_type = get_info(user["UserName"]) print("Data:",account_id, account_name, account_email, account_type) # Untagging process if type(account_type) != str: account_type_nan = math.isnan(account_type) if account_type_nan == True and user_type is not None: untag_iam_user(user_name, userType) if type(account_email) != str: account_email_nan = math.isnan(account_email) if account_email_nan == True and user_email is not None: untag_iam_user(user_name, tagKeyName) # Tagging process if type(account_type) == str and account_type != "" and user_type != account_type: tag_iam_user(user_name, userType, account_type) if type(account_email) == str and account_email != "" and user_email != account_email: tag_iam_user(user_name, tagKeyName, account_email) except: write_to_file(parsed_arn['account'], user_name, user_email, user_type) def get_info(user): df = pandas.read_csv(filename) dict_all = df.to_dict(orient='list') try: index_user 
= dict_all.get("accountname").index(str(user)) account_id = dict_all.get("accountid")[index_user] account_name = dict_all.get("accountname")[index_user] account_email = dict_all.get("accountemail")[index_user] account_type = dict_all.get("accounttype")[index_user] except ValueError: print("No info found in CSV") return account_id,account_name,account_email,account_type def write_to_file(accountId,accountName,accountEmail,accountType): f = open('accounts.csv','a') if accountEmail == None: accountEmail = "" if accountType == None: accountType = "" row = '\n{},{},{},{}'.format(accountId, accountName, accountEmail, accountType) f.write(row) f.close() print("Updated CSV successfully") def tag_iam_user(userName,tagKey,tagValue): iam.tag_user(UserName=str(userName),Tags=[{ 'Key': str(tagKey), 'Value': str(tagValue) }]) print("Updated with new tag key/value:", tagKey) def untag_iam_user(userName,unTagKey): iam.untag_user(UserName=str(userName),TagKeys=[ str(unTagKey) ]) print("Untagged tag key:", unTagKey) def find_tag_value(userTag, tagKey): for i in userTag.get("Tags"): if i.get("Key") == tagKey: return i.get("Value") return None def parse_arn(arn): # http://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html elements = arn.split(':', 5) result = { 'arn': elements[0], 'partition': elements[1], 'service': elements[2], 'region': elements[3], 'account': elements[4], 'resource': elements[5], 'resource_type': None } if '/' in result['resource']: result['resource_type'], result['resource'] = result['resource'].split('/', 1) # NOQA elif ':' in result['resource']: result['resource_type'], result['resource'] = result['resource'].split(':', 1) # NOQA return result lambda_handler()
- Next, create and fill in the accounts.csv file with the available information for all IAM users in the AWS account. If you already have an overview of the user information, you can edit the CSV file directly in a text editor such as Notepad or VS Code. The CSV file should look like this (the header row at the top must not be removed or edited):
accountid,accountname,accountemail,accounttype
123456789012,GITHUB_User,,Service
123456789012,Grafana_User,,Service
123456789012,dev-user,dev.user@whatever.com,User
123456789012,test-user,test.user@whatever.com,User
....
....
123456789012,samiul,samiul.saki@example.net,User
....
- Otherwise, run the main.py script to fill in the CSV file automatically. You must pass the AWS profile name as an argument to the script:
python3 main.py --profile <aws_profile>
- Show the help text if you need it:
> python3 main.py -h
usage: main.py [-h] [--profile PROFILE]

Script to update IAM user tags.

optional arguments:
  -h, --help         show this help message and exit
  --profile PROFILE  runs the script for the given profile
- You now have a list of all user information. Edit the rows that you want to update.
Note: The CSV file takes priority over whatever information already exists in AWS, so the CSV file gives you full control over the user information.
- The CSV columns are (in this order, separated by commas):
- accountid = Account ID of the AWS account where you want to update the user information
- accountname = Username of the IAM user
- accountemail = The e-mail address of the IAM user, which the iam-access-key-rotation script uses to send notifications about access key rotation. This becomes the value of the Email tag.
- accounttype = Type of IAM user. This becomes the value of the UserType tag.
Important: Fill in the accounttype column with either User or Service. Users of type User should also have an e-mail address. This is not enforced, but the rotation function only processes users that have both the matching UserType tag and a non-empty e-mail tag.
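Before running the script again, it can help to sanity-check the edited CSV file. The following is a rough sketch using only the standard library; the helper name check_accounts_csv and the assumption that User and Service are the only valid types are illustrative, so adjust them if you use other UserType values.

```python
import csv

EXPECTED_HEADER = ["accountid", "accountname", "accountemail", "accounttype"]
VALID_TYPES = {"User", "Service", ""}  # assumption: an empty cell means "remove the tag"


def check_accounts_csv(handle):
    """Return a list of human-readable problems found in the open CSV file."""
    reader = csv.DictReader(handle)
    if reader.fieldnames != EXPECTED_HEADER:
        return ["unexpected header: {}".format(reader.fieldnames)]

    problems = []
    # Data rows start on line 2, right after the header row
    for line_number, row in enumerate(reader, start=2):
        name = (row.get("accountname") or "").strip()
        email = (row.get("accountemail") or "").strip()
        account_type = (row.get("accounttype") or "").strip()
        if account_type not in VALID_TYPES:
            problems.append("line {}: {} has unknown type {!r}".format(line_number, name, account_type))
        if account_type == "User" and not email:
            problems.append("line {}: {} is a User without an e-mail address".format(line_number, name))
    return problems


if __name__ == "__main__":
    with open("accounts.csv", newline="") as csv_file:
        for problem in check_accounts_csv(csv_file):
            print(problem)
```

An empty result means every row has a recognised type and every User row has an e-mail address.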
- Run main.py again.
python3 main.py --profile <aws_profile>
- The script updates the user information in IAM according to the information contained in the CSV file.
You’re now done.
-
To update IAM users in another AWS account:
- Remove all users from the list in the CSV file (keep the header row)
- Pass the AWS profile of the other account, as configured on your local machine, to main.py with --profile
- Run main.py. As before, the first run fills in the CSV file. Then edit the file and run the script again to roll out the update to the IAM users in that AWS account.
-
To edit an IAM user's information:
- To change user information, edit the CSV file and run main.py again
Example:
accountid,accountname,accountemail,accounttype
123456789012,dev-user,dev.user@whatever.com,User   # Here I have changed the email address with @whatever.com
-
To remove (untag) a tag key for IAM users:
- Simply edit the CSV file. With the same example:
Example:
accountid,accountname,accountemail,accounttype
123456789012,dev-user,,User   # Here I have removed the email address of the user
- Run main.py again.
- The script removes the user's Email or UserType tag in IAM when the corresponding CSV field is empty.
Example:
accountid,accountname,accountemail,accounttype
123456789012,dev-user,,User   # Will remove Email tagkey
123456789012,dev-user,,       # Will remove Email and UserType tagkey
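The tagging and untagging rules above can be summarised as a small pure function. This is only an illustrative sketch of the decision logic, with no AWS calls; plan_tag_changes is a hypothetical helper name and is not part of main.py.

```python
def plan_tag_changes(current_tags, csv_email, csv_type):
    """Compare a CSV row with the user's current IAM tags.

    current_tags maps tag keys to values, for example {"Email": "a@b.c"}.
    Returns (tags_to_set, tag_keys_to_remove).
    """
    desired = {"Email": csv_email, "UserType": csv_type}
    tags_to_set = {}
    tag_keys_to_remove = []
    for key, value in desired.items():
        value = (value or "").strip()
        if value:
            # A non-empty CSV field overwrites the tag when it differs from IAM
            if current_tags.get(key) != value:
                tags_to_set[key] = value
        elif key in current_tags:
            # An empty CSV field removes an existing tag
            tag_keys_to_remove.append(key)
    return tags_to_set, tag_keys_to_remove
```

For the two example rows above, a user currently tagged with both Email and UserType would lose only Email in the first case and both tags in the second.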