#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Count, for each caller, the callees that account for more than a given
percentage of that caller's calls in a date range.

Two alternative Cassandra layouts are queried:
  * ``count_calls.calls_table``  - one row per caller with a map of
    call start time -> callee (used by ``SimpleClient.count_calls``);
  * ``count_calls.calls_table2`` - one row per call, clustered by start
    time (used by ``SimpleClient.count_calls2``).
"""
from datetime import datetime
import time
import logging
import configparser
import itertools
from collections import Counter

from cassandra.cluster import Cluster
from cassandra.cqltypes import DateType  # NOTE(review): unused in this file

log = logging.getLogger()
log.setLevel('INFO')

# Directory holding the author's input/output files; used only for defaults.
_BASE_DIR = '/home/natalia/Рабочий стол/cassandraT'


class SimpleClient:
    """Small wrapper around a Cassandra session for the count_calls keyspace."""

    session = None
    prepared_statement_table1 = None
    prepared_statement_table2 = None
    output_file = None

    # CQL templates with '?' markers (they would need session.prepare()).
    # Not referenced by the methods in this file.
    prepared_insert_calls_table2 = "INSERT INTO count_calls.calls_table2 (num1, started, num2) VALUES (? ,? ,?);"
    prepared_insert_calls_table = "INSERT INTO count_calls.calls_table (num1, calls) VALUES (?, ?);"
    prepared_update_calls_table = ""

    def connect(self, nodes):
        """Connect to the cluster reachable through *nodes* and log its hosts.

        :param nodes: contact points passed to ``cassandra.cluster.Cluster``.
        """
        cluster = Cluster(nodes)
        metadata = cluster.metadata
        self.session = cluster.connect()
        # Lazy %-formatting also copes with a missing (None) cluster name,
        # which plain string concatenation would turn into a TypeError.
        log.info('Connected to cluster: %s', metadata.cluster_name)
        for host in metadata.all_hosts():
            log.info('Datacenter: %s; Host: %s; Rack: %s',
                     host.datacenter, host.address, host.rack)

    def close(self):
        """Shut down the session and its cluster; a no-op if not connected."""
        if self.session is None:
            return
        cluster = self.session.cluster
        self.session.shutdown()
        cluster.shutdown()
        self.session = None  # make a second close() harmless
        log.info('Connection closed.')

    def create_schema(self):
        """Create the count_calls keyspace and both call tables if missing."""
        self.session.execute("""
            CREATE KEYSPACE IF NOT EXISTS count_calls
            WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
        """)
        # calls maps call start time -> callee number; count_calls compares
        # the keys against datetimes, so they must be timestamps.
        self.session.execute("""
            CREATE TABLE IF NOT EXISTS count_calls.calls_table (
                num1 text PRIMARY KEY,
                calls map<timestamp, text>
            );
        """)
        self.session.execute("""
            CREATE TABLE IF NOT EXISTS count_calls.calls_table2 (
                num1 text,
                started timestamp,
                num2 text,
                PRIMARY KEY (num1, started)
            );
        """)
        log.info('Count_calls keyspace and schema created.')

    @staticmethod
    def _is_valid_number(value):
        """Return True when *value* is non-None and at least 3 characters long."""
        return value is not None and len(str(value)) >= 3

    @staticmethod
    def _write_report(number, result, number_of_calls, criteria, output_file):
        """Print and write each callee whose call count exceeds criteria% of all calls.

        Each line is: caller, callee, count, percentage of the caller's calls.
        *number_of_calls* must be positive.
        """
        needed = number_of_calls * criteria / 100.
        for_count = 100. / number_of_calls
        for key, count in result.items():
            if count > needed:
                r = ('r: ' + str(number) + '\t' + str(key).split("\r\n")[0]
                     + '\t' + str(count) + '\t' + str(count * for_count))
                print(r)
                output_file.write(r + '\n')

    def count_calls(self, numbers, start_date, end_date, criteria,
                    output_path=_BASE_DIR + '/output.txt'):
        """Report frequent callees using the map-based ``calls_table``.

        :param numbers: caller numbers; entries shorter than 3 chars are skipped.
        :param start_date: inclusive lower bound for the call start time.
        :param end_date: inclusive upper bound for the call start time.
        :param criteria: percentage a callee's share of calls must exceed.
        :param output_path: report file, truncated once per run.
        """
        # NOTE(review): driver timestamps are naive datetimes; confirm they are
        # in the same timezone as start_date/end_date.
        print("begin")
        prepared_statement = self.session.prepare(
            "SELECT calls FROM count_calls.calls_table WHERE num1 = ?;")
        time_begin = time.time()
        # Open once so results for earlier numbers are not truncated away.
        with open(output_path, 'w', encoding='utf-8') as output_file:
            for number in numbers:
                if not self._is_valid_number(number):
                    continue
                result = Counter()
                for line in self.session.execute(prepared_statement, [str(number)]):
                    # An empty CQL collection comes back as None.
                    for date, num2 in (line.calls or {}).items():
                        if date < start_date or not self._is_valid_number(num2):
                            continue
                        if date <= end_date:
                            result[num2] += 1
                number_of_calls = sum(result.values())
                print(result.items())
                if number_of_calls:  # avoid division by zero for idle numbers
                    self._write_report(number, result, number_of_calls,
                                       criteria, output_file)
        time_end = time.time()
        print("work time = " + str(time_end - time_begin))

    def count_calls2(self, numbers, start_date, end_date, criteria,
                     output_path=_BASE_DIR + '/output2.txt'):
        """Report frequent callees using the clustered ``calls_table2``.

        The date range is filtered server-side on the clustering column.
        The parameters are the same as for ``count_calls``.
        """
        print("begin2")
        prepared_statement = self.session.prepare(
            "SELECT started, num2 FROM count_calls.calls_table2 "
            "WHERE num1 = ? AND started >= ? AND started <= ?;")
        time_begin = time.time()
        with open(output_path, 'w', encoding='utf-8') as output_file:
            for number in numbers:
                if not self._is_valid_number(number):
                    continue
                rows = self.session.execute(
                    prepared_statement, [str(number), start_date, end_date])
                # Same callee filter and denominator as count_calls, so both
                # layouts produce comparable percentages.
                result = Counter(line.num2 for line in rows
                                 if self._is_valid_number(line.num2))
                number_of_calls = sum(result.values())
                print(result.items())
                if number_of_calls:
                    self._write_report(number, result, number_of_calls,
                                       criteria, output_file)
        time_end = time.time()
        print("work time = " + str(time_end - time_begin))

    def insert_double(self, num1, started, num2):
        """Insert one (num1, started, num2) call row into calls_table2."""
        # Bound parameters: no quoting/injection issues, and datetimes are
        # encoded by the driver instead of via a timezone-less str().
        self.session.execute(
            "INSERT INTO count_calls.calls_table2 (num1, started, num2) "
            "VALUES (%s, %s, %s);",
            (str(num1), started, str(num2)))

    def update(self, num1, started, num2):
        """Add the entry started -> num2 to num1's calls map in calls_table."""
        self.session.execute(
            "UPDATE count_calls.calls_table SET calls = calls + %s WHERE num1 = %s;",
            ({started: str(num2)}, str(num1)))

    def insert(self, num1, started, num2):
        """Write num1's row in calls_table with a single-entry calls map.

        An INSERT replaces any existing map for num1.
        """
        self.session.execute(
            "INSERT INTO count_calls.calls_table (num1, calls) VALUES (%s, %s);",
            (str(num1), {started: str(num2)}))

    def insert_from_calls_to_calls2(self):
        """Build calls_table from calls_table2, recording every call for both parties.

        For each row, the reversed call is also written back into calls_table2.
        """
        # NOTE(review): this writes into calls_table2 while paging through it;
        # re-inserting the same primary key is idempotent, but confirm intent.
        added_nums = set()
        for line in self.session.execute("SELECT * FROM count_calls.calls_table2;"):
            self.insert_double(line.num2, line.started, line.num1)
            for caller, callee in ((line.num1, line.num2), (line.num2, line.num1)):
                if caller in added_nums:
                    self.update(caller, line.started, callee)
                else:
                    # The first write for a caller replaces any stale map.
                    self.insert(caller, line.started, callee)
                    added_nums.add(caller)


def convert_date(str_date):
    """Parse 'dd.mm.YYYY HH:MM' or 'dd.mm.YYYY HH:MM:SS' into a datetime.

    :raises ValueError: if the string matches neither format.
    """
    str_date = str_date.strip()  # tolerate trailing '\r'/spaces from input files
    parts = str_date.replace('.', ' ').replace(':', ' ').replace('-', ' ').split(' ')
    formats = {5: '%d.%m.%Y %H:%M', 6: '%d.%m.%Y %H:%M:%S'}
    f = formats.get(len(parts))
    if f is None:
        raise ValueError('Unsupported date %r, expected "dd.mm.YYYY HH:MM[:SS]"'
                         % str_date)
    return datetime.strptime(str_date, f)


def format_date_for_import(date):
    """Turn 'dd.mm.YYYY <time>' into 'YYYY-mm-dd <time>' for CQL import."""
    whole_date = date.split(" ")
    d = whole_date[0].split(".")
    return d[2] + "-" + d[1] + "-" + d[0] + " " + whole_date[1]


def format_csv_data_for_import(source, output_csv, limit=1000):
    """Convert a ';'-separated call log into CSV for calls_table2.

    Intended to precede:
        COPY count_calls.calls_table2 (num1, started, num2) FROM 'output_csv';

    Each input line ``num1;dd.mm.YYYY time;num2`` produces two rows, one per
    direction, e.g. ``79147349111,2013-03-02 08:21:11+0400,"79140656269"``.

    :param limit: maximum number of input lines to read (None reads all).
    """
    with open(source) as data, open(output_csv, 'w') as op:
        for line in itertools.islice(data, limit):
            # Strip the line ending first. Text mode turns '\r\n' into '\n',
            # so splitting on "\r\n" alone left a newline inside the last field.
            el = line.rstrip('\r\n').split(";")
            if len(el) > 2 and len(el[2]) > 2:
                date = format_date_for_import(el[1])
                op.write(el[0] + "," + date + ",\"" + el[2] + "\"\n")
                op.write(el[2] + "," + date + ",\"" + el[0] + "\"\n")


def main(config_path=_BASE_DIR + '/config.ini'):
    """Read the run settings and run both counting strategies on 127.0.0.1.

    The config's INPUT section names two files:
      * ``input_date_and_criteria``: lines are criteria %, start date, end date;
      * ``input_numbers``: one caller number per line.
    """
    logging.basicConfig()
    conf = configparser.ConfigParser()
    if not conf.read(config_path):  # read() silently skips missing files
        raise FileNotFoundError('Config file not found: ' + config_path)
    with open(conf['INPUT']['input_date_and_criteria']) as dc_file:
        dc = dc_file.read().splitlines()
    criteria = float(dc[0])
    start_date = convert_date(dc[1])
    end_date = convert_date(dc[2])
    with open(conf['INPUT']['input_numbers']) as numbers_file:
        numbers = [n.strip() for n in numbers_file.read().splitlines()]
    print(str(criteria) + "\n" + str(start_date) + "\n" + str(end_date))
    print(numbers)

    client = SimpleClient()
    client.connect(['127.0.0.1'])
    try:
        # One-time bootstrap steps, enable manually when (re)loading data:
        # client.create_schema()
        # time.sleep(10)
        # format_csv_data_for_import(_BASE_DIR + '/all_calls.txt', _BASE_DIR + '/trytemp.csv')
        # client.insert_from_calls_to_calls2()
        client.count_calls(numbers, start_date, end_date, criteria)
        client.count_calls2(numbers, start_date, end_date, criteria)
    finally:
        client.close()


if __name__ == "__main__":
    main()