Automatically rotate IAM user access keys

Access keys are long-term credentials for an AWS Identity and Access Management (IAM) user or the Amazon Web Services (AWS) account root user. Regularly rotating your IAM credentials helps prevent a compromised set of IAM access keys from accessing components in your AWS account. Rotating IAM credentials is also an important part of security best practices in IAM.

Our setup will check the status of access-keys for IAM users and do the following:

  • New IAM access keys are generated when existing access keys are 180 days old.
  • The new access keys are stored as a secret in AWS Secrets Manager. A resource-based policy only allows the specified IAM principle to access and retrieve the secret.
  • The account owner of the new access keys receives a notification via e-mail.
  • The previous access keys are deactivated at 210 days old.
  • The previous access keys are then deleted after 220 days.

Note: the days defined can be changed

AWS Lambda-function and Amazon EventBridge perform these actions automatically. An user can then retrieve the new pair of access keys and replace them in their code or programs.

Technology stack:

  • Amazon DynamoDB
  • S3
  • EventBridge
  • IAM
  • Lambda
  • Secrets Manager
  • Amazon SES

IAM-Rotation-Infrastructure.md.png

Setup Process - CloudFormation:

  • In the administrator account,

    • We create a IAM-role called AWSCloudFormationStackSetAdministrationRole
      • with the following policy:
        AWSCloudFormationStackSetAdministrationPolicy
        {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Action": [
                        "sts:AssumeRole"
                    ],
                    "Resource": [
                        "arn:aws:iam::*:role/AWSCloudFormationStackSetExecutionRole"
                    ],
                    "Effect": "Allow"
                }
            ]
        }
        
      • which has:
        Trust relationship
        {
          "Version": "2012-10-17",
          "Statement": [
            {
              "Effect": "Allow",
              "Principal": {
                "Service": "cloudformation.amazonaws.com"
              },
              "Action": "sts:AssumeRole"
            }
          ]
        }
        
    • And we create a service role called AWSCloudFormationStackSetExecutionRole that trusts the administrator account
      • with following policy:
        AWSCloudFormationStackSetExecutionPolicy
        {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Sid": "VisualEditor0",
                    "Effect": "Allow",
                    "Action": [
                        "sns:*",
                        "lambda:CreateFunction",
                        "iam:GetRole",
                        "s3:*",
                        "lambda:GetFunction",
                        "iam:CreateRole",
                        "iam:DeleteRole",
                        "cloudformation:*",
                        "dynamodb:*",
                        "iam:AttachRolePolicy",
                        "iam:PutRolePolicy",
                        "iam:DetachRolePolicy",
                        "iam:DeleteRolePolicy",
                        "lambda:DeleteFunction",
                        "iam:PassRole",
                        "lambda:AddPermission",
                        "events:PutRule",
                        "events:DescribeRule",
                        "lambda:RemovePermission",
                        "events:RemoveTargets",
                        "events:PutTargets",
                        "events:DeleteRule"
                    ],
                    "Resource": "*"
                }
            ]
        }  
        
      • which has:
        Trust relationship
        {
          "Version": "2012-10-17",
          "Statement": [
            {
              "Effect": "Allow",
              "Principal": {
                "AWS": "arn:aws:iam::ACCOUNT_ID:root"
              },
              "Action": "sts:AssumeRole"
            }
          ]
        }  
        
  • Go to the SES console and check if you have a sender address verified in the account (follow instruction that found here).

    • We will move the account outside the sandbox so that it can send to and from the email addresses outside its own domain.
    • This is to confirm that the lambda function can send email to users with a different email domain address outside the controlled domain
  • Create a local folder with the following files:

    • main.py
      """
      Main script
      
      Automatically rotates IAM access keys in for 
      the AWS users
      
      __author__      = "Samiul Saki Chowdhury"
      """
      
      import botocore
      import boto3
      import datetime
      import os
      import base64
      from botocore.exceptions import ClientError
      from notifier import *
      
      # Inputs from Environment Variables
      
      # Global Variables
      #sns_arn = os.environ["sns_arn"]
      
      # Environment Variable: RotationPeriod
      # The number of days after which a key should be rotated
      rotationPeriod = int(os.environ['RotationPeriod'])
      
      # Environment Variable: InactivePeriod
      # The number of days after which to inactivate keys that had been rotated
      # Note: This must be greater than RotationPeriod
      oldKeyInactivationPeriod = int(os.environ['InactivePeriod'])
      
      # Environment Variable: RetentionPeriod
      # The number of days after which to delete keys that have been rotated and inactivated
      # Note: This must be greater than InactivePeriod
      oldKeyDeletionPeriod = int(os.environ['RetentionPeriod'])
      
      # Environment Variable: TagKey
      # The TagKey that will define user only with email addresses, evt. send the secret keys
      # Note: This must match the keys that are defined as TagKey name
      tagKeyName = str(os.environ['TagKey'])
      
      # Environment Variable: UserType
      # The UserType that will define which user (Person) need to be check, evt. rotate the keys
      # Note: This must match the keys that are defined as UserType type
      userType = str(os.environ['UserType'])
      
      # Pre-calculate the rotation and retention cutoff dates
      rotationDate = (datetime.datetime.now() -
                      datetime.timedelta(days=rotationPeriod)).date()
      inactivationDate = (datetime.datetime.now() -
                          datetime.timedelta(days=oldKeyInactivationPeriod)).date()
      deletionDate = (datetime.datetime.now() -
                      datetime.timedelta(days=oldKeyDeletionPeriod)).date()
      
      # Counting number of days between defined periods
      days_to_inactivate = oldKeyInactivationPeriod - rotationPeriod #ex: (210-180 = 30)
      days_to_delete = oldKeyDeletionPeriod - rotationPeriod # ex: (220-180 = 40)
      
      # Format for lines in credentials.txt
      akidLineFormat = 'aws_access_key_id = {}'
      secretLineFormat = 'aws_secret_access_key = {}'
      
      # Format for name of ASM secrets
      secretNameFormat = 'User_{}_AccessKey'
      
      # Format for ARN of ASM secrets
      secretArnFormat = 'arn:aws:secretsmanager:{region_name}:{account_id}:secret:{secret_name}'
      
      # Format for Secret Manager Resource Policy
      secretPolicyFormat = """{{
          "Version": "2012-10-17",
          "Statement": [
              {{
                  "Effect": "Allow",
                  "Principal": {{
                      "AWS": "arn:aws:iam::{account_id}:user/{user_name}"
                  }},
                  "Action": [
                      "secretsmanager:GetSecretValue",
                      "secretsmanager:DescribeSecret",
                      "secretsmanager:ListSecretVersionIds",
                      "secretsmanager:ListSecrets"
                  ],
                  "Resource": "*"
              }}
          ]
      }}
      """
      
      iamPolicyFormat = """{{
          "Version": "2012-10-17",
          "Statement": [
              {{
                  "Sid": "RetrieveSecretValue",
                  "Effect": "Allow",
                  "Action": [
                      "secretsmanager:GetSecretValue",
                      "secretsmanager:DescribeSecret",
                      "secretsmanager:ListSecretVersionIds"
                  ],
                  "Resource": "{secret_arn}"
              }},
              {{
                  "Sid": "ListSecret",
                  "Effect": "Allow",
                  "Action": "secretsmanager:ListSecrets",
                  "Resource": "*"
              }}
          ]
      }}
      """
      
      # IAM Client
      iam = boto3.client('iam')
      
      # Secrets Manager Client
      sm = boto3.client('secretsmanager')
      my_session = boto3.session.Session()
      my_region = my_session.region_name
      
      # Sub-Clients
      #sns_client = boto3.client('sns')
      sts_client = boto3.client('sts')
      
      
      def lambda_handler(event, context):
          users = iam.list_users()
          response = {}    
          
          force_rotate_user_name = None
          if "ForceRotate" in event:
              force_rotate_user_name = event['ForceRotate']
      
          for user in users['Users']:
              user_name = user['UserName']
              user_tag = iam.list_user_tags(UserName=user_name)
              
              # Here we will define if the UserType is an User        
              for i in user_tag.get("Tags"):
                  if i.get("Key") == "UserType":                
                      user_type = i.get("Value")
                      user_email = ""
                      for j in user_tag.get("Tags"):
                          if j.get("Key") == tagKeyName:
                              user_email = j.get("Value")
                      
                      if user_type == userType and user_email != "":
                          if user_name == force_rotate_user_name:                
                              process_user(user, user_email, response, True)
                          else:                        
                              process_user(user, user_email, response, False)
      
          # Build a response for debugging - doesn't change actual work done, just gives output for testing in Lambda console
          #response = build_response(results)
          response['RotationDate'] = rotationDate.__str__()
          return response
      
      def get_account_id():
          """
          Gets the current account ID.#
      
          :return The current account Id of the Lambda function.
          """
          try:
              account_id = sts_client.get_caller_identity().get("Account")
          except ClientError as error:
              print(error)
      
          return account_id
      
      def process_user(user, user_email, response, force=False):
          """Rotate access keys for a user.
      
          Inactive keys will be deleted
          Users with no active access keys will not be processed
          Users with an access key older than the rotation date will have a new key created and stored in ASM, deleting the oldest key if necessary.
          Users with an active access key older than the inactivation date, and an active access key newer than the rotation date will have the oldest key inactivated.
          Users with an inactive access key older than the deletion period, and an active access key newer than the rotation date will have the oldest key deleted
          On a single run of this lambda, a key will only move from active to inactive or inactive to deleted.
          """
      
          user_name = user['UserName']    
          response[user_name] = {}
          lak = iam.list_access_keys(UserName=user_name)
          parsed_arn = parse_arn(user['Arn'])
          account = parsed_arn['account']
      
          num_keys = 0
      
          # Active Keys
          active_keys = []
      
          # Inactive Keys
          inactive_keys = []
      
          # Oldest Key
          oldest_key = None
      
          # Classify all access keys for the current user
          for akm in lak['AccessKeyMetadata']:
              num_keys += 1
              if oldest_key is None or oldest_key['CreateDate'] > akm['CreateDate']:
                  oldest_key = akm
              if akm['Status'] == 'Active':
                  active_keys.append(akm)
              else:
                  inactive_keys.append(akm)
      
          num_active = len(active_keys)
          num_inactive = len(inactive_keys)
      
          if force:
              # Rotation of user is forced for testing
              if num_active == 2:
                  # Two active keys. Delete oldest and rotate
                  key_to_delete = oldest_key['AccessKeyId']
                  iam.delete_access_key(UserName=user_name, AccessKeyId=key_to_delete)
                  #print("Force: delete key")
                  send_notification_update_key(user_name, key_to_delete, account, "deleted", user_email)
                  response[user_name]["Deleted Old Key"] = key_to_delete
                  create_access_key(user, user_email, response)
                  response[user_name]["Action"] = "Key rotated."
              else:
                  #print("Force: Create key")
                  create_access_key(user, user_email, response)
                  response[user_name]["Action"] = "Key rotated."
          elif num_active == 2:
              classification_1 = classify_date(active_keys[0])
              classification_2 = classify_date(active_keys[1])
      
              # Two Active Keys
              if classification_1 == "New" or classification_2 == "New":
                  # At least one key is new. Handle oldest one according to inactivation/deletion dates
                  handle_oldest_key(user_name, account, user_email, response, oldest_key)
              else:
                  # Both keys older than rotation date. Delete oldest and create new
                  key_to_delete = oldest_key['AccessKeyId']
                  #print("Oldest key deleted")
                  iam.delete_access_key(UserName=user_name, AccessKeyId=key_to_delete)
                  send_notification_update_key(user_name, key_to_delete, account, "deleted", user_email)
                  response[user_name]["Deleted Old Key"] = key_to_delete
                  create_access_key(user, user_email, response)
                  response[user_name]["Action"] = "Key rotated."
          elif num_active == 1 and num_inactive == 1:
              # One active and one inactive. Handle inactive key according to inactivation/deletion dates
              handle_oldest_key(user_name, account, user_email, response, inactive_keys[0])
          elif num_active == 1 and num_inactive == 0:
              # Single key that is active. Rotate if necessary.
              classification = classify_date(active_keys[0])
              if classification == "New":
                  response[user_name]["Action"] = "No key rotation required."
              else:
                  create_access_key(user, user_email, response)
                  response[user_name]["Action"] = "Key rotated."
          elif num_active == 0 and num_inactive > 0:
              # If no active keys, delete all inactive keys
              response[user_name]["Deleted Inactive Keys"] = []
              for key_to_delete in inactive_keys:
                  iam.delete_access_key(UserName=user_name, AccessKeyId=key_to_delete)
                  send_notification_update_key(user_name, key_to_delete, account, "deleted", user_email)
                  #print("Deleting one key")
                  response[user_name]["Deleted Inactive Keys"].append(key_to_delete)
      
      def classify_date(akm):
          creation_date = akm['CreateDate'].date()
          if creation_date > rotationDate:
              return "New"
          if creation_date > inactivationDate:
              return "Rotate"
          if creation_date > deletionDate:
              return "Inactivate"
          return "Delete"
      
      def handle_oldest_key(user_name, account, user_email, response, oldest_key):
          classification = classify_date(oldest_key)
      
          if classification == "Inactivate":
              key_to_inactivate = oldest_key['AccessKeyId']
              iam.update_access_key(UserName=user_name, AccessKeyId=key_to_inactivate, Status='Inactive')
              send_notification_update_key(user_name, key_to_inactivate, account, "deaktivert", user_email)
              #print("Key deactivated")
              response[user_name]["Inactivated Old Key"] = key_to_inactivate
          elif classification == "Delete":
              key_to_delete = oldest_key['AccessKeyId']
              iam.delete_access_key(UserName=user_name, AccessKeyId=key_to_delete)
              send_notification_update_key(user_name, key_to_delete, account, "slettet", user_email)
              #print("Key deleted")
              response[user_name]["Deleted Old Key"] = key_to_delete
      
      def format_secret_arn(secret_name):
          account_id = get_account_id()
      
          return secretArnFormat.format(account_id = account_id, secret_name = secret_name, region_name = my_region)
      
      def format_secret_policy(user):
          account_id = get_account_id()
          user_name = user['UserName']
      
          return secretPolicyFormat.format(account_id = account_id, user_name = user_name)
      
      def format_iam_policy(secret_arn):
          account_id = get_account_id()
      
          return iamPolicyFormat.format(account_id = account_id, secret_arn = secret_arn)
      
      def create_resource_policy(user, secret_name, secret_arn):
          resource_policy_document = format_secret_policy(user)
          sm.put_resource_policy(SecretId=secret_name, ResourcePolicy=resource_policy_document, BlockPublicPolicy=True)
      
      def create_iam_policy_if_not_exist(user, secret_arn):
          user_name = user['UserName']
          create_policy = False
          policy_name = 'SecretsAccessPolicy'
          try:
              iam.get_user_policy(UserName = user_name, PolicyName = policy_name)
          except botocore.exceptions.ClientError as e:
              # TODO - IAM uses IAM.Client.exceptions.NoSuchEntityException
              #        Find out if it inherits from ClientError. If it does, this code is  probably ok,
              #        but may need to change ResourceNotFoundException to NoSucheEntityException
              if e.response['Error']['Code'] == 'NoSuchEntity':
                  create_policy = True
              else:
                  raise e  # Go Bonk
      
          if create_policy:
              policy_document = format_iam_policy(secret_arn)
              iam.put_user_policy(UserName = user_name, PolicyName = policy_name, PolicyDocument = policy_document)
      
      def create_access_key(user, user_email, response):
          user_name = user['UserName']
          secret_name = secretNameFormat.format(user_name)
          secret_arn = format_secret_arn(secret_name)
      
          # Create new access key
          new_access_key = iam.create_access_key(UserName=user_name)
          response[user_name]["Created Access Key"] = new_access_key['AccessKey']['AccessKeyId']
          response[user_name]["ASM Secret Name"] = secret_name
      
          akid_line = akidLineFormat.format(
              new_access_key['AccessKey']['AccessKeyId'])
          secret_line = secretLineFormat.format(
              new_access_key['AccessKey']['SecretAccessKey'])
          cred_file_body = '{}\n{}'.format(akid_line, secret_line)
      
          # Create new secret, or store in existing
          create_secret = False
          try:
              # See if the secret we need already exists
              sm.describe_secret(SecretId=secret_name)
          except botocore.exceptions.ClientError as e:
              if e.response['Error']['Code'] == 'ResourceNotFoundException':
                  create_secret = True
              else:
                  raise e  # Go Bonk
      
          if create_secret:
              sm.create_secret(
                  Name=secret_name, Description='Auto-created secret', SecretString=cred_file_body)
          else:
              sm.put_secret_value(SecretId=secret_name, SecretString=cred_file_body)
          
          secret_value = sm.get_secret_value(SecretId=secret_name)
          parsed_arn = parse_arn(secret_value['ARN'])
          #print("Parsing with notifier:", parsed_arn)
          send_notification_create_key(user_name, parsed_arn['account'], new_access_key['AccessKey']['AccessKeyId'], secret_value['ARN'], rotationPeriod, days_to_inactivate, days_to_delete, user_email)
          
          
          
          create_resource_policy(user, secret_name, secret_arn)
          create_iam_policy_if_not_exist(user, secret_arn)
      
      
      
    • notifier.py
      """
      Notifier script
      
      Provies core logic for the Notifier Lambda Function.
      
      __author__      = "Samiul Saki Chowdhury"
      """
      
      import datetime
      import logging
      import botocore
      import boto3
      import json
      import os
      
      from botocore.exceptions import ClientError
      
      SES_SENDER_EMAIL_ADDRESS = str(os.environ['AdminEmail']) #'noreply-dev@bymoslo.net'
      
      now = str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M"))
      
      # The character encoding for the email.
      CHARSET = "UTF-8"
      
      def parse_arn(arn):
          # http://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html
          elements = arn.split(':', 5)
          result = {
              'arn': elements[0],
              'partition': elements[1],
              'service': elements[2],
              'region': elements[3],
              'account': elements[4],
              'resource': elements[5],
              'resource_type': None
          }
          if '/' in result['resource']:
              result['resource_type'], result['resource'] = result['resource'].split('/', 1)  # NOQA
          elif ':' in result['resource']:
              result['resource_type'], result['resource'] = result['resource'].split(':', 1)  # NOQA
          return result
      
      def send_notification_create_key(user_name, account_id, access_key, secret_value, rotationPeriod, days_to_inactivate, days_to_delete, user_email):
          ses_client = boto3.client('ses')
          # The HTML body of the email.
          BODY_HTML = """<html>
          <head></head>
          <body>
          <div>
          <h3>Denne varslingen ble automatisk generert av AWS Lambda Funksjon.</h3>
          </div>
          <p>Hei {user_name},</p>
      
          <p>Din tilgangsnøkkel for konto:: <strong>{account_id}</strong> er over {rotationPeriod} dager gammel. Vi har nå opprettet en ny tilgangsnøkkel for deg.</p>
      
          <p>Vennligst bruk følgende informasjon for å hente den nye tilgangsnøkkelen fra Secrets Manager.</p>
      
          <table>
              <tbody>
              <tr style="height: 18px;">
              <td style="width: 2%; height: 18px;"></td>
              <td style="width: 10%; height: 18px;"><strong>Brukernavn:</strong></td>
              <td style="width: 40%; height: 18px;">{user_name}</td>
              </tr>
              <tr style="height: 18px;">
              <td style="width: 2%; height: 18px;"></td>
              <td style="width: 10%; height: 18px;"><strong>Konto ID:</strong></td>
              <td style="width: 40%; height: 18px;">{account_id}</td>
              </tr>
              <tr style="height: 18px;">
              <td style="width: 2%; height: 18px;"></td>
              <td style="width: 10%; height: 18px;"><strong>AccessKey ID:</strong></td>
              <td style="width: 40%; height: 18px;">{access_key}</td>
              </tr>
              <tr style="height: 18px;">
              <td style="width: 2%; height: 18px;"></td>
              <td style="width: 10%; height: 18px;"><strong>SecretARN:</strong></td>
              <td style="width: 40%; height: 18px;">{secret_value}</td>
              </tr>
              <tr style="height: 18px;">
              <td style="width: 2%; height: 18px;"></td>
              <td style="width: 10%; height: 18px;"><strong>Status:</strong></td>
              <td style="width: 40%; height: 18px;">OPPRETTET</td>
              </tr>
              <tr style="height: 18px;">
              <td style="width: 2%; height: 18px;"></td>
              <td style="width: 10%; height: 18px;"><strong>Dato Opprettet:</strong></td>
              <td style="width: 40%; height: 18px;">{now}</td>
              </tr>
              </tbody>
          </table>
      
          <p>Din gamle tilgangsnøkkel er fortsatt aktiv og kan brukes som normalt. Den vil bli deaktivert automatisk om {days_to_inactivate} dager og evt. slettet om {days_to_delete} dager. Vennligst last ned tilgangsnøkkel id (Access Key ID) og den hemmelige nøkkelen (Secret Key), og oppdater AWS-profilen din i lokal maskin/applikasjon før de blir slettet.</p>
      
          <p><em>Du kan kjøre følgende kommando med CLI for å hente din nye hemmelige nøkkel (Secret Key):</em><br/>
          <code># aws secretsmanager get-secret-value --secret-id {secret_value} --output table [--profile Your_Profile]</code></p>
      
          
          <p>For å finne ut mer om hvordan man roterer AWS-tilgangsnøkkelen regelmessig, kan du lese den offisielle guiden på https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html#Using_RotateAccessKey<br/>
          Ved noen andre spørsmål, vennligst kontakt AWS infrastrukturteam via deres respektive Slack kanaler.</p>
      
          <p>Med vennlig hilsen<br/>
          AWS Infrastrukturteam<br>
          Digital Teknologi og Innsikt</p>
          </body>
          </html>
          """.format(user_name=user_name, account_id=account_id, access_key=access_key, secret_value=secret_value, rotationPeriod=rotationPeriod, now=now, days_to_inactivate=days_to_inactivate, days_to_delete=days_to_delete)
      
          try:
              ses_response = ses_client.send_email(
                  Destination={'ToAddresses': [user_email]},
                  Message={
                      'Body': {
                          'Html': {
                              'Charset': CHARSET,
                              'Data': BODY_HTML,
                          }
                      },
                      'Subject': {'Charset': 'UTF-8', 'Data': f'Din nye AWS-tilgangsnøkkel ble opprettet for konto {account_id}'}
                  },
                  Source=SES_SENDER_EMAIL_ADDRESS
              )
          except ClientError as e:
              logging.error(e.response['Error']['Message'])
              print("Email sending unsuccessfull")
          else:
              logging.info(f'Notification email sent successfully to {user_email}! Message ID: {ses_response["MessageId"]}')
              print("Email sent successfully")
      
      def send_notification_update_key(user_name, access_key, account_id, status, user_email):
          ses_client = boto3.client('ses')
          BODY_HTML = """
          <html>
          <head></head>
          <body>
          <div>
          <h3>Denne varslingen ble automatisk generert av AWS Lambda Funksjon.</h3>
          </div>
          <p>Hei {user_name},</p>
      
          <p>Din tilgangsnøkkel-ID {access_key} for konto: <strong>{account_id}</strong> er nå <strong>{status}</strong>. Bruk nøkkelen som ble opperettet for deg tidligere, eller kontakt AWS infrastrukturteamet for å få hjelp.</p>    
      
          <table>
              <tbody>
              <tr style="height: 18px;">
              <td style="width: 2%; height: 18px;"></td>
              <td style="width: 10%; height: 18px;"><strong>Brukernavn:</strong></td>
              <td style="width: 40%; height: 18px;">{user_name}</td>
              </tr>
              <tr style="height: 18px;">
              <td style="width: 2%; height: 18px;"></td>
              <td style="width: 10%; height: 18px;"><strong>Konto ID:</strong></td>
              <td style="width: 40%; height: 18px;">{account_id}</td>
              </tr>
              <tr style="height: 18px;">
              <td style="width: 2%; height: 18px;"></td>
              <td style="width: 10%; height: 18px;"><strong>AccessKey ID:</strong></td>
              <td style="width: 40%; height: 18px;">{access_key}</td>
              </tr>
              <tr style="height: 18px;">
              <td style="width: 2%; height: 18px;"></td>
              <td style="width: 10%; height: 18px;"><strong>Status:</strong></td>
              <td style="width: 40%; height: 18px;">{status}</td>
              </tr>
              <tr style="height: 18px;">
              <td style="width: 2%; height: 18px;"></td>
              <td style="width: 10%; height: 18px;"><strong>Dato {status}:</strong></td>
              <td style="width: 40%; height: 18px;">{now}</td>
              </tr>
              </tbody>
          </table>
      
          <p>For å finne ut mer om hvordan man roterer AWS-tilgangsnøkkelen regelmessig, kan du lese den offisielle guiden på https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html#Using_RotateAccessKey<br/>
          Ved noen andre spørsmål, vennligst kontakt AWS infrastrukturteam via deres respektive Slack kanaler.</p>
      
          <p>Med vennlig hilsen<br/>
          AWS Infrastrukturteam<br>
          Digital Teknologi og Innsikt</p>
          </body>
          </html>
          """.format(user_name=user_name, account_id=account_id, access_key=access_key, status=status.upper(), now=now)
      
          try:
              ses_response = ses_client.send_email(
                  Destination={'ToAddresses': [user_email]},
                  Message={
                      'Body': {
                          'Html': {
                              'Charset': CHARSET,
                              'Data': BODY_HTML,
                          }
                      },
                      'Subject': {'Charset': CHARSET, 'Data': f'Din gamle AWS-tilgangsnøkkel er nå {status} for konto {account_id}'}
                  },
                  Source=SES_SENDER_EMAIL_ADDRESS
              )
          except ClientError as e:
              logging.error(e.response['Error']['Message'])
              print("Email sending unsuccessfull")
          else:
              logging.info(f'Notification email sent successfully to {user_email}! Message ID: {ses_response["MessageId"]}')
              print("Email sent successfully")
      
      
      

    Note: The BODY_HTML variables for the email-templates can be anything and change theem as you prefer

    • Zip these two files together and call it access_key_auto_rotation.zip

    Note: This zip file will be defined on the CloudFormation script so this need to be noted down.

  • Now create a CloudFomation script on the same local folder:

    • iam-key-auto-rotation-and-notifier.yaml
      # CloudFormation script to create lambda function to 
      # rotate IAM access keys for AWS users
      #
      # Copyright   =  (c) 2020 Amazon Web Services, Inc. or its affiliates. All Rights Reserved. This AWS Content
      #                 is provided subject to the terms of the AWS Customer Agreement available at
      #                 https://aws.amazon.com/agreement/ or other written agreement between Customer
      #                 and Amazon Web Services, Inc.
      # Version      = "1.0"
      
      AWSTemplateFormatVersion: '2010-09-09'
      Description: "AWS CloudFormation template to set up Auto-rotation function for AWS IAM Access Keys."
      
      Metadata:
        AWS::CloudFormation::Interface:
          ParameterGroups:
            - Label:
                default: Configure the Deployment
              Parameters:
                - S3BucketName
                - S3BucketPrefix
      
            - Label:
                default: Configure Notifier Tool
              Parameters:
                - AdminEmailAddress
                - RotationPeriodDays
                - InactivePeriodDays
                - RetentionPeriodDays
                
      
          ParameterLabels:
            # Deployment Configuration
            S3BucketName:
              default: CloudFormation S3 Bucket Name
            S3BucketPrefix:
              default: CloudFormation S3 Bucket Prefix
            
            # Rotation Period Settings
            RotationPeriodDays:
              default: Lambda Function Rotation Period
            InactivePeriodDays:
              default: Lambda Function Inactivation Period
            RetentionPeriodDays:
              default: Lambda Function Retention Period
      
            # Notifier Settings
            AdminEmailAddress:
              default: Admin Email Address
      
      Parameters:
        S3BucketName:
          Description: S3 Bucket Name where code is located.
          Type: String
          Default: "iam-rotate-keys"
      
        S3BucketPrefix:
          Description: The prefix or directory where resources will be stored.
          Type: String
          Default: "iam-rotation"
      
        AdminEmailAddress:
          Description: Email address that will be used in the "sent from" section of the email
          Type: String
      
        RotationPeriodDays:
          Description: Rotation period that will be used to check if the old keys are older than certain days
          Type: String
          Default: "180"
      
        InactivePeriodDays:
          Description: Inactivation period that will be used to keep the old keys inactive for certain days
          Type: String
          Default: "210"
      
        RetentionPeriodDays:
          Description: Retention period that will be used to delete the old inactive keys after certain days
          Type: String
          Default: "220"
      
      
      
      
      Resources:
        # IAM KEY ROTATION RESOURCES
        RotationLambdaFunctionExecutionRole:
          Type: AWS::IAM::Role
          Properties:
            AssumeRolePolicyDocument:
              Version: "2012-10-17"
              Statement:
                - Sid: AllowExecutionPermissionsOnFunction
                  Effect: Allow
                  Principal:
                    Service:
                      - lambda.amazonaws.com
                      - events.amazonaws.com
                  Action: sts:AssumeRole
            ManagedPolicyArns:
              - !Sub "arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
            Policies:
              - PolicyName: AllowRotationFunctionPermissions
                PolicyDocument:
                  Version: "2012-10-17"
                  Statement:
                    - Effect: Allow
                      Action:
                        - s3:GetObject
                      Resource:
                        - !Sub "arn:${AWS::Partition}:s3:::${S3BucketName}/*"
                    - Effect: Allow
                      Action:
                        - iam:List*
                        - iam:CreatePolicy
                        - iam:CreateAccessKey
                        - iam:DeleteAccessKey
                        - iam:UpdateAccessKey
                        - iam:PutUserPolicy
                        - iam:GetUserPolicy
                      Resource: "*"
                    - Effect: Allow
                      Action:
                        - iam:AttachUserPolicy
                      Resource:
                        - !Sub "arn:${AWS::Partition}:iam::${AWS::AccountId}:user/*"
                    - Effect: Allow
                      Action:
                        - secretsmanager:PutResourcePolicy
                        - secretsmanager:PutSecretValue
                        - secretsmanager:DescribeSecret
                        - secretsmanager:CreateSecret
                        - secretsmanager:GetResourcePolicy
                        - secretsmanager:GetSecretValue
                      Resource:
                        - !Sub "arn:${AWS::Partition}:secretsmanager:${AWS::Region}:${AWS::AccountId}:secret:*"
                    - Effect: Allow
                      Action:
                        - ses:SendEmail
                      Resource:
                        - !Sub "arn:${AWS::Partition}:ses:${AWS::Region}:${AWS::AccountId}:identity/*"
        
        RotationAccessKeyRotateLambdaFunction:
          Type: AWS::Lambda::Function
          Properties:
            Description: Rotates IAM Access Keys on specified schedule
            FunctionName: IAM-Access-Key-Rotation-LambdaFunction
            Handler: main.lambda_handler
            Runtime: python3.8
            Role: !GetAtt RotationLambdaFunctionExecutionRole.Arn
            Timeout: 240
            Environment:
              Variables:
                RotationPeriod: !Ref RotationPeriodDays
                InactivePeriod: !Ref InactivePeriodDays
                RetentionPeriod: !Ref RetentionPeriodDays
                TagKey: Email
                UserType: User
                AdminEmail: !Ref AdminEmailAddress
            Code:
              S3Bucket: !Ref S3BucketName
              S3Key: !Sub ${S3BucketPrefix}/access_key_auto_rotation.zip
      
        RotationCloudWatchEventLambdaTrigger:
          Type: AWS::Events::Rule
          DependsOn:
            - RotationLambdaFunctionExecutionRole
          Properties:
            Description: CloudWatch Event to trigger Access Key auto-rotation Lambda Function daily
            ScheduleExpression: rate(24 hours)
            State: ENABLED
            Targets:
              - Arn: !GetAtt RotationAccessKeyRotateLambdaFunction.Arn
                Id: AccessKeyRotationFunction
      
      
      
  • We need to upload the files in an S3 bucket:

  • Go to the S3 console

    • Create bucket
    • Bucket name (your choice)
    • AWS region (EU Ireland: eu-west-1)
    • Select the default for the rest of the options
    • Create bucket
  • Open the bucket and upload the entire local folder (the zip file access_key_auto_rotation.zip no need to upload individual python script)

    • Feel free to use cli or console
  • Create a folder with names in your S3 bucket named iam-rotation:

IMPORTANT that you use this name for the folder. It will be your prefix for the CloudFormation script.

  • Move all the files to the iam-rotation folder.

  • Roll out the entire stack with CloudFormation:

    • Change variables in the CloudFomation yaml as you prefer.
    • Go to the CloudFormation console
      • Select StackSets (make sure you are in the same region as the S3 bucket you created)
      • Create StackSets
      • Specify template
      • Upload a template file
      • Select the iam-key-auto-rotation-and-notifier.yaml file from your local PC (or you can upload it via the S3 url to the file in your S3 bucket)
    • Next
      • StackSet name: SecurityFindingsRemediationStackSet (or your choose)
      • CloudFormation S3 Bucket Name: Your bucket name
      • CloudFormation S3 Bucket Prefix: iam-rotation (make sure it is the same as the name (prefix) of the folder)
      • Admin Email Address: It is important that there is a verified email address (example: noreply-dev@bymoslo.net)
    • Next
      • Select IAM role name from dropdown list
      • Select the administration role you created earlier: AWSCloudFormationStackSetAdministrationRole
      • Select the IAM execution role you created early: AWSCloudFormationStackSetExecutionRole
      • Tags - Optional
    • Next
      • Deploy locations: Best to select Deploy stacks in accounts and fill in Account numbers (separated by commas for several offices)
      • Specify regions: Select where the S3 bucket was created
      • Deployment option: Let them be the default
    • Next
      • Approved CF stackset creation
    • Submit
  • Automation is now rolled out using CloudFormation

After stack was created:

  • Go to stacks and select the new stack set:
    • You can follow the events on the Events tab
    • Go to the Resources tab when installation is complete. You will find all Physical IDs for the stackset with more information.

Setup Process - Terraform:

  • Same as before create a local folder with the following files:

    • main.py
      """
      Main script
      
      Automatically rotates IAM access keys in for 
      the AWS users
      
      __author__      = "Samiul Saki Chowdhury"
      """
      
      import botocore
      import boto3
      import datetime
      import os
      import base64
      from botocore.exceptions import ClientError
      from notifier import *
      
      # Inputs from Environment Variables
      
      # Global Variables
      #sns_arn = os.environ["sns_arn"]
      
      # Environment Variable: RotationPeriod
      # The number of days after which a key should be rotated
      rotationPeriod = int(os.environ['RotationPeriod'])
      
      # Environment Variable: InactivePeriod
      # The number of days after which to inactivate keys that had been rotated
      # Note: This must be greater than RotationPeriod
      oldKeyInactivationPeriod = int(os.environ['InactivePeriod'])
      
      # Environment Variable: RetentionPeriod
      # The number of days after which to delete keys that have been rotated and inactivated
      # Note: This must be greater than InactivePeriod
      oldKeyDeletionPeriod = int(os.environ['RetentionPeriod'])
      
      # Environment Variable: TagKey
      # The TagKey that will define user only with email addresses, evt. send the secret keys
      # Note: This must match the keys that are defined as TagKey name
      tagKeyName = str(os.environ['TagKey'])
      
      # Environment Variable: UserType
      # The UserType that will define which user (Person) need to be check, evt. rotate the keys
      # Note: This must match the keys that are defined as UserType type
      userType = str(os.environ['UserType'])
      
      # Pre-calculate the rotation and retention cutoff dates
      rotationDate = (datetime.datetime.now() -
                      datetime.timedelta(days=rotationPeriod)).date()
      inactivationDate = (datetime.datetime.now() -
                          datetime.timedelta(days=oldKeyInactivationPeriod)).date()
      deletionDate = (datetime.datetime.now() -
                      datetime.timedelta(days=oldKeyDeletionPeriod)).date()
      
      # Counting number of days between defined periods
      days_to_inactivate = oldKeyInactivationPeriod - rotationPeriod #ex: (210-180 = 30)
      days_to_delete = oldKeyDeletionPeriod - rotationPeriod # ex: (220-180 = 40)
      
      # Format for lines in credentials.txt
      akidLineFormat = 'aws_access_key_id = {}'
      secretLineFormat = 'aws_secret_access_key = {}'
      
      # Format for name of ASM secrets
      secretNameFormat = 'User_{}_AccessKey'
      
      # Format for ARN of ASM secrets
      secretArnFormat = 'arn:aws:secretsmanager:{region_name}:{account_id}:secret:{secret_name}'
      
      # Format for Secret Manager Resource Policy
      secretPolicyFormat = """{{
          "Version": "2012-10-17",
          "Statement": [
              {{
                  "Effect": "Allow",
                  "Principal": {{
                      "AWS": "arn:aws:iam::{account_id}:user/{user_name}"
                  }},
                  "Action": [
                      "secretsmanager:GetSecretValue",
                      "secretsmanager:DescribeSecret",
                      "secretsmanager:ListSecretVersionIds",
                      "secretsmanager:ListSecrets"
                  ],
                  "Resource": "*"
              }}
          ]
      }}
      """
      
      iamPolicyFormat = """{{
          "Version": "2012-10-17",
          "Statement": [
              {{
                  "Sid": "RetrieveSecretValue",
                  "Effect": "Allow",
                  "Action": [
                      "secretsmanager:GetSecretValue",
                      "secretsmanager:DescribeSecret",
                      "secretsmanager:ListSecretVersionIds"
                  ],
                  "Resource": "{secret_arn}"
              }},
              {{
                  "Sid": "ListSecret",
                  "Effect": "Allow",
                  "Action": "secretsmanager:ListSecrets",
                  "Resource": "*"
              }}
          ]
      }}
      """
      
      # IAM Client
      iam = boto3.client('iam')
      
      # Secrets Manager Client
      sm = boto3.client('secretsmanager')
      my_session = boto3.session.Session()
      my_region = my_session.region_name
      
      # Sub-Clients
      #sns_client = boto3.client('sns')
      sts_client = boto3.client('sts')
      
      
      def lambda_handler(event, context):
          users = iam.list_users()
          response = {}    
          
          force_rotate_user_name = None
          if "ForceRotate" in event:
              force_rotate_user_name = event['ForceRotate']
      
          for user in users['Users']:
              user_name = user['UserName']
              user_tag = iam.list_user_tags(UserName=user_name)
              
              # Here we will define if the UserType is an User        
              for i in user_tag.get("Tags"):
                  if i.get("Key") == "UserType":                
                      user_type = i.get("Value")
                      user_email = ""
                      for j in user_tag.get("Tags"):
                          if j.get("Key") == tagKeyName:
                              user_email = j.get("Value")
                      
                      if user_type == userType and user_email != "":
                          if user_name == force_rotate_user_name:                
                              process_user(user, user_email, response, True)
                          else:                        
                              process_user(user, user_email, response, False)
      
          # Build a response for debugging - doesn't change actual work done, just gives output for testing in Lambda console
          #response = build_response(results)
          response['RotationDate'] = rotationDate.__str__()
          return response
      
      def get_account_id():
          """
          Gets the current account ID.#
      
          :return The current account Id of the Lambda function.
          """
          try:
              account_id = sts_client.get_caller_identity().get("Account")
          except ClientError as error:
              print(error)
      
          return account_id
      
      def process_user(user, user_email, response, force=False):
          """Rotate access keys for a user.
      
          Inactive keys will be deleted
          Users with no active access keys will not be processed
          Users with an access key older than the rotation date will have a new key created and stored in ASM, deleting the oldest key if necessary.
          Users with an active access key older than the inactivation date, and an active access key newer than the rotation date will have the oldest key inactivated.
          Users with an inactive access key older than the deletion period, and an active access key newer than the rotation date will have the oldest key deleted
          On a single run of this lambda, a key will only move from active to inactive or inactive to deleted.
          """
      
          user_name = user['UserName']    
          response[user_name] = {}
          lak = iam.list_access_keys(UserName=user_name)
          parsed_arn = parse_arn(user['Arn'])
          account = parsed_arn['account']
      
          num_keys = 0
      
          # Active Keys
          active_keys = []
      
          # Inactive Keys
          inactive_keys = []
      
          # Oldest Key
          oldest_key = None
      
          # Classify all access keys for the current user
          for akm in lak['AccessKeyMetadata']:
              num_keys += 1
              if oldest_key is None or oldest_key['CreateDate'] > akm['CreateDate']:
                  oldest_key = akm
              if akm['Status'] == 'Active':
                  active_keys.append(akm)
              else:
                  inactive_keys.append(akm)
      
          num_active = len(active_keys)
          num_inactive = len(inactive_keys)
      
          if force:
              # Rotation of user is forced for testing
              if num_active == 2:
                  # Two active keys. Delete oldest and rotate
                  key_to_delete = oldest_key['AccessKeyId']
                  iam.delete_access_key(UserName=user_name, AccessKeyId=key_to_delete)
                  #print("Force: delete key")
                  send_notification_update_key(user_name, key_to_delete, account, "deleted", user_email)
                  response[user_name]["Deleted Old Key"] = key_to_delete
                  create_access_key(user, user_email, response)
                  response[user_name]["Action"] = "Key rotated."
              else:
                  #print("Force: Create key")
                  create_access_key(user, user_email, response)
                  response[user_name]["Action"] = "Key rotated."
          elif num_active == 2:
              classification_1 = classify_date(active_keys[0])
              classification_2 = classify_date(active_keys[1])
      
              # Two Active Keys
              if classification_1 == "New" or classification_2 == "New":
                  # At least one key is new. Handle oldest one according to inactivation/deletion dates
                  handle_oldest_key(user_name, account, user_email, response, oldest_key)
              else:
                  # Both keys older than rotation date. Delete oldest and create new
                  key_to_delete = oldest_key['AccessKeyId']
                  #print("Oldest key deleted")
                  iam.delete_access_key(UserName=user_name, AccessKeyId=key_to_delete)
                  send_notification_update_key(user_name, key_to_delete, account, "deleted", user_email)
                  response[user_name]["Deleted Old Key"] = key_to_delete
                  create_access_key(user, user_email, response)
                  response[user_name]["Action"] = "Key rotated."
          elif num_active == 1 and num_inactive == 1:
              # One active and one inactive. Handle inactive key according to inactivation/deletion dates
              handle_oldest_key(user_name, account, user_email, response, inactive_keys[0])
          elif num_active == 1 and num_inactive == 0:
              # Single key that is active. Rotate if necessary.
              classification = classify_date(active_keys[0])
              if classification == "New":
                  response[user_name]["Action"] = "No key rotation required."
              else:
                  create_access_key(user, user_email, response)
                  response[user_name]["Action"] = "Key rotated."
          elif num_active == 0 and num_inactive > 0:
              # If no active keys, delete all inactive keys
              response[user_name]["Deleted Inactive Keys"] = []
              for key_to_delete in inactive_keys:
                  iam.delete_access_key(UserName=user_name, AccessKeyId=key_to_delete)
                  send_notification_update_key(user_name, key_to_delete, account, "deleted", user_email)
                  #print("Deleting one key")
                  response[user_name]["Deleted Inactive Keys"].append(key_to_delete)
      
      def classify_date(akm):
          creation_date = akm['CreateDate'].date()
          if creation_date > rotationDate:
              return "New"
          if creation_date > inactivationDate:
              return "Rotate"
          if creation_date > deletionDate:
              return "Inactivate"
          return "Delete"
      
      def handle_oldest_key(user_name, account, user_email, response, oldest_key):
          classification = classify_date(oldest_key)
      
          if classification == "Inactivate":
              key_to_inactivate = oldest_key['AccessKeyId']
              iam.update_access_key(UserName=user_name, AccessKeyId=key_to_inactivate, Status='Inactive')
              send_notification_update_key(user_name, key_to_inactivate, account, "deaktivert", user_email)
              #print("Key deactivated")
              response[user_name]["Inactivated Old Key"] = key_to_inactivate
          elif classification == "Delete":
              key_to_delete = oldest_key['AccessKeyId']
              iam.delete_access_key(UserName=user_name, AccessKeyId=key_to_delete)
              send_notification_update_key(user_name, key_to_delete, account, "slettet", user_email)
              #print("Key deleted")
              response[user_name]["Deleted Old Key"] = key_to_delete
      
      def format_secret_arn(secret_name):
          account_id = get_account_id()
      
          return secretArnFormat.format(account_id = account_id, secret_name = secret_name, region_name = my_region)
      
      def format_secret_policy(user):
          account_id = get_account_id()
          user_name = user['UserName']
      
          return secretPolicyFormat.format(account_id = account_id, user_name = user_name)
      
      def format_iam_policy(secret_arn):
          account_id = get_account_id()
      
          return iamPolicyFormat.format(account_id = account_id, secret_arn = secret_arn)
      
      def create_resource_policy(user, secret_name, secret_arn):
          resource_policy_document = format_secret_policy(user)
          sm.put_resource_policy(SecretId=secret_name, ResourcePolicy=resource_policy_document, BlockPublicPolicy=True)
      
      def create_iam_policy_if_not_exist(user, secret_arn):
          user_name = user['UserName']
          create_policy = False
          policy_name = 'SecretsAccessPolicy'
          try:
              iam.get_user_policy(UserName = user_name, PolicyName = policy_name)
          except botocore.exceptions.ClientError as e:
              # TODO - IAM uses IAM.Client.exceptions.NoSuchEntityException
              #        Find out if it inherits from ClientError. If it does, this code is  probably ok,
              #        but may need to change ResourceNotFoundException to NoSucheEntityException
              if e.response['Error']['Code'] == 'NoSuchEntity':
                  create_policy = True
              else:
                  raise e  # Go Bonk
      
          if create_policy:
              policy_document = format_iam_policy(secret_arn)
              iam.put_user_policy(UserName = user_name, PolicyName = policy_name, PolicyDocument = policy_document)
      
      def create_access_key(user, user_email, response):
          user_name = user['UserName']
          secret_name = secretNameFormat.format(user_name)
          secret_arn = format_secret_arn(secret_name)
      
          # Create new access key
          new_access_key = iam.create_access_key(UserName=user_name)
          response[user_name]["Created Access Key"] = new_access_key['AccessKey']['AccessKeyId']
          response[user_name]["ASM Secret Name"] = secret_name
      
          akid_line = akidLineFormat.format(
              new_access_key['AccessKey']['AccessKeyId'])
          secret_line = secretLineFormat.format(
              new_access_key['AccessKey']['SecretAccessKey'])
          cred_file_body = '{}\n{}'.format(akid_line, secret_line)
      
          # Create new secret, or store in existing
          create_secret = False
          try:
              # See if the secret we need already exists
              sm.describe_secret(SecretId=secret_name)
          except botocore.exceptions.ClientError as e:
              if e.response['Error']['Code'] == 'ResourceNotFoundException':
                  create_secret = True
              else:
                  raise e  # Go Bonk
      
          if create_secret:
              sm.create_secret(
                  Name=secret_name, Description='Auto-created secret', SecretString=cred_file_body)
          else:
              sm.put_secret_value(SecretId=secret_name, SecretString=cred_file_body)
          
          secret_value = sm.get_secret_value(SecretId=secret_name)
          parsed_arn = parse_arn(secret_value['ARN'])
          #print("Parsing with notifier:", parsed_arn)
          send_notification_create_key(user_name, parsed_arn['account'], new_access_key['AccessKey']['AccessKeyId'], secret_value['ARN'], rotationPeriod, days_to_inactivate, days_to_delete, user_email)
          
          
          
          create_resource_policy(user, secret_name, secret_arn)
          create_iam_policy_if_not_exist(user, secret_arn)
      
      
      
    • notifier.py
      """
      Notifier script
      
      Provies core logic for the Notifier Lambda Function.
      
      __author__      = "Samiul Saki Chowdhury"
      """
      
      import datetime
      import logging
      import botocore
      import boto3
      import json
      import os
      
      from botocore.exceptions import ClientError
      
      SES_SENDER_EMAIL_ADDRESS = str(os.environ['AdminEmail']) #'noreply-dev@bymoslo.net'
      
      now = str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M"))
      
      # The character encoding for the email.
      CHARSET = "UTF-8"
      
      def parse_arn(arn):
          # http://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html
          elements = arn.split(':', 5)
          result = {
              'arn': elements[0],
              'partition': elements[1],
              'service': elements[2],
              'region': elements[3],
              'account': elements[4],
              'resource': elements[5],
              'resource_type': None
          }
          if '/' in result['resource']:
              result['resource_type'], result['resource'] = result['resource'].split('/', 1)  # NOQA
          elif ':' in result['resource']:
              result['resource_type'], result['resource'] = result['resource'].split(':', 1)  # NOQA
          return result
      
      def send_notification_create_key(user_name, account_id, access_key, secret_value, rotationPeriod, days_to_inactivate, days_to_delete, user_email):
          ses_client = boto3.client('ses')
          # The HTML body of the email.
          BODY_HTML = """<html>
          <head></head>
          <body>
          <div>
          <h3>Denne varslingen ble automatisk generert av AWS Lambda Funksjon.</h3>
          </div>
          <p>Hei {user_name},</p>
      
          <p>Din tilgangsnøkkel for konto:: <strong>{account_id}</strong> er over {rotationPeriod} dager gammel. Vi har nå opprettet en ny tilgangsnøkkel for deg.</p>
      
          <p>Vennligst bruk følgende informasjon for å hente den nye tilgangsnøkkelen fra Secrets Manager.</p>
      
          <table>
              <tbody>
              <tr style="height: 18px;">
              <td style="width: 2%; height: 18px;"></td>
              <td style="width: 10%; height: 18px;"><strong>Brukernavn:</strong></td>
              <td style="width: 40%; height: 18px;">{user_name}</td>
              </tr>
              <tr style="height: 18px;">
              <td style="width: 2%; height: 18px;"></td>
              <td style="width: 10%; height: 18px;"><strong>Konto ID:</strong></td>
              <td style="width: 40%; height: 18px;">{account_id}</td>
              </tr>
              <tr style="height: 18px;">
              <td style="width: 2%; height: 18px;"></td>
              <td style="width: 10%; height: 18px;"><strong>AccessKey ID:</strong></td>
              <td style="width: 40%; height: 18px;">{access_key}</td>
              </tr>
              <tr style="height: 18px;">
              <td style="width: 2%; height: 18px;"></td>
              <td style="width: 10%; height: 18px;"><strong>SecretARN:</strong></td>
              <td style="width: 40%; height: 18px;">{secret_value}</td>
              </tr>
              <tr style="height: 18px;">
              <td style="width: 2%; height: 18px;"></td>
              <td style="width: 10%; height: 18px;"><strong>Status:</strong></td>
              <td style="width: 40%; height: 18px;">OPPRETTET</td>
              </tr>
              <tr style="height: 18px;">
              <td style="width: 2%; height: 18px;"></td>
              <td style="width: 10%; height: 18px;"><strong>Dato Opprettet:</strong></td>
              <td style="width: 40%; height: 18px;">{now}</td>
              </tr>
              </tbody>
          </table>
      
          <p>Din gamle tilgangsnøkkel er fortsatt aktiv og kan brukes som normalt. Den vil bli deaktivert automatisk om {days_to_inactivate} dager og evt. slettet om {days_to_delete} dager. Vennligst last ned tilgangsnøkkel id (Access Key ID) og den hemmelige nøkkelen (Secret Key), og oppdater AWS-profilen din i lokal maskin/applikasjon før de blir slettet.</p>
      
          <p><em>Du kan kjøre følgende kommando med CLI for å hente din nye hemmelige nøkkel (Secret Key):</em><br/>
          <code># aws secretsmanager get-secret-value --secret-id {secret_value} --output table [--profile Your_Profile]</code></p>
      
          
          <p>For å finne ut mer om hvordan man roterer AWS-tilgangsnøkkelen regelmessig, kan du lese den offisielle guiden på https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html#Using_RotateAccessKey<br/>
          Ved noen andre spørsmål, vennligst kontakt AWS infrastrukturteam via deres respektive Slack kanaler.</p>
      
          <p>Med vennlig hilsen<br/>
          AWS Infrastrukturteam<br>
          Digital Teknologi og Innsikt</p>
          </body>
          </html>
          """.format(user_name=user_name, account_id=account_id, access_key=access_key, secret_value=secret_value, rotationPeriod=rotationPeriod, now=now, days_to_inactivate=days_to_inactivate, days_to_delete=days_to_delete)
      
          try:
              ses_response = ses_client.send_email(
                  Destination={'ToAddresses': [user_email]},
                  Message={
                      'Body': {
                          'Html': {
                              'Charset': CHARSET,
                              'Data': BODY_HTML,
                          }
                      },
                      'Subject': {'Charset': 'UTF-8', 'Data': f'Din nye AWS-tilgangsnøkkel ble opprettet for konto {account_id}'}
                  },
                  Source=SES_SENDER_EMAIL_ADDRESS
              )
          except ClientError as e:
              logging.error(e.response['Error']['Message'])
              print("Email sending unsuccessfull")
          else:
              logging.info(f'Notification email sent successfully to {user_email}! Message ID: {ses_response["MessageId"]}')
              print("Email sent successfully")
      
      def send_notification_update_key(user_name, access_key, account_id, status, user_email):
          ses_client = boto3.client('ses')
          BODY_HTML = """
          <html>
          <head></head>
          <body>
          <div>
          <h3>Denne varslingen ble automatisk generert av AWS Lambda Funksjon.</h3>
          </div>
          <p>Hei {user_name},</p>
      
          <p>Din tilgangsnøkkel-ID {access_key} for konto: <strong>{account_id}</strong> er nå <strong>{status}</strong>. Bruk nøkkelen som ble opperettet for deg tidligere, eller kontakt AWS infrastrukturteamet for å få hjelp.</p>    
      
          <table>
              <tbody>
              <tr style="height: 18px;">
              <td style="width: 2%; height: 18px;"></td>
              <td style="width: 10%; height: 18px;"><strong>Brukernavn:</strong></td>
              <td style="width: 40%; height: 18px;">{user_name}</td>
              </tr>
              <tr style="height: 18px;">
              <td style="width: 2%; height: 18px;"></td>
              <td style="width: 10%; height: 18px;"><strong>Konto ID:</strong></td>
              <td style="width: 40%; height: 18px;">{account_id}</td>
              </tr>
              <tr style="height: 18px;">
              <td style="width: 2%; height: 18px;"></td>
              <td style="width: 10%; height: 18px;"><strong>AccessKey ID:</strong></td>
              <td style="width: 40%; height: 18px;">{access_key}</td>
              </tr>
              <tr style="height: 18px;">
              <td style="width: 2%; height: 18px;"></td>
              <td style="width: 10%; height: 18px;"><strong>Status:</strong></td>
              <td style="width: 40%; height: 18px;">{status}</td>
              </tr>
              <tr style="height: 18px;">
              <td style="width: 2%; height: 18px;"></td>
              <td style="width: 10%; height: 18px;"><strong>Dato {status}:</strong></td>
              <td style="width: 40%; height: 18px;">{now}</td>
              </tr>
              </tbody>
          </table>
      
          <p>For å finne ut mer om hvordan man roterer AWS-tilgangsnøkkelen regelmessig, kan du lese den offisielle guiden på https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html#Using_RotateAccessKey<br/>
          Ved noen andre spørsmål, vennligst kontakt AWS infrastrukturteam via deres respektive Slack kanaler.</p>
      
          <p>Med vennlig hilsen<br/>
          AWS Infrastrukturteam<br>
          Digital Teknologi og Innsikt</p>
          </body>
          </html>
          """.format(user_name=user_name, account_id=account_id, access_key=access_key, status=status.upper(), now=now)
      
          try:
              ses_response = ses_client.send_email(
                  Destination={'ToAddresses': [user_email]},
                  Message={
                      'Body': {
                          'Html': {
                              'Charset': CHARSET,
                              'Data': BODY_HTML,
                          }
                      },
                      'Subject': {'Charset': CHARSET, 'Data': f'Din gamle AWS-tilgangsnøkkel er nå {status} for konto {account_id}'}
                  },
                  Source=SES_SENDER_EMAIL_ADDRESS
              )
          except ClientError as e:
              logging.error(e.response['Error']['Message'])
              print("Email sending unsuccessfull")
          else:
              logging.info(f'Notification email sent successfully to {user_email}! Message ID: {ses_response["MessageId"]}')
              print("Email sent successfully")
      
      
      

    Note: The BODY_HTML variables for the email-templates can be anything and change theem as you prefer

    • Zip these two files together and call it access_key_auto_rotation.zip

    Note: This zip file will be defined on the Terraform modulde so this need to be noted down.

  • Now create a Terraform module on the same local folder:

    • main.tf
      # Terraform module to create lambda function to 
      # rotate IAM access keys for AWS users
      #
      # Author       = "Samiul Saki Chowdhury"
      # Copyright   = "Bymiløetaten, Oslo Kommune"
      # Version      = "1.0"
      
      data "aws_iam_policy_document" "lambda_function_assume_role_policy" {
        statement {
          actions = ["sts:AssumeRole"]
      
          principals {
            type        = "Service"
            identifiers = ["lambda.amazonaws.com", "events.amazonaws.com"]
          }
        }
      }
      
      resource "aws_iam_role" "lambda_function_execution_role" {
        name = "StackSet-SecurityFindings-RotationLambdaFunction"
      
        assume_role_policy = data.aws_iam_policy_document.lambda_function_assume_role_policy.json
        managed_policy_arns = [ "arn:${var.aws_partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" ]
        
        inline_policy {
          name = "AllowRotationFunctionPermissions"
      
          policy = jsonencode({
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Action": [
                      "s3:GetObject"
                  ],
                  "Resource": [
                      "arn:${var.aws_partition}:s3:::${var.s3_bucket_name}/*"
                  ],
                  "Effect": "Allow"
              },
              {
                  "Action": [
                      "iam:List*",
                      "iam:CreatePolicy",
                      "iam:CreateAccessKey",
                      "iam:DeleteAccessKey",
                      "iam:UpdateAccessKey",
                      "iam:PutUserPolicy",
                      "iam:GetUserPolicy"
                  ],
                  "Resource": "*",
                  "Effect": "Allow"
              },
              {
                  "Action": [
                      "iam:AttachUserPolicy"
                  ],
                  "Resource": [
                      "arn:${var.aws_partition}:iam::${var.aws_accountid}:user/*"
                  ],
                  "Effect": "Allow"
              },
              {
                  "Action": [
                      "secretsmanager:PutResourcePolicy",
                      "secretsmanager:PutSecretValue",
                      "secretsmanager:DescribeSecret",
                      "secretsmanager:CreateSecret",
                      "secretsmanager:GetResourcePolicy",
                      "secretsmanager:GetSecretValue"
                  ],
                  "Resource": [
                      "arn:${var.aws_partition}:secretsmanager:${var.aws_region}:${var.aws_accountid}:secret:*"
                  ],
                  "Effect": "Allow"
              },
              {
                  "Action": [
                      "ses:SendEmail"
                  ],
                  "Resource": [
                      "arn:${var.aws_partition}:ses:${var.aws_region}:${var.aws_accountid}:identity/*"
                  ],
                  "Effect": "Allow"
              }
            ]
          })
        }
      
        tags = {    
          Env = var.tag_function.Env
          Name = var.tag_function.Name    
        } 
      
      }
      
      
      resource "aws_lambda_function" "iam_rotation_keys" {
        filename          = "access_key_auto_rotation.zip"
        function_name     = "IAM-Access-Key-Rotation-LambdaFunction"
        role              = aws_iam_role.lambda_function_execution_role.arn
        source_code_hash  = filebase64sha256("access_key_auto_rotation.zip")
        runtime           = "python3.8"
        handler           = "main.lambda_handler"
        
        environment {
          variables = {
            RotationPeriod  = var.environment_variables.RotationPeriod
            InactivePeriod  = var.environment_variables.InactivePeriod
            RetentionPeriod = var.environment_variables.RetentionPeriod
            TagKey          = var.environment_variables.TagKey
            UserType        = var.environment_variables.UserType
            AdminEmail      = var.environment_variables.AdminEmail
          }
        
        }
      
        tags = {    
          Env = var.tag_function.Env
          Name = var.tag_function.Name    
        }
      }
      
      
      
      resource "aws_cloudwatch_event_rule" "daily_invocation" {
        name                = "StackSet-SecurityFindings-RotationCloudWatchEvent"
        description         = "CloudWatch Event to trigger Access Key auto-rotation Lambda Function daily"
        schedule_expression = var.schedule
        is_enabled          = "true"
      
        tags = {    
          Env = var.tag_function.Env
          Name = var.tag_function.Name    
        }
      }
      
      
      resource "aws_cloudwatch_event_target" "iam_rotation_daily" {
        rule = aws_cloudwatch_event_rule.daily_invocation.name
        target_id = "lambda_function_execution_role"
        arn = aws_lambda_function.iam_rotation_keys.arn
      }
      
      
      resource "aws_lambda_permission" "allow_cloudwatch_to_call_iam_rotation_keys" {
        statement_id = "AllowExecutionFromCloudWatch"
        action = "lambda:InvokeFunction"
        function_name = aws_lambda_function.iam_rotation_keys.function_name
        principal = "events.amazonaws.com"
        source_arn = aws_cloudwatch_event_rule.daily_invocation.arn
      }
      
      
      
    • provider.tf
      # AWS Provider
      provider "aws" {
        region                  = var.aws_region
        shared_credentials_file = var.shared_credentials_file
        profile                 = var.aws_profile
      }
      
      
      
    • terraform.tfvars
      # Connect to AWS infrastructure
      aws_region          = "eu-west-1"
      aws_profile         = "[AWS_Profil]"
      
      # Tags
      tag_function = {
        Env = "Sandbox"
        Name = "IAM-Rotate-Keys"
      }
      
      
      # Pre-configured Infrastructure
      # For siste versjon vi skal fjerne behov for S3 bucket
      s3_bucket_name      = "iam-rotate-keys" 
      S3BucketPrefix      = "iam-rotation"
      
      aws_partition       = "aws" 
      aws_accountid       = "[AccoutID]" # Ex: 123456789012 
      
      # Environment variables for Lambda functions
      environment_variables = {
        AdminEmail      = "[Sender_E-postadresse]" # Ex: noreply@example.net
        InactivePeriod  = "210"
        RetentionPeriod = "220"
        RotationPeriod  = "180"
        TagKey          = "Email"
        UserType        = "User"
      }
      
      
      # Variables for EventBridge
      schedule = "rate(24 hours)" # Default
      
      
    • variables.tf
      # Connect to AWS infrastructure
      variable "shared_credentials_file" {
        default = "$HOME/.aws/credentials"
      }
      
      variable "aws_region" {
          default = "eu-west-1"
      }
      
      variable "aws_profile" {
          default =  "default"
      }
      
      
      # Tags
      variable "tag_function" {
          type    = object({
              Env     = string
              Name    = string
          })  
      }
      
      
      # Pre-configured Infrastructure
      variable "s3_bucket_name" {
          type = string
          default = "iam-rotate-keys"
      }
      
      variable "S3BucketPrefix" {
          type = string
          default = "iam-rotation"  
      }
      
      variable "aws_partition" {
          default = "aws"  
      }
      
      variable "aws_accountid" {
          type = string 
      }
      
      # Environment variables for Lambda functions
      variable "environment_variables" {
          type        = object({
              RotationPeriod  = string
              InactivePeriod  = string
              RetentionPeriod = string
              TagKey          = string
              UserType        = string
              AdminEmail      = string
        })  
        description = "Environment variables for IAM-rotate-keys lambda function"
      }
      
      # Variables for EventBridge
      variable "schedule" {
        type        = string
        default     = "rate(24 hours)"
        description = "Schedule to trigger lambda function IAM-rotate-keys, default 00:00 daily. See doc: https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/ScheduledEvents.html"
      }
      
      
      

    Note: Change the variables in terraform.tfvars, variables.tf as preferred

    • Deploy the with Terraform v12:
      terraform init
      terraform plan
      terraform apply
      
      
      
      
      ....
      ....
      Plan: 5 to add, 0 to change, 0 to destroy.
      
      Do you want to perform these actions?
        Terraform will perform the actions described above.
        Only 'yes' will be accepted to approve.
      
        Enter a value: yes
      
        
      
  • Automation is now rolled out using Terraform

An optional automation:

  • We need to make sure that all users have these TagKey in addition (it is important that TagKey matches as written here):
    Key = Email and Value = [iam-user-email]
    Key = UserType and Value = [User / Service]
    

We can automate this with a python script. This script will update all IAM users with tag key Email and UserType for using automation (IAM user Access keys Rotation). We will run them from our local PC with an AWS user profile (which has appropriate access)

  • Install the following libraries for Python 3
    pip3 install pandas boto3
    
  • To run scripts on a local machine (as a one-time process):
    • First create the python script
      main.py
      """
      Automatically updates IAM user tags based on 
      information given in list in CSV files
      
      __author__      = "Samiul Saki Chowdhury"
      """
      
      import csv
      import os
      import pandas
      import boto3
      import math
      import argparse
      
      
      # Variables
      userType = "UserType"
      tagKeyName = "Email"
      
      # csv file name
      filename = "accounts.csv"
      
      # Takes AWS profile as arguments
      parser = argparse.ArgumentParser(description='Script to update IAM user tags.')
      parser.add_argument('--profile', action='store', type=str, help='runs the script for the given profile')
      args = parser.parse_args()
      aws_profile = str(args.profile)
      
      # IAM Client
      # iam = boto3.client('iam')
      iam = boto3.Session(profile_name=aws_profile, region_name="eu-west-1").client("iam")
      
      
      def lambda_handler():
          users = iam.list_users()
          for user in users["Users"]:
              parsed_arn = parse_arn(user["Arn"])
              user_name = user["UserName"]
            
              user_tag = iam.list_user_tags(UserName=user_name)
              print("\nUser>",user_name)
              
              user_type = find_tag_value(user_tag, userType)
              user_email = find_tag_value(user_tag, tagKeyName)
              
              try:
                  account_id,account_name,account_email,account_type = get_info(user["UserName"])
                  print("Data:",account_id, account_name, account_email, account_type)
                  
                  # Untagging process
                  if type(account_type) != str:
                      account_type_nan = math.isnan(account_type)
                      if account_type_nan == True and user_type is not None:
                          untag_iam_user(user_name, userType)                   
                          
                  if type(account_email) != str:
                      account_email_nan = math.isnan(account_email)
                      if account_email_nan == True and user_email is not None:
                          untag_iam_user(user_name, tagKeyName)                        
                  
      
                  # Tagging process
                  if type(account_type) == str and account_type != "" and user_type != account_type:
                      tag_iam_user(user_name, userType, account_type)                
                  if type(account_email) == str and account_email != "" and user_email != account_email:
                      tag_iam_user(user_name, tagKeyName, account_email)                
              except:
                  write_to_file(parsed_arn['account'], user_name, user_email, user_type)
              
                  
              
      
      def get_info(user):
          df = pandas.read_csv(filename)    
          dict_all = df.to_dict(orient='list')
      
          try:
              index_user = dict_all.get("accountname").index(str(user))
              account_id = dict_all.get("accountid")[index_user]
              account_name = dict_all.get("accountname")[index_user]
              account_email = dict_all.get("accountemail")[index_user]
              account_type = dict_all.get("accounttype")[index_user]        
          except ValueError:
              print("No info found in CSV")
          return account_id,account_name,account_email,account_type
      
      def write_to_file(accountId,accountName,accountEmail,accountType):
          f = open('accounts.csv','a')
          if accountEmail == None:
              accountEmail = ""
          if accountType == None:
              accountType = ""
          row = '\n{},{},{},{}'.format(accountId, accountName, accountEmail, accountType)
          f.write(row)
          f.close()
          print("Updated CSV successfully")
      
          
      def tag_iam_user(userName,tagKey,tagValue):    
          iam.tag_user(UserName=str(userName),Tags=[{ 'Key': str(tagKey), 'Value': str(tagValue) }])
          print("Updated with new tag key/value:", tagKey)
      
      def untag_iam_user(userName,unTagKey):
          iam.untag_user(UserName=str(userName),TagKeys=[ str(unTagKey) ])
          print("Untagged tag key:", unTagKey)
      
      def find_tag_value(userTag, tagKey):
          for i in userTag.get("Tags"):
              if i.get("Key") == tagKey:
                  return i.get("Value")
      
          return None
      
      def parse_arn(arn):
          # http://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html
          elements = arn.split(':', 5)
          result = {
              'arn': elements[0],
              'partition': elements[1],
              'service': elements[2],
              'region': elements[3],
              'account': elements[4],
              'resource': elements[5],
              'resource_type': None
          }
          if '/' in result['resource']:
              result['resource_type'], result['resource'] = result['resource'].split('/', 1)  # NOQA
          elif ':' in result['resource']:
              result['resource_type'], result['resource'] = result['resource'].split(':', 1)  # NOQA
          return result
      
      
      lambda_handler()
      
      
      
    • We are going to create and fill in the accounts.csv file with all the available information for all users in an AWS IAM account. If you already have an overview of all user information, you can already update the csv file by opening it with notepad or vscode. The CSV file should look like (with headers on top that should not be removed / edited):
      accountid,accountname,accountemail,accounttype
      123456789012,GITHUB_User,,Service
      123456789012,Grafana_User,,Service
      123456789012,dev-user,dev.user@whatever.com,User
      123456789012,test-user,test.user@whatever.com,User
      ....
      ....
      123456789012,samiul,samiul.saki@example.net,User
      ....
      
    • Otherwise just run the main.py script to fill in the csv file automatically. You must add AWS profile name as an argument to the script:
      python3 main.py --profile <aws_profile>
      
      • Read help text if you need help
      > python3 main.py -h
      
      usage: main.py [-h] [--profile PROFILE]
      
      Script to update IAM user tags.
      
      optional arguments:
        -h, --help         show this help message and exit
        --profile PROFILE  runs the script for the given profile
      
    • You now have a list of all user information. Edit the rows based on information you want to update.

    Note: This copy of the csv file has first priority over whichever information already exists in AWS. This means that you have full control over user information in the list displayed in the CSV file.

    • To update the CSV file (in this order, separated by commas):
      • accountid = Account ID for AWS account where you want to update the user information
      • accountname = Username of the user
      • accountemail = The e-mail address of the IAM user (to be used for iam-access-key-rotation script to send out notification / information of access keys rotation), possibly there will be tag value for tagkey Email
      • accounttype = Type of IAM user, possibly tag value for tagkey UserType.

      Important: Fill in the UserType column for users with either with User / Service as you wish. Remember that users with User UserType should have an email address as well, but it is not necessarily.

    • Run main.py again.
      python3 main.py --profile <aws_profile>
      
    • The script will update the user information in the IAM according to the information contained in the csv file.

You’re now done.

  • To update any other IAM users in another AWS account:

    • Remove all users from list (except headers) in the CSV file
    • Change the aws_profile variable to the new aws profile in your local machine in the main.py script
    • Run main.py. Same as before, fill in the csv file at first run, then update and run the script again to roll the update for IAM users in the AWS account.
  • To edit an IAM user information:

    • To change user information, edit that CSV file and run main.py again
      Example:
      accountid,accountname,accountemail,accounttype
      123456789012,dev-user,dev.user@whatever.com,User # Here I have changed the email address with @whatever.com
      
  • To remove (Untag) a Tagkey for IAM users:

    • Simply edit the CSV file. With same example_
      Example:
      accountid,accountname,accountemail,accounttype
      123456789012,dev-user,,User # Here I have removed the email address of the user
      
    • Run main.py again.
    • The script should remove the user’s tagkey Email, UserType in IAM since it is an Empty string.
      Example:
      accountid,accountname,accountemail,accounttype
      123456789012,dev-user,,User # Will remove Email tagkey
      123456789012,dev-user,, # Will remove Email and UserType tagkey