# Python -- 25 Jun 2009
import threading
import time

try:
    from queue import Queue, Empty  # Python 3 module name
except ImportError:
    from Queue import Queue, Empty  # Python 2 module name

def make_work(callback, tasks, limit):
    """
    Run "limit" threads, do tasks and yield results.

    This is a generator: nothing happens until it is iterated.  Results are
    yielded as workers finish them, so their order follows task completion,
    not the order of "tasks".

    Arguments:
        callback - the function that will process single task
        tasks - the sequence or iterator or queue of tasks; a Queue is
            consumed in place, anything else is copied into a new Queue
        limit - the number of worker threads to start (at least 1)

    Raises:
        ValueError - if "limit" is less than 1
        the first exception raised by "callback" - once a task fails the
            workers stop picking up new tasks, no further results are
            yielded, and the exception is re-raised in the consuming thread
            after all workers have terminated
    """
    # Without workers nobody would ever drain the task queue.
    if limit < 1:
        raise ValueError('limit must be at least 1, got %r' % (limit,))

    # If tasks is not a Queue convert them into Queue
    if not isinstance(tasks, Queue):
        new_tasks = Queue()
        for task in tasks:
            new_tasks.put(task)
        tasks = new_tasks

    # Workers report here.  Every message is a (kind, payload) pair so that
    # any value returned by callback (None included) stays unambiguous.
    results = Queue()
    RESULT, ERROR, EXIT = 0, 1, 2

    # Set after the first failure to make workers stop taking new tasks.
    stop = threading.Event()

    def _worker():
        try:
            while not stop.is_set():
                # Only the queue lookup is guarded by "except Empty", so an
                # Empty raised inside callback is treated as a task failure
                # instead of silently ending the worker.
                try:
                    task = tasks.get(False)
                except Empty:
                    return
                try:
                    message = (RESULT, callback(task))
                except Exception as exc:
                    message = (ERROR, exc)
                results.put(message)
        finally:
            # Always announce termination; the consumer counts these markers
            # instead of polling tasks.empty(), so it cannot block forever.
            results.put((EXIT, None))

    # Prepare and run "limit" threads
    threads = []
    for _ in range(limit):
        thread = threading.Thread(target=_worker)
        thread.start()
        threads.append(thread)

    # Relay results until every worker has announced its termination
    running = len(threads)
    error = None
    while running:
        kind, payload = results.get()
        if kind == EXIT:
            running -= 1
        elif kind == ERROR:
            if error is None:
                error = payload
                stop.set()
        elif error is None:
            yield payload

    # All workers have posted their exit marker; wait for the threads to end
    for thread in threads:
        thread.join()

    if error is not None:
        raise error


if __name__ == '__main__':
    # Demo: push 100 dummy tasks through make_work() using 10 threads.
    tasks = range(100)

    def worker(arg):
        # A parenthesised single-argument print produces the same output as
        # a Python 2 statement and as a Python 3 function call.
        print('%s: %s' % (threading.current_thread(), arg))
        time.sleep(0.1)

    # list() drives the generator to completion; the collected results
    # (worker returns None) are discarded.
    list(make_work(worker, tasks, 10))