"""Demo: parse a batch of HTML files with concurrent.futures,
recreating the worker pool every POOL_RESET_PERIOD tasks.
"""
from glob import glob
import logging
import os

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

from grab import Grab

POOL_RESET_PERIOD = 100
POOL_TASK_CHUNK = 100


def process_file(path):
    with open(path) as inp:
        data = inp.read()
    g = Grab(data)
    try:
        result = {
            'header': g.doc('//h1').text(),
        }
        if result['header'] == 'Header #5':
            raise Exception('Found Header #5')
    except Exception:
        print('FILE FAILED:', path)
        raise
    # Also you can save result to database inside this process
    return result


def generate_demo_files():
    if not os.path.exists('html'):
        os.mkdir('html')
    for path in glob('html/*.html'):
        os.unlink(path)
    for x in range(1000):
        with open('html/dump-%d.html' % x, 'w') as out:
            out.write('<html><body><h1>Header #%d</h1></body></html>' % x)


def main():
    logging.basicConfig(level=logging.DEBUG)
    error_logger = logging.getLogger('error')
    error_logger.addHandler(logging.FileHandler('error.log', 'w'))
    pool = None
    count = 0

    # GENERATE HTML FILES FOR DEMONSTRATION
    generate_demo_files()

    def task_iterator():
        for path in glob('html/*.html'):
            yield path

    task_iter = task_iterator()
    while True:
        # Recreate the pool every POOL_RESET_PERIOD tasks
        if pool is None or not count % POOL_RESET_PERIOD:
            if pool is not None:
                pool.shutdown()
            # pool = ProcessPoolExecutor()
            pool = ThreadPoolExecutor(max_workers=1)
            print('Pool reset')
        futs = []
        for x in range(POOL_TASK_CHUNK):
            count += 1
            try:
                task = next(task_iter)
            except StopIteration:
                break
            else:
                fut = pool.submit(process_file, task)
                # Keep the task with its future so failures can be attributed
                futs.append((fut, task))
        print('Submitted %d tasks into pool' % len(futs))
        for fut, task in futs:
            try:
                result = fut.result()
            except Exception as ex:
                error_logger.error('FILE: %s', task, exc_info=ex)
            else:
                # do something with parsed data
                print('Got result: %s' % result['header'])
        if not futs:
            print('No more tasks in task iterator')
            break


if __name__ == '__main__':
    main()
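
# Note on the pool reset: recreating the executor every POOL_RESET_PERIOD
# tasks is a way to bound resource usage of long-lived workers; with
# ProcessPoolExecutor, shutdown() discards the worker processes, so any
# memory they may have leaked is returned to the OS. A minimal sketch of
# the same pattern in isolation (work(), items and RESET_EVERY are
# illustrative names, not part of the demo above):
#
#     from concurrent.futures import ProcessPoolExecutor
#
#     RESET_EVERY = 100
#     pool = ProcessPoolExecutor()
#     for num, item in enumerate(items, 1):
#         pool.submit(work, item)
#         if not num % RESET_EVERY:
#             pool.shutdown()               # waits for pending tasks
#             pool = ProcessPoolExecutor()  # fresh worker processes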