import threading import time from Queue import Queue Empty as QueueEmp

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import threading
import time
from Queue import Queue, Empty as QueueEmpty
def make_work(callback, tasks, limit, results=None, return_results=True):
"""
Run up to limit threads, do tasks.
Put results to results queue if it is specified.
"""
results_queue = Queue()
pool = []
if not isinstance(tasks, Queue):
new_tasks = Queue()
for x in tasks:
new_tasks.put(x)
tasks = new_tasks
# If callback result should be placed to the results queue
# wrap the callback to the function that will catch result and
# put it to the queue
if results or return_results:
old_callback = callback
def callback(*args):
res = old_callback(*args)
if return_results:
results_queue.put(res)
if results:
results.put(res)
while True:
pool = filter(lambda x: x.isAlive(), pool)
for x in xrange(limit - len(pool)):
try:
task = tasks.get(False)
except QueueEmpty:
pass
else:
if not isinstance(task, (list, tuple)):
task = [task]
t = threading.Thread(target=callback, args=task)
t.start()
pool.append(t)
if not pool:
break
time.sleep(0.01)
if return_results:
items = []
while not results_queue.empty():
items.append(results_queue.get())
return items