coding utf-8 Потоки for result in run callback функция tasks аргументы

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# -*- coding: utf-8 -*-
'''
Потоки
for result in run(callback, # функция
tasks, # аргументы для заданий, любой итерируемый тип
thread_limit # количество потоков,
queue_limit=None # максимальный размер очереди заданий
sleep=0.01 # время сна в основном цикле (для уменьшения загрузки процессора)
):
print result
'''
import threading
import time
import Queue
def main():
def tst(i):
time.sleep(1)
print '-> %i' % i
return '<- %i' % i
for s in run(tst, range(10), 2, 1):
print s
class Th(threading.Thread):
def __init__(self, callback, queue, ret_queue=None):
self.callback = callback
self.queue = queue
self.ret_queue = ret_queue
threading.Thread.__init__(self)
def run(self):
while True:
args = self.queue.get()
if args is None:
break
ret = self.callback(args)
if self.ret_queue != None:
self.ret_queue.put(ret)
def run(callback, tasks, thread_limit, queue_limit=None, sleep=0.01):
queue = Queue.Queue(queue_limit)
tasks = iter(tasks)
pool = []
ret_queue = Queue.Queue()
queue_limit = queue_limit if queue_limit else thread_limit * 5
while True:
# возвращаем результат
while not ret_queue.empty():
yield ret_queue.get()
# проверяем работоспособность потоков
pool = filter(lambda x: x.isAlive(), pool)
# добавляем недостающие потоки
while len(pool) < thread_limit:
th = Th(callback, queue, ret_queue)
th.setDaemon(True)
th.start()
pool.append(th)
# заполняем очередь заданий
while queue.qsize() < queue_limit:
try:
task = tasks.next()
except StopIteration:
break
queue.put(task)
else:
# задания ещё не кончились
time.sleep(sleep)
continue
break
# отключаем потоки
for th in pool:
queue.put(None)
# ожидаем завершения потоков
while True:
if threading.activeCount() == 1:
break
time.sleep(0.01)
# отдаём остатки
while not ret_queue.empty():
yield ret_queue.get()
if __name__ == "__main__":
main()