Archive
Boto3 script to delete existing VPC Interface Endpoints from a given AWS Account
I recently developed a script using Python and Boto3 to delete specific VPC Interface Endpoints. These endpoints were deployed as part of the landing zone resources but are not currently in use. Such resources incur cost even when idle, so it is good to remove unused ones to save money.
The intent is to call this script from a DevOps tool (such as Ansible or Jenkins) to completely automate the task.
#!/usr/bin/env python
import boto3
import logging
import os.path
import time
import argparse

output_dir = "/tmp"
DEFAULT_AWS_Account_ID = "1111222222"
DEFAULT_REGION = "us-east-1"

client = boto3.client('ec2')

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# create console handler and set level to info
handler = logging.StreamHandler()
handler.setLevel(logging.INFO)
logger.addHandler(handler)

# create file handler and set level to info
# this is to help output be directed to both - console and file
handler = logging.FileHandler(os.path.join(output_dir, "vpcendpointdelete.log"), "w", encoding=None, delay=True)
handler.setLevel(logging.INFO)
logger.addHandler(handler)


def parse_commandline_arguments():
    global REGION
    global AWS_Account_ID
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter,
                                     description='Boto3 script to delete VPC Interface Endpoints from a given AWS Account.')
    parser.add_argument("-accountID", "--ownerID", dest="aws_ID", type=str, default=DEFAULT_AWS_Account_ID,
                        help="The AWS Account ID where VPC Endpoint is to be deleted")
    parser.add_argument("-r", "--region", dest="region", type=str, default=DEFAULT_REGION,
                        help="Specify the region of the AWS Account")
    args = parser.parse_args()
    REGION = args.region
    AWS_Account_ID = args.aws_ID


def remove_vpcendpoint(region):
    if region == "us-east-2":
        filters = [{'Name': 'service-name',
                    'Values': ['com.amazonaws.us-east-2.ec2', 'com.amazonaws.us-east-2.ec2messages',
                               'com.amazonaws.us-east-2.ssm', 'com.amazonaws.us-east-2.ssmmessages',
                               'com.amazonaws.us-east-2.monitoring']},
                   {'Name': 'vpc-endpoint-type', 'Values': ['Interface']}]
    if region == "us-east-1":
        filters = [{'Name': 'service-name',
                    'Values': ['com.amazonaws.us-east-1.ec2', 'com.amazonaws.us-east-1.ec2messages',
                               'com.amazonaws.us-east-1.ssm', 'com.amazonaws.us-east-1.ssmmessages',
                               'com.amazonaws.us-east-1.monitoring']},
                   {'Name': 'vpc-endpoint-type', 'Values': ['Interface']}]
    response = client.describe_vpc_endpoints(Filters=filters)
    for services in response['VpcEndpoints']:
        logger.info("Deleting VpcEndpoint ID : {} - Service Name : {}".format(services['VpcEndpointId'], services['ServiceName']))
        for attempt in range(5):
            try:
                client.delete_vpc_endpoints(VpcEndpointIds=[services['VpcEndpointId']])
            except BaseException as err:
                logger.error(err)
                logger.info("*** ERROR *** during VPC Interface Endpoint delete - retry...")
                time.sleep(0.5)
            else:
                logger.info("--> Done")
                break
        else:
            logger.error("*** ERROR *** - All attempts to delete VPC Interface Endpoint failed - exit with error")
            raise Exception("*** ERROR *** - Can't delete VPC Interface Endpoint")


if __name__ == '__main__':
    try:
        parse_commandline_arguments()
        remove_vpcendpoint(REGION)
    except Exception as error:
        logging.error(error)
        print(str(error))
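Before wiring this into Ansible or Jenkins, it can be handy to verify the result independently. The sketch below simply lists whatever interface endpoints still match the same filters; the region and service names are assumptions copied from the script above, so adjust them for your account.

# Minimal sketch: list interface endpoints that still match the same service-name
# filter used by the delete script above, to confirm nothing was left behind.
# The region and service list are assumptions taken from the script above.
import boto3

REGION = "us-east-1"
SERVICES = ["com.amazonaws.us-east-1.ec2", "com.amazonaws.us-east-1.ec2messages",
            "com.amazonaws.us-east-1.ssm", "com.amazonaws.us-east-1.ssmmessages",
            "com.amazonaws.us-east-1.monitoring"]

ec2 = boto3.client("ec2", region_name=REGION)
resp = ec2.describe_vpc_endpoints(Filters=[
    {"Name": "service-name", "Values": SERVICES},
    {"Name": "vpc-endpoint-type", "Values": ["Interface"]},
])
for ep in resp["VpcEndpoints"]:
    print(ep["VpcEndpointId"], ep["ServiceName"], ep["State"])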
Enjoy reading !!!
Anand M
Script to Enable AWS S3 Server Access Logging using Boto3
Many times we come across situations where S3 bucket server access logging is not enabled by default and, due to corporate security policy, such buckets get flagged as a security incident. With a very large number of such S3 buckets, there was a need to enable server access logging programmatically.
I recently developed a script using Boto3 to achieve this. It helped enable logging for 100+ such buckets in about 30 minutes. I also configured a Jenkins job so that the task can be carried out by the L1 support team.
Script Name – EnableS3BucketLogging.py
#!/usr/bin/env python
import boto3
import time
import sys
import logging
import datetime
import argparse
import csv
import os
from botocore.exceptions import ClientError

print("S3 Listing at %s" % time.ctime())

DEFAULT_BUCKET = "ALL"
DEFAULT_REGION = "us-east-1"
DEFAULT_AWS_Account_ID = "1234567899765"
DEFAULT_AWS_Account_Name = "Dummy Account Name"


def parse_commandline_arguments():
    global REGION
    global AWS_Account_ID
    global AWS_Account_Name
    global BUCKET_NAME
    global target_bucket
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter,
                                     description='Enable S3 Server Logging if not enabled.')
    parser.add_argument("-accountID", "--ownerID", dest="aws_ID", type=str, default=DEFAULT_AWS_Account_ID,
                        help="The AWS Account ID where S3 logging is to be enabled")
    parser.add_argument("-r", "--region", dest="region", type=str, default=DEFAULT_REGION,
                        help="Specify the region of the AWS Account")
    parser.add_argument("-b", "--bucket", dest="bucket", type=str, default=DEFAULT_BUCKET,
                        help="Specify the bucket name")
    parser.add_argument("-accountName", "--AWSAccountName", dest="aws_account_name", type=str,
                        default=DEFAULT_AWS_Account_Name, help="Specify the AWS Account Name")
    args = parser.parse_args()
    REGION = args.region
    AWS_Account_ID = args.aws_ID
    BUCKET_NAME = args.bucket
    AWS_Account_Name = args.aws_account_name


def s3_resource(region):
    """ Connects to S3, returns a resource object """
    try:
        conn = boto3.resource('s3', region_name=region)
    except Exception as e:
        sys.stderr.write('Could not connect to region: %s. Exception: %s\n' % (region, e))
        conn = None
    return conn


def s3_client(region):
    """ Connects to S3, returns a client object """
    try:
        conn = boto3.client('s3', region)
    except Exception as e:
        sys.stderr.write('Could not connect to region: %s. Exception: %s\n' % (region, e))
        conn = None
    return conn


def grantaclBucket(s3_client, sourcebucket, targetbucket):
    try:
        acl = s3_client.get_bucket_acl(Bucket=sourcebucket)
        for d in acl['Grants']:
            if 'ID' in d['Grantee']:
                # If Grantee is NOT a URI, then a specific grant needs to be given before enabling logging
                canonical_id = d['Grantee']['ID']
                response = s3_client.put_bucket_acl(
                    AccessControlPolicy={
                        'Grants': [
                            {
                                'Grantee': {
                                    'Type': 'Group',
                                    'URI': 'http://acs.amazonaws.com/groups/s3/LogDelivery'
                                },
                                'Permission': 'READ_ACP'
                            },
                            {
                                'Grantee': {
                                    'Type': 'Group',
                                    'URI': 'http://acs.amazonaws.com/groups/s3/LogDelivery'
                                },
                                'Permission': 'WRITE'
                            }
                        ],
                        'Owner': {
                            'ID': canonical_id
                        },
                    },
                    Bucket=targetbucket
                )
            elif 'URI' in d['Grantee']:
                # If the grant is already given to the URI, no explicit grant is needed
                print("Log Delivery Group has the required permission...")
        return True
    except Exception as error:
        logging.error(error)
        return None


def enableAccessLogging(clientS3, sourcebucket, targetbucket, targetPrefix):
    try:
        response = clientS3.put_bucket_logging(
            Bucket=sourcebucket,
            BucketLoggingStatus={
                'LoggingEnabled': {
                    'TargetBucket': targetbucket,
                    'TargetPrefix': targetPrefix
                }
            },
        )
        return True
    except ClientError as e:
        logging.error(e)
        return None


def showSingleBucket(bucketName, s3, s3bucket, targetPrefix):
    """ Checks and enables server access logging for a single bucket """
    if len(bucketName) == 0:
        print("bucket name not provided, listing all buckets....")
        time.sleep(8)
    else:
        print("Bucket Name provided is: %s" % bucketName)
        # s3bucket = boto3.resource('s3')
        my_bucket = s3bucket.Bucket(bucketName)
        bucket_logging = s3bucket.BucketLogging(bucketName)
        bucket_logging_response = bucket_logging.logging_enabled
        if bucket_logging.logging_enabled is None:
            print("Bucket - {} is not logging Enabled".format(bucketName))
            print("Bucket - {} logging is in progress...".format(bucketName))
            grantaclBucket(s3, bucketName, bucketName)                     # Grant ACL to Log Delivery Group - mandatory before enabling logging
            enableAccessLogging(s3, bucketName, bucketName, targetPrefix)  # Enable Bucket Logging
        else:
            print("Bucket - {} Logging is already enabled.".format(bucketName))
            print("Target Bucket is - {}".format(bucket_logging_response['TargetBucket']))
            print("Target prefix is - {}".format(bucket_logging_response['TargetPrefix']))
        # for object in my_bucket.objects.all():
        #     print(object.key)
    return


def showAllBuckets(s3, s3bucket, targetPrefix):
    try:
        response = s3.list_buckets()
        for bucket in response['Buckets']:
            my_bucket = bucket['Name']
            bucket_logging = s3bucket.BucketLogging(my_bucket)
            bucket_logging_response = bucket_logging.logging_enabled
            if bucket_logging.logging_enabled is None:
                print("Bucket - {} is not logging Enabled".format(my_bucket))
                print("Bucket - {} logging is in progress...".format(my_bucket))
                grantaclBucket(s3, my_bucket, my_bucket)                    # Grant ACL to Log Delivery Group
                enableAccessLogging(s3, my_bucket, my_bucket, targetPrefix) # Enable Bucket Logging
            else:
                print("Bucket - {} Logging is already enabled.".format(my_bucket))
                target_bucket = bucket_logging_response['TargetBucket']
                target_prefix = bucket_logging_response['TargetPrefix']
    except ClientError as e:
        print("The bucket does not exist, choose how to deal with it or raise the exception: " + str(e))
    return


if __name__ == '__main__':
    try:
        parse_commandline_arguments()
        targetPrefix = 'S3_Access_logs/'
        s3_client_conn = s3_client(REGION)
        s3_resource_conn = s3_resource(REGION)
        print("<font size=1 face=verdana color=blue>Processing for AWS Account :- <b><font size=1 color=red> {}</font></b></font><br>".format(AWS_Account_ID))
        print("<font size=1 face=verdana color=blue>==============================</font><br><br>")
        if BUCKET_NAME == "ALL":
            showAllBuckets(s3_client_conn, s3_resource_conn, targetPrefix)
        else:
            showSingleBucket(BUCKET_NAME, s3_client_conn, s3_resource_conn, targetPrefix)
    except Exception as error:
        logging.error(error)
        print(str(error))
        print("Issue while enabling Server Access Logging")
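Once the Jenkins job has run, the logging status of a bucket can be confirmed independently with get_bucket_logging. Below is a minimal sketch; the bucket name is a placeholder, and the prefix simply mirrors the 'S3_Access_logs/' value used above.

# Minimal sketch: verify server access logging on one bucket.
# "my-example-bucket" is a placeholder, not a bucket from the script above.
import boto3

s3 = boto3.client("s3")
resp = s3.get_bucket_logging(Bucket="my-example-bucket")
status = resp.get("LoggingEnabled")
if status:
    print("Logging enabled -> target: {}, prefix: {}".format(status["TargetBucket"], status["TargetPrefix"]))
else:
    print("Server access logging is NOT enabled on this bucket")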
This Python script is called from a shell script, where the environment is set using an “AssumeRole” function.
Shell Script Name – EnableS3BucketLogging.sh
#!/bin/sh

if [[ $# -lt 2 ]]; then
   echo "Usage: ${0} <AccountID> <Bucket Name>"
   exit 1
fi

AccountID=${1}
BucketName=${2}

script_top=/u01/app/scripts
outputdir=${script_top}/output
logfile=${script_top}/logs/EnableS3BucketLogging.log

cat /dev/null > ${logfile}

unset AWS_SESSION_TOKEN AWS_DEFAULT_REGION AWS_SECRET_ACCESS_KEY AWS_ACCESS_KEY_ID

. /u01/app/scripts/bin/AssumeRole.sh ${AccountID}

# No need to set Region as Buckets are Global
echo "python ${script_top}/bin/EnableS3BucketLogging.py -accountID ${AccountID} -b ${BucketName}"
python ${script_top}/bin/EnableS3BucketLogging.py -accountID ${AccountID} -b ${BucketName}
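If you would rather keep everything in Python instead of sourcing AssumeRole.sh, the same environment can be prepared with STS. The sketch below is only an illustration under the assumption that a suitable cross-account role exists; the role name and account ID are placeholders, not the values used by AssumeRole.sh.

# Minimal sketch: assume a cross-account role with STS and build an S3 client
# from the temporary credentials. "MyCrossAccountRole" and the account ID are
# placeholders, not taken from AssumeRole.sh.
import boto3

account_id = "111122223333"
role_arn = "arn:aws:iam::{}:role/MyCrossAccountRole".format(account_id)

sts = boto3.client("sts")
creds = sts.assume_role(RoleArn=role_arn, RoleSessionName="EnableS3Logging")["Credentials"]

s3 = boto3.client(
    "s3",
    aws_access_key_id=creds["AccessKeyId"],
    aws_secret_access_key=creds["SecretAccessKey"],
    aws_session_token=creds["SessionToken"],
)
print(len(s3.list_buckets()["Buckets"]), "buckets visible with the assumed role")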
Hope this helps. Happy reading !!!
~Anand M
Script to generate CSV for Compute Optimizer data from a JSON file
Below is a script to generate a CSV file from JSON output. I wrote it to flatten the Compute Optimizer data so that each EC2 instance has one line of data in the CSV file. Later, this CSV file is uploaded to a Google Sheet for further analysis.
The Python script “reportComputeOptData.py” is called from within the shell script “reportComputeOptData.sh”.
Python Script
import sys
import json
import pandas as pd

## Env is set for proper console display
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)
## Env Setting - Ends

jsonfile = str(sys.argv[1])
csvfile = str(sys.argv[2])

with open(jsonfile) as file:
    data = json.load(file)

df = pd.DataFrame(data['instanceRecommendations'])

for i, item in enumerate(df['utilizationMetrics']):
    for k in range(len(df['utilizationMetrics'][i])):
        # Add a new column with a default value and then add/update the value of that column
        df.at[i, 'utilizationMetrics_name_{}'.format(k)] = dict(df['utilizationMetrics'][i][k])['name']
        df.at[i, 'utilizationMetrics_statistic_{}'.format(k)] = dict(df['utilizationMetrics'][i][k])['statistic']
        df.at[i, 'utilizationMetrics_value_{}'.format(k)] = dict(df['utilizationMetrics'][i][k])['value']
    for m in range(len(df['recommendationOptions'][i])):
        df.at[i, 'recommendationOptions_instanceType_{}'.format(m)] = dict(df['recommendationOptions'][i][m])['instanceType']
        df.at[i, 'recommendationOptions_performanceRisk_{}'.format(m)] = dict(df['recommendationOptions'][i][m])['performanceRisk']
        df.at[i, 'recommendationOptions_rank_{}'.format(m)] = dict(df['recommendationOptions'][i][m])['rank']
        for j in range(len(dict(df['recommendationOptions'][i][m])['projectedUtilizationMetrics'])):
            df.at[i, 'reco_projectedUtilizationMetrics_{}_name_{}'.format(m, j)] = dict(dict(df['recommendationOptions'][i][m])['projectedUtilizationMetrics'][j])['name']
            df.at[i, 'reco_projectedUtilizationMetrics_{}_statistic_{}'.format(m, j)] = dict(dict(df['recommendationOptions'][i][m])['projectedUtilizationMetrics'][j])['statistic']
            df.at[i, 'reco_projectedUtilizationMetrics_{}_value_{}'.format(m, j)] = dict(dict(df['recommendationOptions'][i][m])['projectedUtilizationMetrics'][j])['value']

df = df.drop(['utilizationMetrics', 'recommendationOptions'], axis=1)
df.to_csv(csvfile, header=True, index=False)
print("CSV File generated at..- {}".format(csvfile))
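For comparison, pandas also offers json_normalize, which flattens the same JSON into one row per recommendation option rather than one wide row per instance. The sketch below is just an alternative view, not a replacement for the script above; the input file name is a placeholder.

# Minimal sketch: flatten Compute Optimizer JSON with pandas.json_normalize,
# producing one row per recommendation option. "compute_optimizer.json" is a placeholder.
import json
import pandas as pd

with open("compute_optimizer.json") as f:
    data = json.load(f)

options = pd.json_normalize(
    data["instanceRecommendations"],
    record_path="recommendationOptions",
    meta=["instanceArn", "instanceName", "currentInstanceType", "finding"],
)
print(options[["instanceName", "currentInstanceType", "instanceType", "rank", "performanceRisk"]].head())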
Shell Script (which generates the JSON file that is then passed to the Python script to generate the CSV file)
#!/bin/sh

if [[ $# -lt 1 ]]; then
   echo "Usage: ${0} <AccountID> [<Region>]"
   exit 1
fi

NOW=$(date +"%m%d%Y%H%M")
AccountID=${1}
AWS_DEFAULT_REGION=${2}    ## 2nd argument is used when the account's default region differs from the CLI server

script_top=/d01/app/aws_script/bin
outputdir=/d01/app/aws_script/output
csvfile=${outputdir}/${AccountID}_copt-${NOW}.csv
jsonfile=${outputdir}/${AccountID}_copt-${NOW}.json

## Reset Env variables
reset_env () {
  unset AWS_SESSION_TOKEN
  unset AWS_DEFAULT_REGION
  unset AWS_SECRET_ACCESS_KEY
  unset AWS_ACCESS_KEY_ID
} # end of reset_env

## Set Env function
assume_role () {
  AccountID=${1}
  source </path_to_source_env_file/filename> ${AccountID}
} # Function assume_role ends

assume_role ${AccountID}

if [[ ! -z "$2" ]]; then
  export AWS_DEFAULT_REGION=${2}
fi

## Generate json file
aws compute-optimizer get-ec2-instance-recommendations | jq -r . >${jsonfile}

## Pass the json file to python script along with the CSV File for the output
python ${script_top}/reportComputeOptData.py ${jsonfile} ${csvfile}
echo "CSV File generated... - ${csvfile}"

reset_env
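The JSON file can also be produced without the AWS CLI by calling the Compute Optimizer API through boto3. Below is a minimal sketch; the region and output file name are assumptions, and the errors list is written out empty rather than copied from the API response.

# Minimal sketch: fetch EC2 recommendations via boto3 instead of the AWS CLI and
# write a JSON file that reportComputeOptData.py can read.
# Region and output file name are assumptions; "errors" is written as an empty list.
import json
import boto3

client = boto3.client("compute-optimizer", region_name="us-east-1")

recommendations = []
token = None
while True:
    kwargs = {"nextToken": token} if token else {}
    resp = client.get_ec2_instance_recommendations(**kwargs)
    recommendations.extend(resp.get("instanceRecommendations", []))
    token = resp.get("nextToken")
    if not token:
        break

with open("compute_optimizer.json", "w") as f:
    # default=str converts datetime fields such as lastRefreshTimestamp to strings
    json.dump({"instanceRecommendations": recommendations, "errors": []}, f, indent=2, default=str)
print("Wrote {} recommendations".format(len(recommendations)))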
JSON file format
{
  "instanceRecommendations": [
    {
      "instanceArn": "arn:aws:ec2:eu-east-1:123404238928:instance/i-04a67rqw6c029b82f",
      "accountId": "123404238928",
      "instanceName": "testserver01",
      "currentInstanceType": "c4.xlarge",
      "finding": "OVER_PROVISIONED",
      "utilizationMetrics": [
        {
          "name": "CPU",
          "statistic": "MAXIMUM",
          "value": 6.3559322033898304
        }
      ],
      "lookBackPeriodInDays": 14,
      "recommendationOptions": [
        {
          "instanceType": "t3.large",
          "projectedUtilizationMetrics": [
            {
              "name": "CPU",
              "statistic": "MAXIMUM",
              "value": 12.711864406779661
            }
          ],
          "performanceRisk": 3,
          "rank": 1
        },
        {
          "instanceType": "m5.large",
          "projectedUtilizationMetrics": [
            {
              "name": "CPU",
              "statistic": "MAXIMUM",
              "value": 12.711864406779661
            }
          ],
          "performanceRisk": 1,
          "rank": 2
        },
        {
          "instanceType": "m4.large",
          "projectedUtilizationMetrics": [
            {
              "name": "CPU",
              "statistic": "MAXIMUM",
              "value": 15.645371577574968
            }
          ],
          "performanceRisk": 1,
          "rank": 3
        }
      ],
      "recommendationSources": [
        {
          "recommendationSourceArn": "arn:aws:ec2:eu-east-1:123404238928:instance/i-04a67rqw6c029b82f",
          "recommendationSourceType": "Ec2Instance"
        }
      ],
      "lastRefreshTimestamp": 1583986171.637
    },
    {
      "instanceArn": "arn:aws:ec2:eu-east-1:123404238928:instance/i-0af6a6b96e2690002",
      "accountId": "123404238928",
      "instanceName": "TestServer02",
      "currentInstanceType": "t2.micro",
      "finding": "OPTIMIZED",
      "utilizationMetrics": [
        {
          "name": "CPU",
          "statistic": "MAXIMUM",
          "value": 96.27118644067791
        }
      ],
      "lookBackPeriodInDays": 14,
      "recommendationOptions": [
        {
          "instanceType": "t3.micro",
          "projectedUtilizationMetrics": [
            {
              "name": "CPU",
              "statistic": "MAXIMUM",
              "value": 39.1101694915254
            }
          ],
          "performanceRisk": 1,
          "rank": 1
        },
        {
          "instanceType": "t2.micro",
          "projectedUtilizationMetrics": [
            {
              "name": "CPU",
              "statistic": "MAXIMUM",
              "value": 96.27118644067791
            }
          ],
          "performanceRisk": 1,
          "rank": 2
        }
      ],
      "recommendationSources": [
        {
          "recommendationSourceArn": "arn:aws:ec2:eu-east-1:123404238928:instance/i-0af6a6b96e2690002",
          "recommendationSourceType": "Ec2Instance"
        }
      ],
      "lastRefreshTimestamp": 1583986172.297
    }
  ],
  "errors": []
}
Enjoy reading !!!
Anand M