My Journey to the cloud…

In pursuit of excellence….


Encrypting Your Unencrypted RDS Instance with Python

Importance of Encryption

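Encrypting your RDS instance is a crucial security measure. If the storage, a snapshot, or a backup of an unencrypted database is exposed, the data can be read directly. RDS encryption at rest uses AES-256 to encrypt the DB instance's storage, automated backups, read replicas, and snapshots, which protects the data from unauthorized access and helps meet compliance requirements. RDS does not let you switch encryption on in place for an existing unencrypted instance, which is why the script in this post goes through an encrypted copy of a snapshot. Enabling encryption also aligns with AWS security best practices for data protection.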
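As a quick way to see which instances are affected, here is a minimal sketch that filters the output of `describe_db_instances` on the `StorageEncrypted` flag. The helper names `find_unencrypted` and `list_unencrypted_in_region` are illustrative and not part of the script in this post, and the default region is only a placeholder.

```python
def find_unencrypted(instances):
    """Return the identifiers of instances whose StorageEncrypted flag is false."""
    return [
        inst["DBInstanceIdentifier"]
        for inst in instances
        if not inst.get("StorageEncrypted", False)
    ]


def list_unencrypted_in_region(region="us-east-1"):
    """Page through every DB instance in a region and collect the unencrypted ones."""
    # Imported here so find_unencrypted() can be used without AWS access.
    import boto3

    client = boto3.client("rds", region_name=region)
    paginator = client.get_paginator("describe_db_instances")
    names = []
    for page in paginator.paginate():
        names.extend(find_unencrypted(page["DBInstances"]))
    return names


if __name__ == "__main__":
    # Made-up records shaped like entries of the DBInstances list.
    sample = [
        {"DBInstanceIdentifier": "orders-db", "StorageEncrypted": False},
        {"DBInstanceIdentifier": "billing-db", "StorageEncrypted": True},
    ]
    print(find_unencrypted(sample))
```

Calling `list_unencrypted_in_region()` needs AWS credentials with permission to describe RDS instances; the filter itself works on any list of dictionaries with the same keys.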

Python Script for Encrypting RDS

Below is a simple Python script that uses Boto3 to encrypt an existing unencrypted RDS instance. It:

– Imports the necessary libraries

– Creates an RDS client

– Looks up the DB instance and checks whether it is already encrypted

– Creates a snapshot of the instance

– Copies the unencrypted snapshot to an encrypted snapshot using the KMS key

– Creates a new RDS instance from the encrypted snapshot

– Deletes both intermediate snapshots

– Deletes the old instance (after taking a final snapshot) and renames the new instance to the original name

import argparse
import sys
import time
from datetime import datetime

import boto3
import botocore.exceptions

# Placeholder account ID (kept as a string to match type=str); replace it with your own.
DEFAULT_AWS_Account_ID = "1111111111"
DEFAULT_REGION = "us-east-1"
#date_time_now = datetime.now().strftime('%Y/%m/%d  %H:%M:%S')
date_time_now = datetime.now().strftime('%Y%m%d')


""" Below piece of code is added to redirect the output to stdout as well as logfile """
class Logger(object):
    def __init__(self):
        self.terminal = sys.stdout
        self.log = open("/tmp/encryptRDS.log", "a")

    def write(self, message):
        # Send every message to both the terminal and the log file.
        self.terminal.write(message)
        self.log.write(message)

    def flush(self):
        # Python 3 calls flush() on sys.stdout; pass it on to both streams
        # so buffered output is not lost.
        self.terminal.flush()
        self.log.flush()

sys.stdout = Logger()
""" logger code Ends """


def parse_commandline_arguments():

    global REGION
    global AWS_Account_ID
    global KMS_key_ID
    global dbname

    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter,
                                     description='Script to encrypt the existing unencrypted RDS Instance.')
    parser.add_argument("-accountID", "--ownerID", dest="aws_ID", type=str, default=DEFAULT_AWS_Account_ID,
                        help="The AWS Account ID that owns the RDS instance")
    parser.add_argument("-r", "--region", dest="region", type=str,
                        default=DEFAULT_REGION, help="Specify the region of the AWS Account")
    parser.add_argument("-kms", "--kmskeyID", dest="kmsid", type=str, required=True,
                        help="Specify the KMS Key ID for RDS Encryption")
    parser.add_argument("-db", "--dbname", dest="db_name", type=str, required=True,
                        help="Specify the name of RDS")

    args = parser.parse_args()
    REGION = args.region
    AWS_Account_ID = args.aws_ID
    KMS_key_ID = args.kmsid
    dbname = args.db_name

#db_instance = 'publichem'


def rds_client(region):
    """
    Connects to RDS, returns a connection object
    """

    try:
        conn = boto3.client('rds', region_name=region)

    except Exception as e:
        sys.stderr.write(
            'Could not connect to region: %s. Exception: %s\n' % (region, e))
        conn = None

    return conn


def rds_encryption(client,dbname,KMS_key_ID):
    unencrypted_instances = []
    try:
        instances = client.describe_db_instances(DBInstanceIdentifier=dbname)
        for instance in instances['DBInstances']:
            if not instance['StorageEncrypted']:
                unencrypted_instances.append(instance)
                #print('Starting RDS encryption process...\n')
                print('<font size=1 face=verdana color=blue>Starting RDS encryption process...\n</font><br>')
                #print('Creating snapshot for: %s' % instance['DBInstanceIdentifier'])
                print('<font size=1 face=verdana color=blue>Creating snapshot for: <b><font size=1 color=red>({})</font></b></font><br>'.format(instance['DBInstanceIdentifier']))

                client.create_db_snapshot(
                        DBSnapshotIdentifier='old-unencrypted-' + instance['DBInstanceIdentifier'],
                        DBInstanceIdentifier=instance['DBInstanceIdentifier']
                    )

                client.get_waiter('db_snapshot_available').wait(
                        DBSnapshotIdentifier='old-unencrypted-' + instance['DBInstanceIdentifier'],
                        DBInstanceIdentifier=instance['DBInstanceIdentifier']
                    )

                #print('Creating encrypted snapshot from unencrypted copy')
                print('<font size=1 face=verdana color=blue>Creating encrypted snapshot from unencrypted copy</font><br>')

                client.copy_db_snapshot(
                        SourceDBSnapshotIdentifier='old-unencrypted-' + instance['DBInstanceIdentifier'],
                        TargetDBSnapshotIdentifier='new-encrypted-' + instance['DBInstanceIdentifier'],
                        KmsKeyId=KMS_key_ID,
                        CopyTags=True
                    )

                client.get_waiter('db_snapshot_available').wait(
                        DBSnapshotIdentifier='new-encrypted-' + instance['DBInstanceIdentifier'],
                        DBInstanceIdentifier=instance['DBInstanceIdentifier']
                    )

                #print('Creating instance from encrypted snapshot')
                print('<font size=1 face=verdana color=blue>Creating instance from encrypted snapshot</font><br>')

                client.restore_db_instance_from_db_snapshot(
                        DBInstanceIdentifier='encrypted-' + instance['DBInstanceIdentifier'],
                        DBSnapshotIdentifier='new-encrypted-' + instance['DBInstanceIdentifier'],
                        DBSubnetGroupName=instance['DBSubnetGroup']['DBSubnetGroupName']  # Mandatory parameter if there is no default VPC
                    )

                client.get_waiter('db_instance_available').wait(
                        DBInstanceIdentifier='encrypted-' + instance['DBInstanceIdentifier']
                    )

                #print('Cleaning up old snapshots...')
                print('<font size=1 face=verdana color=blue>Cleaning up old snapshots...</font><br>')

                client.delete_db_snapshot(
                        DBSnapshotIdentifier='old-unencrypted-' + instance['DBInstanceIdentifier']
                        )

                client.get_waiter('db_snapshot_deleted').wait(
                        DBSnapshotIdentifier='old-unencrypted-' + instance['DBInstanceIdentifier'],
                        WaiterConfig={
                            'Delay': 15,
                            'MaxAttempts': 30
                            }
                        )
                client.delete_db_snapshot(
                        DBSnapshotIdentifier='new-encrypted-' + instance['DBInstanceIdentifier']
                        )

                client.get_waiter('db_snapshot_deleted').wait(
                        DBSnapshotIdentifier='new-encrypted-' + instance['DBInstanceIdentifier'],
                        WaiterConfig={
                            'Delay': 5,
                            'MaxAttempts': 30
                            }
                        )

                # Delete the original instance, taking a final snapshot first so it can be restored if needed
                #print('Cleaning up old instances...')
                print('<font size=1 face=verdana color=blue>Cleaning up old instances...</font><br>')
                #identifier=instance['DBInstanceIdentifier']+"-"+date_time_now
                #print(instance['DBInstanceIdentifier']+"-"+date_time_now)
                client.delete_db_instance(
                        DBInstanceIdentifier=instance['DBInstanceIdentifier'],
                        SkipFinalSnapshot=False,
                        FinalDBSnapshotIdentifier=instance['DBInstanceIdentifier']+"-"+date_time_now
                        )

                client.get_waiter('db_instance_deleted').wait(
                        DBInstanceIdentifier=instance['DBInstanceIdentifier'],
                        )

                #print('Renaming new encrypted instance with original instance name')
                print('<font size=1 face=verdana color=blue>Renaming new encrypted instance with original instance name</font><br>')
                client.modify_db_instance(
                        DBInstanceIdentifier='encrypted-' + instance['DBInstanceIdentifier'],
                        ApplyImmediately=True,
                        NewDBInstanceIdentifier=instance['DBInstanceIdentifier']
                        )

                # Pause so the rename can take effect before polling under the original name.
                time.sleep(60)
                client.get_waiter('db_instance_available').wait(
                        DBInstanceIdentifier=instance['DBInstanceIdentifier']
                        )

                #print('RDS encryption process complete for: %s!\n' % instance['DBInstanceIdentifier'])
                print('<font size=1 face=verdana color=blue>RDS encryption process complete for: <b><font size=1 color=red>({})</font></b></font><br>\n'.format(instance['DBInstanceIdentifier']))

            else:
                #print("\nGiven RDS - {} is already encrypted".format(dbname))
                print("\n<font size=1 face=verdana color=blue>Given RDS - <b><font size=1 color=red>({})</font></b> is already encrypted</font><br>".format(dbname))
    #    print('\nDetected %d unencrypted RDS instances!' % len(unencrypted_instances))
        sys.exit(0)
    except botocore.exceptions.ClientError as e:
        print(e.response['Error']['Code'])
        print(e)
        sys.exit(1)



if __name__ == '__main__':
    parse_commandline_arguments()
    client = rds_client(REGION)
    if client is None:
        # rds_client() has already reported the failure on stderr.
        sys.exit(1)
    rds_encryption(client, dbname, KMS_key_ID)

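This enables encryption for an RDS instance that was launched without it. Keep in mind that the script deletes the original instance (after a final snapshot) before renaming the encrypted one, so the database is unavailable for part of the run, and any writes made after the first snapshot is taken are not carried over. Stop application traffic and schedule the run in a maintenance window.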
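The restore call in the script passes only the new identifier, the snapshot, and the subnet group, so settings such as security groups or the parameter group are not carried over explicitly. The sketch below shows one way to copy a few of them from the `describe_db_instances` record into the arguments for `restore_db_instance_from_db_snapshot`. The helper `build_restore_kwargs` is hypothetical, and the set of settings it copies is my own selection rather than a complete list, so check it against your instance before relying on it.

```python
def build_restore_kwargs(instance, snapshot_id, new_id):
    """Build keyword arguments for restore_db_instance_from_db_snapshot
    from one entry of describe_db_instances()['DBInstances']."""
    kwargs = {
        "DBInstanceIdentifier": new_id,
        "DBSnapshotIdentifier": snapshot_id,
        "DBInstanceClass": instance["DBInstanceClass"],
        "DBSubnetGroupName": instance["DBSubnetGroup"]["DBSubnetGroupName"],
        "MultiAZ": instance.get("MultiAZ", False),
        "PubliclyAccessible": instance.get("PubliclyAccessible", False),
    }

    # Security groups are listed as dictionaries; the restore call wants plain IDs.
    sg_ids = [g["VpcSecurityGroupId"] for g in instance.get("VpcSecurityGroups", [])]
    if sg_ids:
        kwargs["VpcSecurityGroupIds"] = sg_ids

    # Assumption: the first listed parameter group is the one to reuse.
    param_groups = instance.get("DBParameterGroups", [])
    if param_groups:
        kwargs["DBParameterGroupName"] = param_groups[0]["DBParameterGroupName"]

    return kwargs
```

Inside the script's loop this could be used as `client.restore_db_instance_from_db_snapshot(**build_restore_kwargs(instance, 'new-encrypted-' + name, 'encrypted-' + name))`, where `name` stands for `instance['DBInstanceIdentifier']`.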

Call to Action

Safeguard your data today by encrypting your RDS instances. Use this Python script to enable encryption and align with security best practices. Act now to protect your databases!




About Me

I’m a hands-on Technical & Enterprise Solutions Architect based out of Houston, TX. I have been working with Oracle ERP, Oracle Database and cloud technologies for over 20 years and am still going strong learning new things.

You can connect with me on LinkedIn and also reach out to me.

I hold 8 AWS certifications as well as OCP (Oracle Certified Professional), PMP, ITIL and Six Sigma certifications.

Disclaimer

This is a personal blog. Any views or opinions represented in this blog are personal and belong solely to the blog owner and do not represent those of people, institutions or organizations that the owner may or may not be associated with in professional or personal capacity, unless explicitly stated.
All content provided on this blog is for informational purposes only. The owner of this blog makes no representations as to the accuracy or completeness of any information on this site or found by following any link on this site.

The owner will not be liable for any errors or omissions in this information nor for the availability of this information. The owner will not be liable for any losses, injuries, or damages from the display or use of this information. Any script available on this blog MUST be tested before it is run against a production environment.
