import threading import time from Queue import Queue, Empty def make_work(callback, tasks, limit): """ Run up to "limit" threads, do tasks and yield results. Arguments: callback - the function that will process single task tasks - the sequence or iterator or queue of tasks limit - the maximum number of threads """ # If tasks is not a Queue convert them into Queue if not isinstance(tasks, Queue): new_tasks = Queue() [new_tasks.put(x) for x in tasks] tasks = new_tasks # Here results of task processing will be saved results = Queue() def _worker(tasks, results): while True: try: results.put(callback(tasks.get(False))) except Empty: return # Prepare and run up to "limit" threads threads = [] for x in xrange(limit): thread = threading.Thread(target=_worker, args=[tasks, results]) thread.start() threads.append(thread) # While tasks exist we can except some results in results queue while not tasks.empty(): yield results.get() # When tasks queue becomes empty wait for terminating of all threads for thread in threads: thread.join() # Let's see if some results were placed into results queue while not results.empty(): yield results.get() if __name__ == '__main__': tasks = range(100) def worker(arg): print '%s: %s' % (threading.currentThread(), arg) time.sleep(0.1) list(make_work(worker, tasks, 10))