Script to Enable AWS S3 Server Access Logging using Boto3

We often come across situations where S3 server access logging is not enabled by default, and under corporate security policy such buckets get flagged as security incidents. With a very large number of such S3 buckets, there was a need to enable server access logging programmatically.
I recently developed a script using boto3 to achieve this. It enabled logging for 100+ buckets in about 30 minutes. I also configured a Jenkins job around it so the task can be carried out by the L1 support team.
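
Before fixing a bucket, it helps to confirm its current state. A quick boto3 check looks like this (the bucket name below is just a placeholder):

import boto3

s3 = boto3.client('s3')
response = s3.get_bucket_logging(Bucket='my-example-bucket')  # placeholder bucket name
if 'LoggingEnabled' in response:
    print("Logging target is - {}".format(response['LoggingEnabled']['TargetBucket']))
else:
    print("Server access logging is NOT enabled")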

Script Name – EnableS3BucketLogging.py

#!/usr/bin/env python

import boto3
import time
import sys
import logging
import argparse
from botocore.exceptions import ClientError

print("S3 server access logging check started at %s" % time.ctime())



DEFAULT_BUCKET = "ALL"
DEFAULT_REGION = "us-east-1"
DEFAULT_AWS_Account_ID = "1234567899765"
DEFAULT_AWS_Account_Name = "Dummy Account Name"



def parse_commandline_arguments():

    global REGION
    global AWS_Account_ID
    global AWS_Account_Name
    global BUCKET_NAME

    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter,
                                     description='Enable S3 Server Logging if Not enabled.')
    parser.add_argument("-accountID", "--ownerID", dest="aws_ID", type=str, default=DEFAULT_AWS_Account_ID,
                        help="The AWS Account ID where volume tagging is  to be done")
    parser.add_argument("-r", "--region", dest="region", type=str,
                        default=DEFAULT_REGION, help="Specify the region of the AWS Account")
    parser.add_argument("-b", "--bucket", dest="bucket", type=str,
                        default=DEFAULT_BUCKET, help="Specify the bucket name")
    parser.add_argument("-accountName","--AWSAccountName",dest="aws_account_name",type=str, default=DEFAULT_AWS_Account_Name,
                        help="Specify the AWS Account Name")

    args = parser.parse_args()
    REGION = args.region
    AWS_Account_ID = args.aws_ID
    BUCKET_NAME = args.bucket
    AWS_Account_Name = args.aws_account_name


def s3_resource(region):

    # Connects to S3 via the resource API, returns a resource object
    try:
        conn = boto3.resource('s3', region_name=region)

    except Exception as e:
        sys.stderr.write(
            'Could not connect to region: %s. Exception: %s\n' % (region, e))
        conn = None

    return conn


def s3_client(region):
    """
    Connects to EC2, returns a connection object
    """
    try:
        conn = boto3.client('s3', region)

    except Exception as e:
        sys.stderr.write(
            'Could not connect to region: %s. Exception: %s\n' % (region, e))
        conn = None

    return conn



def grantaclBucket(client, sourcebucket, targetbucket):
    """Grants the S3 Log Delivery group the permissions required on the target bucket"""
    try:
        acl = client.get_bucket_acl(Bucket=sourcebucket)
        for d in acl['Grants']:
            if 'ID' in d['Grantee']:
                # Grantee is a canonical user (not a group URI), so the Log Delivery
                # group must be granted access before logging can be enabled.
                # Note: put_bucket_acl with AccessControlPolicy replaces the bucket's
                # entire ACL with the policy supplied here.
                canonical_id = d['Grantee']['ID']
                client.put_bucket_acl(
                    AccessControlPolicy={
                        'Grants': [
                            {
                                'Grantee': {
                                    'Type': 'Group',
                                    'URI': 'http://acs.amazonaws.com/groups/s3/LogDelivery'
                                },
                                'Permission': 'READ_ACP'
                            },
                            {
                                'Grantee': {
                                    'Type': 'Group',
                                    'URI': 'http://acs.amazonaws.com/groups/s3/LogDelivery'
                                },
                                'Permission': 'WRITE'
                            }
                        ],
                        'Owner': {
                            'ID': canonical_id
                        },
                    },
                    Bucket=targetbucket
                )
            elif 'URI' in d['Grantee']:
                # A group URI grant already exists; no explicit grant is needed
                print("Log Delivery Group has the required permission...")
        return True
    except ClientError as error:
        logging.error(error)
        return None
        



def enableAccessLogging(clientS3, sourcebucket, targetbucket, targetPrefix):
    """Enables server access logging on sourcebucket, delivering logs to targetbucket/targetPrefix"""
    try:
        response = clientS3.put_bucket_logging(
            Bucket=sourcebucket,
            BucketLoggingStatus={
                'LoggingEnabled': {
                    'TargetBucket': targetbucket,
                    'TargetPrefix': targetPrefix
                }
            },
        )
        return True
    except ClientError as e:
        logging.error(e)
        return None


def showSingleBucket(bucketName, s3, s3bucket, targetPrefix):
    """Checks and, if needed, enables server access logging for a single bucket"""
    if len(bucketName) == 0:
        print("bucket name not provided, listing all buckets....")
        time.sleep(8)
    else:
        print("Bucket Name provided is: %s" % bucketName)
        bucket_logging = s3bucket.BucketLogging(bucketName)
        bucket_logging_response = bucket_logging.logging_enabled
        if bucket_logging_response is None:
            print("Bucket - {} is not logging enabled".format(bucketName))
            print("Bucket - {} logging is in progress...".format(bucketName))
            # Grant ACL to the Log Delivery group - mandatory before enabling logging
            grantaclBucket(s3, bucketName, bucketName)
            # Enable bucket logging
            enableAccessLogging(s3, bucketName, bucketName, targetPrefix)
        else:
            print("Bucket - {} logging is already enabled.".format(bucketName))
            print("Target Bucket is - {}".format(bucket_logging_response['TargetBucket']))
            print("Target prefix is - {}".format(bucket_logging_response['TargetPrefix']))
    return



def showAllBuckets(s3, s3bucket, targetPrefix):
    """Checks and, if needed, enables server access logging for every bucket in the account"""
    try:
        response = s3.list_buckets()
        for bucket in response['Buckets']:
            my_bucket = bucket['Name']
            bucket_logging = s3bucket.BucketLogging(my_bucket)
            bucket_logging_response = bucket_logging.logging_enabled
            if bucket_logging_response is None:
                print("Bucket - {} is not logging enabled".format(my_bucket))
                print("Bucket - {} logging is in progress...".format(my_bucket))
                # Grant ACL to the Log Delivery group
                grantaclBucket(s3, my_bucket, my_bucket)
                # Enable bucket logging
                enableAccessLogging(s3, my_bucket, my_bucket, targetPrefix)
            else:
                print("Bucket - {} logging is already enabled.".format(my_bucket))
                print("Target Bucket is - {}".format(bucket_logging_response['TargetBucket']))
                print("Target prefix is - {}".format(bucket_logging_response['TargetPrefix']))
    except ClientError as e:
        print("The bucket does not exist, choose how to deal with it or raise the exception: " + str(e))
    return



if __name__ == '__main__':
    try:
        parse_commandline_arguments()
        targetPrefix = 'S3_Access_logs/'
        s3_client_conn = s3_client(REGION)
        s3_resource_conn = s3_resource(REGION)
        print("<font size=1 face=verdana color=blue>Processing for AWS Account :- <b><font size=1 color=red> {}</font></b></font><br>".format(AWS_Account_ID))
        print(
            "<font size=1 face=verdana color=blue>==============================</font><br><br>")
        if BUCKET_NAME == "ALL":
            showAllBuckets(s3_client_conn,s3_resource_conn,targetPrefix)
        else:
            showSingleBucket(BUCKET_NAME,s3_client_conn,s3_resource_conn,targetPrefix)
    except Exception as error:
        logging.error(error)
        print(str(error))
        print("Issue while enabling Server Access Logging")

This Python script is called from a shell script, where the environment is set using an "AssumeRole" function.

Shell Script Name – EnableS3BucketLogging.sh

#!/bin/bash

if [[ $# -lt 2 ]]; then
  echo "Usage: ${0} <AccountID> <Bucket Name>"
  exit 1
fi
AccountID=${1}
BucketName=${2} 
script_top=/u01/app/scripts
outputdir=${script_top}/output
logfile=${script_top}/logs/EnableS3BucketLogging.log
cat /dev/null > ${logfile}


unset AWS_SESSION_TOKEN AWS_DEFAULT_REGION AWS_SECRET_ACCESS_KEY AWS_ACCESS_KEY_ID
. /u01/app/scripts/bin/AssumeRole.sh ${AccountID}
# No need to set Region as Buckets are Global
echo "python ${script_top}/bin/EnableS3BucketLogging.py -accountID ${AccountID} -b ${BucketName}"
python ${script_top}/bin/EnableS3BucketLogging.py -accountID ${AccountID} -b ${BucketName}
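
AssumeRole.sh itself is not shown here. For reference, a minimal boto3 equivalent of such a helper could look like the sketch below; the role name is an assumption, so substitute whatever role your cross-account setup uses:

import boto3

def assume_role_session(account_id, role_name="OrganizationAccountAccessRole"):  # role name is an assumption
    sts = boto3.client("sts")
    response = sts.assume_role(
        RoleArn="arn:aws:iam::{}:role/{}".format(account_id, role_name),
        RoleSessionName="EnableS3BucketLogging"
    )
    creds = response["Credentials"]
    # Build a session scoped to the target account's temporary credentials
    return boto3.Session(
        aws_access_key_id=creds["AccessKeyId"],
        aws_secret_access_key=creds["SecretAccessKey"],
        aws_session_token=creds["SessionToken"]
    )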

Hope this helps. Happy reading !!!
~Anand M



2 responses to “Script to Enable AWS S3 Server Access Logging using Boto3”

  1. Hi Anand, great code, really useful. I just wanted to thank you and also mention something I think you could add or fix: on lines 151-152 and 173-174, the origin bucket is passed twice, therefore creating the logs in the same origin bucket. In my experience that creates a logging loop and a huge cost increase. Again, thanks for the code, really useful.

    1. Thanks Victor for the comment. However, I looked into the code and both are separate function calls: one line grants the ACL while the other one enables the access log.
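
For anyone who shares Victor's concern, the script's functions can point the logs at a dedicated central log bucket instead of the source bucket itself. A minimal variation, assuming a pre-created central bucket (the name below is a placeholder):

# Placeholder name for a pre-created, dedicated log bucket
CENTRAL_LOG_BUCKET = "my-central-s3-access-logs"

# The Log Delivery grant is only needed on the bucket that receives the logs
grantaclBucket(s3_client_conn, CENTRAL_LOG_BUCKET, CENTRAL_LOG_BUCKET)

# Each source bucket logs to the central bucket under its own prefix,
# so no bucket ever writes access logs into itself
enableAccessLogging(s3_client_conn, bucketName, CENTRAL_LOG_BUCKET,
                    "S3_Access_logs/{}/".format(bucketName))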

