# Created by Vladimir Nevski 12.07.2017
# This script creates a table in AWS Athena over data in s3 that is generated by the Firehose process for action streams
# The process will run every hour and will update Athena metadata to point at the current day's directory in s3, with partitions for the current and the previous day
# Run example: python3 athena_actions_request_last_day.py
import boto3
import datetime
import time
import logging
# Each run writes to its own log file; the file name carries the timestamp taken at start-up
run_started_at = datetime.datetime.now()
str_time = run_started_at.strftime('%Y%m%d%H%M%S')
log_file = ''.join(('/opt/scripts/moments/logs/athena_actions_request_last_day_', str_time, '.log'))
# Echo the log file path to stdout
print(log_file)
# Root logger: INFO and above, "<time> <level>: <message>" layout, written to the file above
log_frmt = '%(asctime)s %(levelname)s: %(message)s'
logging.basicConfig(format=log_frmt, level=logging.INFO, filename=log_file)
logging.info('Process athena_actions_request_last_day.py Started')
def build_create_statement(curr_date):
    """Return the CREATE EXTERNAL TABLE statement for the last-day actions table.

    Args:
        curr_date: date/datetime whose year/month/day select the s3 directory
            used as the table LOCATION.

    Returns:
        The DDL statement as a string.
    """
    return """CREATE EXTERNAL TABLE IF NOT EXISTS moments.actions_request_last_day (
acid string,
aid string,
pid string,
type string,
app_package string,
action_name string ,
ip string,
cuid string,
raw_request string,
country string,
time TIMESTAMP,
region string,
server_ip string
)
partitioned by (day int)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
'serialization.format' = '1'
) LOCATION 's3://appnext-firehose/actions-request-fh{0}/{1}/{2}/';""".format(
        curr_date.strftime('%Y'), curr_date.strftime('%m'), curr_date.strftime('%d'))


def build_partition_statement(curr_date, yest_date):
    """Return the ALTER TABLE statement that adds the two day partitions.

    Args:
        curr_date: date/datetime of the first partition (current day).
        yest_date: date/datetime of the second partition (previous day).

    Returns:
        The DDL statement as a string; each partition value is YYYYMMDD and
        each partition LOCATION is the matching YYYY/MM/DD s3 directory.
    """
    return """ALTER TABLE moments.actions_request_last_day ADD PARTITION (day={0}{1}{2}) LOCATION 's3://appnext-firehose/actions-request-fh{0}/{1}/{2}/' PARTITION (day={3}{4}{5}) LOCATION 's3://appnext-firehose/actions-request-fh{3}/{4}/{5}/'""".format(
        curr_date.strftime('%Y'), curr_date.strftime('%m'), curr_date.strftime('%d'),
        yest_date.strftime('%Y'), yest_date.strftime('%m'), yest_date.strftime('%d'))


def wait_for_query(client, query_execution_id, timeout_sec=300, poll_sec=1):
    """Block until an Athena query reaches a terminal state.

    Args:
        client: boto3 Athena client.
        query_execution_id: id returned by start_query_execution.
        timeout_sec: maximum number of seconds to wait.
        poll_sec: pause in seconds between two status checks.

    Raises:
        RuntimeError: if the query ends as FAILED or CANCELLED, or is still
            not finished after timeout_sec.
    """
    deadline = time.monotonic() + timeout_sec
    while True:
        status = client.get_query_execution(
            QueryExecutionId=query_execution_id
        )['QueryExecution']['Status']
        state = status['State']
        if state == 'SUCCEEDED':
            return
        if state in ('FAILED', 'CANCELLED'):
            raise RuntimeError('Athena query {0} ended in state {1}: {2}'.format(
                query_execution_id, state, status.get('StateChangeReason', 'no reason given')))
        if time.monotonic() >= deadline:
            raise RuntimeError('Athena query {0} still in state {1} after {2} sec'.format(
                query_execution_id, state, timeout_sec))
        time.sleep(poll_sec)


def run_query(client, query, output_location, response_title):
    """Start a statement in the moments database, log the API response and wait for it.

    Args:
        client: boto3 Athena client.
        query: statement to execute.
        output_location: s3 location where Athena stores the query result.
        response_title: log line written right before the API response.

    Returns:
        The start_query_execution response.

    Raises:
        RuntimeError: propagated from wait_for_query when the statement does
            not succeed.
    """
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': 'moments'
        },
        ResultConfiguration={
            'OutputLocation': output_location,
        }
    )
    # Saving API response to log.
    logging.info(response_title)
    logging.info(str(response))
    # start_query_execution only submits the statement; the next statement depends on
    # this one, so wait for its real outcome instead of sleeping a fixed time
    wait_for_query(client, response['QueryExecutionId'])
    return response


try:
    # Creating boto3 Athena client
    client = boto3.client('athena', region_name='us-east-1')
    # Taking the dates used to generate the path in s3 as it is written by Firehose.
    # Firehose names its YYYY/MM/DD directories by UTC time (see the Firehose s3 object
    # name documentation), so "today" is taken in UTC; both dates come from one timestamp.
    curr_date = datetime.datetime.now(datetime.timezone.utc)
    yest_date = curr_date - datetime.timedelta(days=1)
    # Deleting table if exists
    logging.info('Deleting current table')
    stmt_del = """drop table if exists moments.actions_request_last_day"""
    run_query(client, stmt_del, 's3://appnext-firehose/athena_response/del_res', 'Delete reponse:')
    # Creating table located at the current day directory in s3
    stmt = build_create_statement(curr_date)
    logging.info(stmt)
    run_query(client, stmt, 's3://appnext-firehose/athena_response/create_res', 'Create reponse:')
    # Adding partitions for today and yesterday
    stmt_altr = build_partition_statement(curr_date, yest_date)
    logging.info(stmt_altr)
    run_query(client, stmt_altr, 's3://appnext-firehose/athena_response/create_res', 'Alter reponse:')
    # Reached only when all three statements ended in state SUCCEEDED
    logging.info('Process athena_actions_request_last_day.py Finished Successfuly')
except Exception as e:
    # Top-level boundary of the script: same error message as before, plus the traceback
    logging.exception("Error in script execution: %s", e)