# Created by Vladimir Nevski 12.07.2017. This script creates a table in AWS Athena.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# Created by Vladimir Nevski 12.07.2017
# This script creates a table in AWS Athena over data in S3 that is generated by the Firehose process for action streams.
# The process runs every hour and updates the Athena metadata so that the table points at the current day's directory in S3, with partitions for today and yesterday.
# Run example: python3 athena_actions_request_last_day.py
import boto3
import datetime
import time
import logging
# Build a per-run log file name (timestamped to the second) and send
# INFO-and-above records to it.
str_time = '{:%Y%m%d%H%M%S}'.format(datetime.datetime.now())
log_file = '/opt/scripts/moments/logs/athena_actions_request_last_day_{}.log'.format(str_time)
# Echo the chosen log path on stdout.
print(log_file)
log_frmt = '%(asctime)s %(levelname)s: %(message)s'
logging.basicConfig(
    filename=log_file,
    level=logging.INFO,
    format=log_frmt,
)
logging.info('Process athena_actions_request_last_day.py Started')
def _run_athena_query(client, query, output_location, label,
                      poll_seconds=1.0, timeout_seconds=300.0):
    """Submit *query* to Athena and wait until it reaches a terminal state.

    ``start_query_execution`` only queues the statement; it returns before
    the statement has run. Polling ``get_query_execution`` is what tells us
    the DDL has really been applied (or has failed) before the next,
    dependent statement is issued.

    Args:
        client: boto3 Athena client.
        query: SQL text to execute.
        output_location: S3 URI where Athena stores the query result.
        label: Short name used in the log lines (e.g. ``'Delete'``).
        poll_seconds: Delay between two status checks.
        timeout_seconds: Maximum time to wait for a terminal state.

    Returns:
        The ``start_query_execution`` response dict.

    Raises:
        RuntimeError: If the query ends as FAILED or CANCELLED, or does not
            finish within ``timeout_seconds``.
    """
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': 'moments'
        },
        ResultConfiguration={
            'OutputLocation': output_location,
        }
    )
    # Saving API response to log.
    logging.info('%s reponse:', label)
    logging.info(str(response))

    query_id = response['QueryExecutionId']
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = client.get_query_execution(
            QueryExecutionId=query_id
        )['QueryExecution']['Status']
        state = status['State']
        if state == 'SUCCEEDED':
            return response
        if state in ('FAILED', 'CANCELLED'):
            raise RuntimeError(
                '{0} query {1} ended in state {2}: {3}'.format(
                    label, query_id, state,
                    status.get('StateChangeReason', 'no reason reported')
                )
            )
        if time.monotonic() >= deadline:
            raise RuntimeError(
                '{0} query {1} did not finish within {2} seconds (last state: {3})'.format(
                    label, query_id, timeout_seconds, state
                )
            )
        time.sleep(poll_seconds)


try:
    # Creating boto3 Athena client
    client = boto3.client('athena', region_name='us-east-1')

    # Taking date parts to generate the path in S3 as it is written by Firehose.
    # Firehose builds its YYYY/MM/DD/HH prefix from UTC, so the date parts are
    # taken in UTC as well; yesterday is derived from the same instant so the
    # two dates can never straddle midnight inconsistently.
    curr_date = datetime.datetime.now(datetime.timezone.utc)
    yest_date = curr_date - datetime.timedelta(days=1)
    str_year_c = curr_date.strftime('%Y')
    str_month_c = curr_date.strftime('%m')
    str_day_c = curr_date.strftime('%d')
    str_year_y = yest_date.strftime('%Y')
    str_month_y = yest_date.strftime('%m')
    str_day_y = yest_date.strftime('%d')

    # Deleting table if exists
    logging.info('Deleting current table')
    stmt_del = """drop table if exists moments.actions_request_last_day"""
    _run_athena_query(
        client,
        stmt_del,
        's3://appnext-firehose/athena_response/del_res',
        'Delete',
    )

    # Creating the table over today's directory in S3. The drop above has
    # already been confirmed as finished, so no fixed sleep is needed here.
    stmt = """CREATE EXTERNAL TABLE IF NOT EXISTS moments.actions_request_last_day (
acid string,
aid string,
pid string,
type string,
app_package string,
action_name string ,
ip string,
cuid string,
raw_request string,
country string,
time TIMESTAMP,
region string,
server_ip string
)
partitioned by (day int)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
'serialization.format' = '1'
) LOCATION 's3://appnext-firehose/actions-request-fh{0}/{1}/{2}/';""".format(str_year_c, str_month_c, str_day_c)
    logging.info(stmt)
    _run_athena_query(
        client,
        stmt,
        's3://appnext-firehose/athena_response/create_res',
        'Create',
    )

    # Adding partitions for today and yesterday
    stmt_altr = (
        "ALTER TABLE moments.actions_request_last_day ADD "
        "PARTITION (day={0}{1}{2}) "
        "LOCATION 's3://appnext-firehose/actions-request-fh{0}/{1}/{2}/' "
        "PARTITION (day={3}{4}{5}) "
        "LOCATION 's3://appnext-firehose/actions-request-fh{3}/{4}/{5}/'"
    ).format(str_year_c, str_month_c, str_day_c, str_year_y, str_month_y, str_day_y)
    logging.info(stmt_altr)
    _run_athena_query(
        client,
        stmt_altr,
        's3://appnext-firehose/athena_response/create_res',
        'Alter',
    )

    # Reached only when all three statements ended in state SUCCEEDED.
    logging.info('Process athena_actions_request_last_day.py Finished Successfuly')
except Exception as e:
    # Top-level boundary of the script: record the error together with its
    # traceback in the log file instead of letting it propagate.
    logging.exception("Error in script execution: %s", e)