#!/usr/bin/env python
# -*- coding: utf-8 -*-
from datetime import datetime
import time
import logging
import configparser
from cassandra.cluster import Cluster
from cassandra.cqltypes import DateType
# Module-wide logger: this is the root logger, set to emit INFO and above.
log = logging.getLogger()
log.setLevel(logging.INFO)
class SimpleClient:
    """Cassandra client for the ``count_calls`` keyspace.

    Wraps a driver session and offers schema creation, two report queries
    (one per table layout) and helpers that copy rows from ``calls_table2``
    into ``calls_table``.
    """

    # Driver session; None until connect() succeeds.
    session = None
    # Placeholders kept for backward compatibility; no method here reads them.
    prepared_statement_table1 = None
    prepared_statement_table2 = None
    output_file = None
    prepared_insert_calls_table2 = "INSERT INTO count_calls.calls_table2 (num1, started, num2) VALUES (? ,? ,?);"
    prepared_insert_calls_table = "INSERT INTO count_calls.calls_table (num1, calls) VALUES (?, ?);"
    prepared_update_calls_table = ""

    # Report files used when the caller does not pass ``output_path``.
    DEFAULT_OUTPUT_PATH = '/home/natalia/Рабочий стол/cassandraT/output.txt'
    DEFAULT_OUTPUT_PATH2 = '/home/natalia/Рабочий стол/cassandraT/output2.txt'
    # Numbers whose string form is shorter than this are skipped as junk.
    MIN_NUMBER_LENGTH = 3

    def connect(self, nodes):
        """Connect to the cluster at *nodes* and log its topology."""
        cluster = Cluster(nodes)
        metadata = cluster.metadata
        self.session = cluster.connect()
        # Lazy %-formatting also copes with a cluster name that is not a str.
        log.info('Connected to cluster: %s', metadata.cluster_name)
        for host in metadata.all_hosts():
            log.info('Datacenter: %s; Host: %s; Rack: %s',
                     host.datacenter, host.address, host.rack)

    def close(self):
        """Shut down the cluster and the session; no-op if never connected."""
        if self.session is None:
            return
        self.session.cluster.shutdown()
        self.session.shutdown()
        log.info('Connection closed.')

    def create_schema(self):
        """Create the ``count_calls`` keyspace and its two call tables."""
        self.session.execute("""CREATE KEYSPACE count_calls WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};""")
        self.session.execute("""
            CREATE TABLE count_calls.calls_table (
                num1 text PRIMARY KEY,
                calls map<timestamp, text>
            );
        """)
        self.session.execute("""
            CREATE TABLE count_calls.calls_table2 (
                num1 text,
                started timestamp,
                num2 text,
                PRIMARY KEY (num1, started)
            );
        """)
        log.info('Count_calls keyspace and schema created.')

    @classmethod
    def _is_valid_number(cls, value):
        """Return True when ``str(value)`` is long enough to be a real number."""
        return len(str(value)) >= cls.MIN_NUMBER_LENGTH

    @staticmethod
    def _write_report(output_file, number, result, number_of_calls, criteria):
        """Write and print one row per counterpart exceeding *criteria* percent.

        Does nothing when *number_of_calls* is zero, which would otherwise
        divide by zero while computing the percentage.
        """
        if not number_of_calls:
            return
        needed = number_of_calls * criteria / 100.
        for_count = 100. / number_of_calls
        for key, count in result.items():
            if count > needed:
                r = 'r: ' + str(number) + '\t' + str(key).split("\r\n")[0] + '\t' + str(count) \
                    + '\t' + str(count * for_count)
                # One row per line; without the newline all rows run together.
                output_file.write(r + '\n')
                print(r)

    def count_calls(self, numbers, start_date, end_date, criteria, output_path=None):
        """Report frequent counterparts per number using ``calls_table``.

        For every number, calls whose timestamp lies in
        ``[start_date, end_date]`` are tallied per counterpart; counterparts
        whose call count exceeds *criteria* percent of that number's calls
        are printed and written to the report file.

        Args:
            numbers: iterable of numbers to look up (short entries skipped).
            start_date: inclusive lower bound, compared with the map keys.
            end_date: inclusive upper bound.
            criteria: percentage threshold.
            output_path: report file; defaults to ``DEFAULT_OUTPUT_PATH``.
        """
        print("begin")
        prepared_statement = self.session.prepare(
            "SELECT calls FROM count_calls.calls_table WHERE num1 = ?;")
        time_begin = time.time()
        if output_path is None:
            output_path = self.DEFAULT_OUTPUT_PATH
        # Open once for the whole run: reopening with 'w' per number would
        # truncate the rows already written for earlier numbers.
        with open(output_path, 'w', encoding='utf-8') as output_file:
            for number in numbers:
                if not self._is_valid_number(number):
                    continue
                number_of_calls = 0
                result = {}
                for line in self.session.execute(prepared_statement, [str(number)]):
                    # A row may carry no map at all; treat that as empty.
                    for date, num2 in (line.calls or {}).items():
                        if not self._is_valid_number(num2):
                            continue
                        if start_date <= date <= end_date:
                            number_of_calls += 1
                            result[num2] = result.get(num2, 0) + 1
                print(result.items())
                self._write_report(output_file, number, result, number_of_calls, criteria)
        time_end = time.time()
        print("work time = " + str(time_end - time_begin))
        return

    def count_calls2(self, numbers, start_date, end_date, criteria, output_path=None):
        """Report frequent counterparts per number using ``calls_table2``.

        Same report as ``count_calls`` but the date range is filtered by the
        query itself. Rows are validated and counted with the same rule as
        ``count_calls`` so both reports are comparable.

        Args:
            numbers: iterable of numbers to look up (short entries skipped).
            start_date: inclusive lower bound bound into the query.
            end_date: inclusive upper bound bound into the query.
            criteria: percentage threshold.
            output_path: report file; defaults to ``DEFAULT_OUTPUT_PATH2``.
        """
        print("begin2")
        # Adjacent literals with an explicit space: a backslash continuation
        # inside the string can fuse "WHERE" with the next word.
        prepared_statement = self.session.prepare(
            "SELECT started, num2 FROM count_calls.calls_table2 "
            "WHERE num1=? AND started >= ? AND started <= ?;")
        time_begin = time.time()
        if output_path is None:
            output_path = self.DEFAULT_OUTPUT_PATH2
        with open(output_path, 'w', encoding='utf-8') as output_file:
            for number in numbers:
                if not self._is_valid_number(number):
                    continue
                number_of_calls = 0
                result = {}
                # Count while iterating; the driver result need not support len().
                for line in self.session.execute(
                        prepared_statement, [str(number), start_date, end_date]):
                    if not self._is_valid_number(line.num2):
                        continue
                    number_of_calls += 1
                    result[line.num2] = result.get(line.num2, 0) + 1
                print(result.items())
                self._write_report(output_file, number, result, number_of_calls, criteria)
        time_end = time.time()
        print("work time = " + str(time_end - time_begin))
        return

    def insert_double(self, num1, started, num2):
        """Insert one ``(num1, started, num2)`` row into ``calls_table2``.

        Values are passed as query parameters so the driver quotes them,
        instead of being concatenated into the CQL text.
        """
        self.session.execute(
            "INSERT INTO count_calls.calls_table2 (num1, started, num2) VALUES (%s, %s, %s);",
            (str(num1), started, str(num2)))

    def update(self, num1, started, num2):
        """Add the entry ``started -> num2`` to the ``calls`` map of *num1*."""
        self.session.execute(
            "UPDATE count_calls.calls_table SET calls = calls + %s WHERE num1 = %s;",
            ({started: str(num2)}, str(num1)))

    def insert(self, num1, started, num2):
        """Insert *num1* into ``calls_table`` with a one-entry ``calls`` map."""
        self.session.execute(
            "INSERT INTO count_calls.calls_table (num1, calls) VALUES (%s, %s);",
            (str(num1), {started: str(num2)}))

    def insert_from_calls_to_calls2(self):
        """Mirror every ``calls_table2`` row and fold it into ``calls_table``.

        For each row read from ``calls_table2`` the reversed row
        ``(num2, started, num1)`` is inserted back into ``calls_table2``, and
        the call is recorded in ``calls_table`` for both numbers: the first
        time a number is seen in this run its row is inserted, afterwards its
        map is extended with an update.
        """
        # A set of whole numbers; extending a list with a string would add
        # its individual characters instead.
        added_nums = set()
        for line in self.session.execute("SELECT * FROM count_calls.calls_table2;"):
            self.insert_double(line.num2, line.started, line.num1)
            for own, other in ((line.num1, line.num2), (line.num2, line.num1)):
                if own in added_nums:
                    self.update(own, line.started, other)
                else:
                    self.insert(own, line.started, other)
                    added_nums.add(own)
def convert_date(str_date):
    """Parse a ``dd.mm.YYYY HH:MM[:SS]`` string into a naive ``datetime``.

    Surrounding whitespace is ignored and ``-`` is accepted in place of ``.``
    as the date separator.

    Args:
        str_date: the text to parse.

    Returns:
        The parsed ``datetime``.

    Raises:
        ValueError: if the text matches none of the supported layouts
            (including the empty string).
    """
    text = str_date.strip()
    supported_formats = (
        '%d.%m.%Y %H:%M',
        '%d.%m.%Y %H:%M:%S',
        '%d-%m-%Y %H:%M',
        '%d-%m-%Y %H:%M:%S',
    )
    for date_format in supported_formats:
        try:
            return datetime.strptime(text, date_format)
        except ValueError:
            continue
    # An explicit error; an empty format string would silently parse ''.
    raise ValueError(
        "unsupported date %r; expected 'dd.mm.YYYY HH:MM' or 'dd.mm.YYYY HH:MM:SS'"
        % (str_date,))
def format_date_for_import(date):
    """Turn ``dd.mm.yyyy <time>`` into ``yyyy-mm-dd <time>``.

    The text is split on single spaces; the first piece is the dotted date
    and the second piece is copied through unchanged as the time part.
    """
    pieces = date.split(" ")
    date_fields = pieces[0].split(".")
    return "{}-{}-{} {}".format(date_fields[2], date_fields[1], date_fields[0], pieces[1])
# Run this before: COPY count_calls.calls_table2 (num1, started, num2) FROM 'output_csv';
def format_csv_data_for_import(source, output_csv, max_lines=1000):
    """Convert a ``;``-separated call log into CSV rows for ``calls_table2``.

    Each usable input line ``num1;dd.mm.yyyy <time>;num2[;...]`` produces two
    output rows, one per direction of the call::

        num1,yyyy-mm-dd <time>,"num2"
        num2,yyyy-mm-dd <time>,"num1"

    Lines with fewer than three fields, or whose third field has two
    characters or fewer, are skipped.

    Args:
        source: path of the input call log.
        output_csv: path of the CSV file to write (overwritten).
        max_lines: number of leading input lines to read, skipped lines
            included; ``None`` reads the whole file.
    """
    # Context managers close both files even if a line fails to convert.
    with open(source) as data, open(output_csv, 'w') as op:
        for index, line in enumerate(data):
            if max_lines is not None and index >= max_lines:
                break
            # Strip the line ending first: in text mode it arrives as "\n",
            # and would otherwise stay glued to the last field.
            el = line.rstrip("\r\n").split(";")
            if len(el) > 2 and len(el[2]) > 2:
                date = format_date_for_import(el[1])
                num1, num2 = el[0], el[2]
                op.write(num1 + "," + date + ",\"" + num2 + "\"" + "\n")
                op.write(num2 + "," + date + ",\"" + num1 + "\"" + "\n")
def main(config_path='/home/natalia/Рабочий стол/cassandraT/config.ini'):
    """Run both call-count reports against a local Cassandra node.

    The config file must have an ``[INPUT]`` section with two paths:
    ``input_date_and_criteria`` (a file whose first three lines are the
    percentage threshold, the start date and the end date) and
    ``input_numbers`` (a file with one number per line).

    Args:
        config_path: path of the ini file to read.

    Raises:
        FileNotFoundError: if the config file cannot be read.
    """
    conf = configparser.ConfigParser()
    # ConfigParser.read() silently skips unreadable files; fail loudly here
    # instead of with a KeyError on the first lookup.
    if not conf.read(config_path):
        raise FileNotFoundError(config_path)

    with open(conf['INPUT']['input_date_and_criteria']) as criteria_file:
        dc = criteria_file.read().splitlines()
    criteria = float(dc[0])
    start_date = convert_date(dc[1])
    end_date = convert_date(dc[2])
    with open(conf['INPUT']['input_numbers']) as numbers_file:
        numbers = numbers_file.read().splitlines()
    print(str(criteria) + "\n" + str(start_date) + "\n" + str(end_date))
    print(numbers)

    logging.basicConfig()
    client = SimpleClient()
    client.connect(['127.0.0.1'])
    # One-time setup, run manually when needed: client.create_schema(),
    # format_csv_data_for_import(...) followed by a CQL COPY, and
    # client.insert_from_calls_to_calls2().
    try:
        client.count_calls(numbers, start_date, end_date, criteria)
        client.count_calls2(numbers, start_date, end_date, criteria)
    finally:
        # Release the connection even when a report fails.
        client.close()
# Run the reports only when executed as a script, not when imported.
if __name__ == "__main__":
    main()