Many times we come across a situation where S3 Bucket access logging is not default and due to corporate security policy, such buckets are flagged a Security incident. Hence there was a need to enable the sever access logging programmatically due to very large number of such S3 Buckets.
Recently I developed a script using boto3 to achieve the task. This helped to enable the logging for 100+ such buckets in ~30 min. Also, I configured a job in Jenkins so that job can be accomplished by L1 support team.
Script Name – EnableS3BucketLogging.py
#!/usr/bin/env python import boto3 import time import sys import logging import datetime import argparse import csv import os from botocore.exceptions import ClientError print ("S3 Listing at %s" % time.ctime()) DEFAULT_BUCKET = "ALL" DEFAULT_REGION = "us-east-1" DEFAULT_AWS_Account_ID = "1234567899765" DEFAULT_AWS_Account_Name = "Dummy Account Name" def parse_commandline_arguments(): global REGION global AWS_Account_ID global AWS_Account_Name global BUCKET_NAME global target_bucket parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter, description='Enable S3 Server Logging if Not enabled.') parser.add_argument("-accountID", "--ownerID", dest="aws_ID", type=str, default=DEFAULT_AWS_Account_ID, help="The AWS Account ID where volume tagging is to be done") parser.add_argument("-r", "--region", dest="region", type=str, default=DEFAULT_REGION, help="Specify the region of the AWS Account") parser.add_argument("-b", "--bucket", dest="bucket", type=str, default=DEFAULT_BUCKET, help="Specify the bucket name") parser.add_argument("-accountName","--AWSAccountName",dest="aws_account_name",type=str, default=DEFAULT_AWS_Account_Name, help="Specify the AWS Account Name") args = parser.parse_args() REGION = args.region AWS_Account_ID = args.aws_ID BUCKET_NAME = args.bucket AWS_Account_Name = args.aws_account_name def s3_resource(region): # Connects to EC2, returns a connection object try: conn = boto3.resource('s3', region_name=region) except Exception as e: sys.stderr.write( 'Could not connect to region: %s. Exception: %s\n' % (region, e)) conn = None return conn def s3_client(region): """ Connects to EC2, returns a connection object """ try: conn = boto3.client('s3', region) except Exception as e: sys.stderr.write( 'Could not connect to region: %s. Exception: %s\n' % (region, e)) conn = None return conn def grantaclBucket(s3_client,sourcebucket,targetbucket): try: acl = s3_client.get_bucket_acl(Bucket = sourcebucket) for d in acl['Grants']: if 'ID' in d['Grantee']: # If Grantee is NOT URI, then specific Grant needs to be given before enabling Logging canonical_id = d['Grantee']['ID'] response = s3.put_bucket_acl( AccessControlPolicy={ 'Grants': [ { 'Grantee': { 'Type': 'Group', 'URI': 'http://acs.amazonaws.com/groups/s3/LogDelivery' }, 'Permission': 'READ_ACP' }, { 'Grantee': { 'Type': 'Group', 'URI': 'http://acs.amazonaws.com/groups/s3/LogDelivery' }, 'Permission': 'WRITE' } ], 'Owner': { 'ID': canonical_id }, }, Bucket=targetbucket ) elif 'URI' in d['Grantee']: # If Grant is already given to URL, no need of explicit Grant print("Log Delivery Group has the required permission...") return True except Exception as error: logging.error(e) return None def enableAccessLogging(clientS3, sourcebucket, targetbucket,targetPrefix): try: response = clientS3.put_bucket_logging( Bucket=sourcebucket, BucketLoggingStatus={ 'LoggingEnabled': { 'TargetBucket': targetbucket, 'TargetPrefix': targetPrefix } }, ) return True except ClientError as e: logging.error(e) return None def showSingleBucket(bucketName,s3,s3bucket,targetPrefix): "Displays the contents of a single bucket" if ( len(bucketName) == 0 ): print ("bucket name not provided, listing all buckets....") time.sleep(8) else: print ("Bucket Name provided is: %s" % bucketName) #s3bucket = boto3.resource('s3') my_bucket = s3bucket.Bucket(bucketName) bucket_logging = s3bucket.BucketLogging(bucketName) bucket_logging_response = bucket_logging.logging_enabled if bucket_logging.logging_enabled is None: print("Bucket - {} is not loggging Enabled" .format(bucketName)) print("Bucket - {} logging is in progress..." .format(bucketName)) grantaclBucket(s3,bucketName,bucketName) # Grant ACL to Log Delivery Group - mandatory before enabling logging enableAccessLogging(s3, bucketName, bucketName,targetPrefix) # Enable Bucket Logging else: print("Bucket - {} Logging is already enabled." .format(bucketName)) print("Target Bucket is - {}" .format(bucket_logging_response['TargetBucket'])) print("Target prefix is - {}" .format(bucket_logging_response['TargetPrefix'])) #for object in my_bucket.objects.all(): # print(object.key) return def showAllBuckets(s3,s3bucket,targetPrefix): try: response = s3.list_buckets() for bucket in response['Buckets']: my_bucket = bucket['Name'] bucket_logging = s3bucket.BucketLogging(my_bucket) bucket_logging_response = bucket_logging.logging_enabled if bucket_logging.logging_enabled is None: print("Bucket - {} is not loggging Enabled" .format(my_bucket)) print("Bucket - {} logging is in progress..." .format(my_bucket)) grantaclBucket(s3,my_bucket,my_bucket) # Grant ACL to Log Delivery Group enableAccessLogging(s3,my_bucket,my_bucket,targetPrefix) # Enable Bucket Logging else: print("Bucket - {} Logging is already enabled." .format(my_bucket)) target_bucket = bucket_logging_response['TargetBucket'] target_prefix = bucket_logging_response['TargetPrefix'] except ClientError as e: print("The bucket does not exist, choose how to deal with it or raise the exception: "+e) return if __name__ == '__main__': try: parse_commandline_arguments() targetPrefix = 'S3_Access_logs/' s3_client_conn = s3_client(REGION) s3_resource_conn = s3_resource(REGION) print("<font size=1 face=verdana color=blue>Processing for AWS Account :- <b><font size=1 color=red> {}</font></b></font><br>".format(AWS_Account_ID)) print( "<font size=1 face=verdana color=blue>==============================</font><br><br>") if BUCKET_NAME == "ALL": showAllBuckets(s3_client_conn,s3_resource_conn,targetPrefix) else: showSingleBucket(BUCKET_NAME,s3_client_conn,s3_resource_conn,targetPrefix) except Exception as error: logging.error(e) print(str(error)) print("Issue while enabling Server Access Logging")
This python script is being called from Shell script – where the environment is set using “AssumeRole” funciton.
Shell Script Name – EnableS3BucketLogging.py
#!/bin/sh if [[ $# -lt 2 ]]; then echo "Usage: ${0} <AccountID> <Bucket Name>" exit 1 fi AccountID=${1} BucketName=${2} script_top=/u01/app/scripts outputdir=${script_top}/output logfile=${script_top}/logs/EnableS3BucketLogging.log cat /dev/null > ${logfile} unset AWS_SESSION_TOKEN AWS_DEFAULT_REGION AWS_SECRET_ACCESS_KEY AWS_ACCESS_KEY_ID . /u01/app/scripts/bin/AssumeRole.sh ${AccountID} # No need to set Region as Buckets are Global echo "python ${script_top}/bin/EnableS3BucketLogging.py -accountID ${AccountID} -b ${BucketName}" python ${script_top}/bin/EnableS3BucketLogging.py -accountID ${AccountID} -b ${BucketName}
Hope this helps. Happy reading !!!
~Anand M
Leave a Reply