Script to Enable AWS S3 Server Access Logging using Boto3

We often come across situations where S3 bucket access logging is not enabled by default and, under corporate security policy, such buckets are flagged as a security incident. Because there was a very large number of such S3 buckets, server access logging had to be enabled programmatically.
Recently I developed a script using boto3 to do this. It enabled logging for 100+ such buckets in ~30 min. I also configured a Jenkins job so that the task can be run by the L1 support team.
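
Before enabling anything, it can help to list which buckets are actually missing logging. The following is a minimal sketch of such a check, not part of the original script. The helper names `logging_is_enabled` and `buckets_without_logging` are made up for illustration, and it assumes the client passed in behaves like a boto3 S3 client, whose `get_bucket_logging` response contains a `LoggingEnabled` key only when logging is turned on.

```python
def logging_is_enabled(logging_response):
    """Return True when a get_bucket_logging response reports logging as enabled."""
    # The 'LoggingEnabled' key is present only when logging is configured.
    return "LoggingEnabled" in logging_response


def buckets_without_logging(s3_client):
    """Return the names of buckets whose server access logging is not enabled.

    s3_client is assumed to expose list_buckets() and get_bucket_logging(),
    as a boto3 S3 client does.
    """
    missing = []
    for bucket in s3_client.list_buckets()["Buckets"]:
        name = bucket["Name"]
        response = s3_client.get_bucket_logging(Bucket=name)
        if not logging_is_enabled(response):
            missing.append(name)
    return missing
```

With real credentials in the environment, this would be called as `buckets_without_logging(boto3.client("s3"))`.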

Script Name – EnableS3BucketLogging.py

#!/usr/bin/env python

import boto3
import time
import sys
import logging
import argparse
from botocore.exceptions import ClientError

print ("S3 Listing at %s" % time.ctime())



DEFAULT_BUCKET = "ALL"
DEFAULT_REGION = "us-east-1"
DEFAULT_AWS_Account_ID = "1234567899765"
DEFAULT_AWS_Account_Name = "Dummy Account Name"



def parse_commandline_arguments():

    global REGION
    global AWS_Account_ID
    global AWS_Account_Name
    global BUCKET_NAME
    global target_bucket

    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter,
                                     description='Enable S3 Server Logging if Not enabled.')
    parser.add_argument("-accountID", "--ownerID", dest="aws_ID", type=str, default=DEFAULT_AWS_Account_ID,
                        help="The AWS Account ID where S3 server access logging is to be enabled")
    parser.add_argument("-r", "--region", dest="region", type=str,
                        default=DEFAULT_REGION, help="Specify the region of the AWS Account")
    parser.add_argument("-b", "--bucket", dest="bucket", type=str,
                        default=DEFAULT_BUCKET, help="Specify the bucket name")
    parser.add_argument("-accountName","--AWSAccountName",dest="aws_account_name",type=str, default=DEFAULT_AWS_Account_Name,
                        help="Specify the AWS Account Name")

    args = parser.parse_args()
    REGION = args.region
    AWS_Account_ID = args.aws_ID
    BUCKET_NAME = args.bucket
    AWS_Account_Name = args.aws_account_name


def s3_resource(region):

    # Connects to S3, returns a resource object
    try:
        conn = boto3.resource('s3', region_name=region)

    except Exception as e:
        sys.stderr.write(
            'Could not connect to region: %s. Exception: %s\n' % (region, e))
        conn = None

    return conn


def s3_client(region):
    """
    Connects to S3, returns a client object
    """
    try:
        conn = boto3.client('s3', region_name=region)

    except Exception as e:
        sys.stderr.write(
            'Could not connect to region: %s. Exception: %s\n' % (region, e))
        conn = None

    return conn



def grantaclBucket(s3_client,sourcebucket,targetbucket):
    try:
        acl = s3_client.get_bucket_acl(Bucket = sourcebucket)
        for d in acl['Grants']:
            if 'ID' in d['Grantee']: # If Grantee is NOT URI, then specific Grant needs to be given before enabling Logging
                canonical_id = d['Grantee']['ID']
                # Note: put_bucket_acl replaces the bucket's existing ACL with the grants listed below
                response = s3_client.put_bucket_acl(
                    AccessControlPolicy={
                        'Grants': [
                            {
                                'Grantee': {
                                    'Type': 'Group',
                                    'URI': 'http://acs.amazonaws.com/groups/s3/LogDelivery'
                                },
                                'Permission': 'READ_ACP'
                            },
                            {
                                'Grantee': {
                                    'Type': 'Group',
                                    'URI': 'http://acs.amazonaws.com/groups/s3/LogDelivery'
                                },
                                'Permission': 'WRITE'
                            }
                        ],
                        'Owner': {
                            'ID': canonical_id
                            },
                        },
                        Bucket=targetbucket
                    )
            elif 'URI' in d['Grantee']: # If Grant is already given to URL, no need of explicit Grant
                print("Log Delivery Group has the required permission...")
        return True
    except Exception as error:
        logging.error(error)
        return None
        



def enableAccessLogging(clientS3, sourcebucket, targetbucket,targetPrefix):
    try:
        response = clientS3.put_bucket_logging(
                    Bucket=sourcebucket,
                    BucketLoggingStatus={
                        'LoggingEnabled': {
                            'TargetBucket': targetbucket,
                            'TargetPrefix': targetPrefix 
                            }
                        },
                    )
        return True
    except ClientError as e:
        logging.error(e)
        return None


def showSingleBucket(bucketName,s3,s3bucket,targetPrefix):
    "Enables server access logging on a single bucket if it is not already enabled"
    if len(bucketName) == 0:
        print("Bucket name not provided, nothing to do.")
        return
    print("Bucket Name provided is: %s" % bucketName)
    bucket_logging = s3bucket.BucketLogging(bucketName)
    bucket_logging_response = bucket_logging.logging_enabled
    if bucket_logging_response is None:
        print("Bucket - {} is not logging enabled".format(bucketName))
        print("Bucket - {} logging is in progress...".format(bucketName))
        # The bucket is passed as both source and target, so logs are delivered into the bucket itself
        grantaclBucket(s3,bucketName,bucketName) # Grant ACL to Log Delivery Group - mandatory before enabling logging
        enableAccessLogging(s3, bucketName, bucketName,targetPrefix) # Enable Bucket Logging
    else:
        print("Bucket - {} Logging is already enabled.".format(bucketName))
        print("Target Bucket is - {}".format(bucket_logging_response['TargetBucket']))
        print("Target prefix is - {}".format(bucket_logging_response['TargetPrefix']))
    return



def showAllBuckets(s3,s3bucket,targetPrefix):
    try:
        response = s3.list_buckets()
        for bucket in response['Buckets']:
            my_bucket = bucket['Name']
            bucket_logging = s3bucket.BucketLogging(my_bucket)
            bucket_logging_response = bucket_logging.logging_enabled
            if bucket_logging_response is None:
                print("Bucket - {} is not logging enabled".format(my_bucket))
                print("Bucket - {} logging is in progress...".format(my_bucket))
                grantaclBucket(s3,my_bucket,my_bucket) # Grant ACL to Log Delivery Group
                enableAccessLogging(s3,my_bucket,my_bucket,targetPrefix) # Enable Bucket Logging
            else:
                print("Bucket - {} Logging is already enabled.".format(my_bucket))
                print("Target Bucket is - {}".format(bucket_logging_response['TargetBucket']))
                print("Target prefix is - {}".format(bucket_logging_response['TargetPrefix']))
    except ClientError as e:
        # Format the exception into the message; concatenating str + exception raises TypeError
        print("Error while checking or enabling bucket logging: {}".format(e))
    return



if __name__ == '__main__':
    try:
        parse_commandline_arguments()
        targetPrefix = 'S3_Access_logs/'
        s3_client_conn = s3_client(REGION)
        s3_resource_conn = s3_resource(REGION)
        print("<font size=1 face=verdana color=blue>Processing for AWS Account :- <b><font size=1 color=red> {}</font></b></font><br>".format(AWS_Account_ID))
        print(
            "<font size=1 face=verdana color=blue>==============================</font><br><br>")
        if BUCKET_NAME == "ALL":
            showAllBuckets(s3_client_conn,s3_resource_conn,targetPrefix)
        else:
            showSingleBucket(BUCKET_NAME,s3_client_conn,s3_resource_conn,targetPrefix)
    except Exception as error:
        logging.error(error)
        print(str(error))
        print("Issue while enabling Server Access Logging")
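
One thing to watch in the script above: it passes the same bucket as both source and target, so every bucket stores its own access logs. AWS documentation advises against this because the log deliveries themselves get logged, which produces extra logs and storage cost. The following is a rough sketch of pointing all buckets at one dedicated log bucket instead. The helper names and the idea of a single central log bucket are assumptions for illustration, not part of the original script. The log bucket is assumed to be in the same Region and account as the source buckets and to already grant the Log Delivery group write access.

```python
def build_logging_status(source_bucket, log_bucket, base_prefix="S3_Access_logs/"):
    """Build the BucketLoggingStatus payload for put_bucket_logging.

    Refuses to log a bucket into itself and gives each source bucket
    its own prefix inside the (hypothetical) central log bucket.
    """
    if source_bucket == log_bucket:
        raise ValueError("source and target bucket must differ to avoid a logging loop")
    return {
        "LoggingEnabled": {
            "TargetBucket": log_bucket,
            "TargetPrefix": "{}{}/".format(base_prefix, source_bucket),
        }
    }


def enable_logging_to(s3_client, source_bucket, log_bucket):
    """Enable server access logging on source_bucket, delivering to log_bucket."""
    s3_client.put_bucket_logging(
        Bucket=source_bucket,
        BucketLoggingStatus=build_logging_status(source_bucket, log_bucket),
    )
```

In `showAllBuckets`, the central log bucket itself would also need to be skipped, otherwise it would be configured to log into itself.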

This Python script is called from a shell script, where the environment is set using the "AssumeRole" function.

Shell Script Name – EnableS3BucketLogging.sh

#!/bin/bash

if [[ $# -lt 2 ]]; then
  echo "Usage: ${0} <AccountID> <Bucket Name>"
  exit 1
fi
AccountID=${1}
BucketName=${2} 
script_top=/u01/app/scripts
outputdir=${script_top}/output
logfile=${script_top}/logs/EnableS3BucketLogging.log
cat /dev/null > ${logfile}


unset AWS_SESSION_TOKEN AWS_DEFAULT_REGION AWS_SECRET_ACCESS_KEY AWS_ACCESS_KEY_ID
. /u01/app/scripts/bin/AssumeRole.sh "${AccountID}"
# No need to set Region as Buckets are Global
echo "python ${script_top}/bin/EnableS3BucketLogging.py -accountID ${AccountID} -b ${BucketName}"
python "${script_top}/bin/EnableS3BucketLogging.py" -accountID "${AccountID}" -b "${BucketName}"
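
AssumeRole.sh itself is not shown in this post. As a rough idea of what such a step does, here is a minimal Python sketch that assumes a role through STS and maps the temporary credentials to the same environment variable names the shell script unsets. The helper names and the default session name are made up for illustration, and the role name is something you would supply. The only AWS call used is the STS `assume_role` operation, with the client passed in by the caller.

```python
def role_arn(account_id, role_name):
    """Build the ARN of an IAM role in the given account."""
    return "arn:aws:iam::{}:role/{}".format(account_id, role_name)


def credentials_to_env(credentials):
    """Map the Credentials dict returned by STS AssumeRole to AWS environment variable names."""
    return {
        "AWS_ACCESS_KEY_ID": credentials["AccessKeyId"],
        "AWS_SECRET_ACCESS_KEY": credentials["SecretAccessKey"],
        "AWS_SESSION_TOKEN": credentials["SessionToken"],
    }


def assume_role_env(sts_client, account_id, role_name, session_name="EnableS3BucketLogging"):
    """Assume the role and return the temporary credentials as environment variables.

    sts_client is assumed to behave like boto3.client('sts').
    """
    response = sts_client.assume_role(
        RoleArn=role_arn(account_id, role_name),
        RoleSessionName=session_name,
    )
    return credentials_to_env(response["Credentials"])
```

The returned dictionary could be merged into `os.environ` before creating the S3 client, which would have the same effect as sourcing a shell script that exports these variables.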

Hope this helps. Happy reading !!!
~Anand M

  1. Victor Perez
    September 29, 2020 at 8:57 am

    Hi Anand, great code, really useful. I just wanted to thank you and also mention something I think you could add or fix: on lines 151-152 and 173-174, the origin bucket is sent twice, so the logs are created in the same origin bucket. In my experience that creates a logging loop and a huge cost increase. Again, thanks for the code, really useful.

    • March 14, 2021 at 11:49 pm

      Thanks, Victor, for the comment. However, I looked into the code: both are separate function calls. One line grants the ACL while the other enables the access log.
