# Required imports (Python 2: Queue module, threading). `db` is presumably a
# pymongo Database handle with a `query` collection, created elsewhere.
from threading import Thread
from Queue import Queue


class DocumentProcessor(object):
    WORKER_TASK_SIZE = 100  # 00
    SAVER_CHUNK_LIMIT = 1000
    WORKER_POOL_SIZE = 10

    def __init__(self, result_file):
        self.result_file = result_file
        self.id_cache = []
        self.task_queue = Queue()
        self.result_queue = Queue()
        self.spawn_workers()
        self.documents_loaded = 0

    def worker(self, name, task_queue, result_queue):
        # Pull batches of ids off the task queue; a None task is the stop signal.
        while True:
            task = self.task_queue.get()
            if task is None:
                break
            else:
                # Fetch only the 'q' field for every document in the id batch.
                for doc in db.query.find({'_id': {'$in': task}}, {'q': 1}):
                    self.result_queue.put(doc['q'])

    def saver(self, result_queue, result_file):
        # Accumulate results and flush them to disk in chunks; None ends the loop.
        chunk = []
        while True:
            result = self.result_queue.get()
            if result is None:
                break
            else:
                self.documents_loaded += 1
                chunk.append(result)
                if len(chunk) >= self.SAVER_CHUNK_LIMIT:
                    self.result_file.write('\n'.join(chunk) + '\n')
                    chunk = []
        # Flush whatever is left after the stop signal arrives.
        if chunk:
            self.result_file.write('\n'.join(chunk) + '\n')
            chunk = []

    def spawn_workers(self):
        self.worker_pool = []
        for x in xrange(self.WORKER_POOL_SIZE):
            name = 'worker-%d' % x
            thread = Thread(target=self.worker, args=[name, self.task_queue, self.result_queue])
            self.worker_pool.append(thread)
            thread.start()
        self.saver_thread = Thread(target=self.saver, args=[self.result_queue, self.result_file])
        self.saver_thread.start()

    def add_doc_id(self, id_):
        # Buffer ids and hand them to the workers once a full batch is collected.
        self.id_cache.append(id_)
        if len(self.id_cache) >= self.WORKER_TASK_SIZE:
            self.task_queue.put(self.id_cache)
            self.id_cache = []

    def shutdown(self):
        # Flush any partial batch, then signal every worker and the saver to stop.
        if self.id_cache:
            self.task_queue.put(self.id_cache)
            self.id_cache = []
        for x in xrange(len(self.worker_pool)):
            self.task_queue.put(None)
        for worker in self.worker_pool:
            worker.join()
        self.result_queue.put(None)
        self.saver_thread.join()
        self.result_file.close()
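

# Usage sketch (an assumption, not part of the original listing): stream ids out
# of the collection, feed them to the processor, and let the worker threads fetch
# each batch while the saver writes results to disk. The output filename and the
# `db.query` collection used for the id scan are hypothetical.
if __name__ == '__main__':
    out = open('results.txt', 'w')
    processor = DocumentProcessor(out)
    # Fetch only the ids here; the workers re-query the full batches themselves.
    for doc in db.query.find({}, {'_id': 1}):
        processor.add_doc_id(doc['_id'])
    processor.shutdown()
    print 'documents loaded: %d' % processor.documents_loaded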