Script to Enable AWS S3 Server Access Logging using Boto3

Many times we come across a situation where S3 Bucket access logging is not default and due to corporate security policy, such buckets are flagged a Security incident. Hence there was a need to enable the sever access logging programmatically due to very large number of such S3 Buckets.
Recently I developed a script using boto3 to achieve the task. This helped to enable the logging for 100+ such buckets in ~30 min. Also, I configured a job in Jenkins so that job can be accomplished by L1 support team.

Script Name –

#!/usr/bin/env python

import boto3
import time
import sys
import logging
import datetime
import argparse
import csv
import os
from botocore.exceptions import ClientError

print ("S3 Listing at %s" % time.ctime())

DEFAULT_REGION = "us-east-1"
DEFAULT_AWS_Account_ID = "1234567899765"
DEFAULT_AWS_Account_Name = "Dummy Account Name"

def parse_commandline_arguments():

    global REGION
    global AWS_Account_ID
    global AWS_Account_Name
    global BUCKET_NAME
    global target_bucket

    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter,
                                     description='Enable S3 Server Logging if Not enabled.')
    parser.add_argument("-accountID", "--ownerID", dest="aws_ID", type=str, default=DEFAULT_AWS_Account_ID,
                        help="The AWS Account ID where volume tagging is  to be done")
    parser.add_argument("-r", "--region", dest="region", type=str,
                        default=DEFAULT_REGION, help="Specify the region of the AWS Account")
    parser.add_argument("-b", "--bucket", dest="bucket", type=str,
                        default=DEFAULT_BUCKET, help="Specify the bucket name")
    parser.add_argument("-accountName","--AWSAccountName",dest="aws_account_name",type=str, default=DEFAULT_AWS_Account_Name,
                        help="Specify the AWS Account Name")

    args = parser.parse_args()
    REGION = args.region
    AWS_Account_ID = args.aws_ID
    BUCKET_NAME = args.bucket
    AWS_Account_Name = args.aws_account_name

def s3_resource(region):

    # Connects to EC2, returns a connection object
        conn = boto3.resource('s3', region_name=region)

    except Exception as e:
            'Could not connect to region: %s. Exception: %s\n' % (region, e))
        conn = None

    return conn

def s3_client(region):
    Connects to EC2, returns a connection object
        conn = boto3.client('s3', region)

    except Exception as e:
            'Could not connect to region: %s. Exception: %s\n' % (region, e))
        conn = None

    return conn

def grantaclBucket(s3_client,sourcebucket,targetbucket):
        acl = s3_client.get_bucket_acl(Bucket = sourcebucket)
        for d in acl['Grants']:
            if 'ID' in d['Grantee']: # If Grantee is NOT URI, then specific Grant needs to be given before enabling Logging
                canonical_id = d['Grantee']['ID']
                response = s3.put_bucket_acl(
                        'Grants': [
                                'Grantee': {
                                    'Type': 'Group',
                                    'URI': ''
                                'Permission': 'READ_ACP'
                                'Grantee': {
                                    'Type': 'Group',
                                    'URI': ''
                                'Permission': 'WRITE'
                        'Owner': {
                            'ID': canonical_id
            elif 'URI' in d['Grantee']: # If Grant is already given to URL, no need of explicit Grant
                print("Log Delivery Group has the required permission...")
        return True
    except Exception as error:
        return None

def enableAccessLogging(clientS3, sourcebucket, targetbucket,targetPrefix):
        response = clientS3.put_bucket_logging(
                        'LoggingEnabled': {
                            'TargetBucket': targetbucket,
                            'TargetPrefix': targetPrefix 
        return True
    except ClientError as e:
        return None

def showSingleBucket(bucketName,s3,s3bucket,targetPrefix):
  "Displays the contents of a single bucket"
  if ( len(bucketName) == 0 ):
    print ("bucket name not provided, listing all buckets....")
    print ("Bucket Name provided is: %s" % bucketName)
    #s3bucket = boto3.resource('s3')
    my_bucket = s3bucket.Bucket(bucketName)
    bucket_logging = s3bucket.BucketLogging(bucketName)
    bucket_logging_response = bucket_logging.logging_enabled
    if bucket_logging.logging_enabled is None:
        print("Bucket - {} is not loggging Enabled" .format(bucketName))
        print("Bucket - {} logging is in progress..." .format(bucketName))
        grantaclBucket(s3,bucketName,bucketName) # Grant ACL to Log Delivery Group - mandatory before enabling logging
        enableAccessLogging(s3, bucketName, bucketName,targetPrefix) # Enable Bucket Logging
        print("Bucket - {} Logging is already enabled." .format(bucketName))
        print("Target Bucket is - {}" .format(bucket_logging_response['TargetBucket']))
        print("Target prefix is - {}" .format(bucket_logging_response['TargetPrefix']))
    #for object in my_bucket.objects.all():
    #  print(object.key)

def showAllBuckets(s3,s3bucket,targetPrefix):
        response = s3.list_buckets()
        for bucket in response['Buckets']:
            my_bucket = bucket['Name']
            bucket_logging = s3bucket.BucketLogging(my_bucket)
            bucket_logging_response = bucket_logging.logging_enabled
            if bucket_logging.logging_enabled is None:
                print("Bucket - {} is not loggging Enabled" .format(my_bucket))
                print("Bucket - {} logging is in progress..." .format(my_bucket))
                grantaclBucket(s3,my_bucket,my_bucket) # Grant ACL to Log Delivery Group
                enableAccessLogging(s3,my_bucket,my_bucket,targetPrefix) # Enable Bucket Logging
                print("Bucket - {} Logging is already enabled." .format(my_bucket))
                target_bucket = bucket_logging_response['TargetBucket']
                target_prefix = bucket_logging_response['TargetPrefix']
    except ClientError as e:
        print("The bucket does not exist, choose how to deal with it or raise the exception: "+e)

if __name__ == '__main__':
        targetPrefix = 'S3_Access_logs/'
        s3_client_conn = s3_client(REGION)
        s3_resource_conn = s3_resource(REGION)
        print("<font size=1 face=verdana color=blue>Processing for AWS Account :- <b><font size=1 color=red> {}</font></b></font><br>".format(AWS_Account_ID))
            "<font size=1 face=verdana color=blue>==============================</font><br><br>")
        if BUCKET_NAME == "ALL":
    except Exception as error:
        print("Issue while enabling Server Access Logging")

This python script is being called from Shell script – where the environment is set using “AssumeRole” funciton.

Shell Script Name –


if [[ $# -lt 2 ]]; then
  echo "Usage: ${0} <AccountID> <Bucket Name>"
  exit 1
cat /dev/null > ${logfile}

. /u01/app/scripts/bin/ ${AccountID}
# No need to set Region as Buckets are Global
echo "python ${script_top}/bin/ -accountID ${AccountID} -b ${BucketName}"
python ${script_top}/bin/ -accountID ${AccountID} -b ${BucketName}

Hope this helps. Happy reading !!!
~Anand M


2 responses to “Script to Enable AWS S3 Server Access Logging using Boto3”

  1. Hi Anand, great code, really useful, I just wanted to tank you and also mention something I think you could add or fix, on lines 151-152 and 173-174, the origin bucket is sent twice, therefore creating the logs in the same origin bucket, In my experience that creates a logging loop and a huge cost increase, again, thanks for the code, really useful

    1. Thanks Victor for the comment. However I looked into the code – both are separate function calls. One line is granting ACL while the other one is enabling the access log

