import logging
import os
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from glob import glob

from grab import Grab

# Recreate the pool after this many submitted tasks (periodic pool
# recreation is a common workaround for workers that slowly leak memory)
POOL_RESET_PERIOD = 100
# Number of tasks submitted per chunk before their futures are drained
POOL_TASK_CHUNK = 100


def process_file(path):
    # Read the raw bytes; Grab/lxml parses the document body directly
    with open(path, 'rb') as inp:
        data = inp.read()
    g = Grab(data)
    try:
        result = {
            'header': g.doc('//h1').text(),
        }
        if result['header'] == 'Header #5':
            raise Exception('Found Header #5')
    except Exception:
        print('FILE FAILED:', path)
        raise
    # You could also save the result to a database from inside this
    # worker; see the save_result() sketch below
    return result
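

# The comment in process_file() mentions saving results to a database.
# Below is a minimal sketch of that idea, assuming a hypothetical
# save_result() helper and a local SQLite file named results.db (both
# names are illustrative, not part of the original script):
import sqlite3


def save_result(path, result):
    # Each call opens its own connection: sqlite3 connections must not be
    # shared across threads, so per-worker connections are the safe choice
    # with ThreadPoolExecutor
    conn = sqlite3.connect('results.db')
    with conn:
        conn.execute(
            'CREATE TABLE IF NOT EXISTS results (path TEXT, header TEXT)'
        )
        conn.execute(
            'INSERT INTO results VALUES (?, ?)', (path, result['header'])
        )
    conn.close()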


def generate_demo_files():
    if not os.path.exists('html'):
        os.mkdir('html')
    for path in glob('html/*.html'):
        os.unlink(path)
    for x in range(1000):
        with open('html/dump-%d.html' % x, 'w') as out:
            out.write('<h1>Header #%d</h1>' % x)


def main():
    logging.basicConfig(level=logging.DEBUG)
    error_logger = logging.getLogger('error')
    error_logger.addHandler(logging.FileHandler('error.log', 'w'))

    # GENERATE HTML FILES FOR DEMONSTRATION
    generate_demo_files()

    def task_iterator():
        for path in glob('html/*.html'):
            yield path

    task_iter = task_iterator()
    pool = None
    count = 0
    while True:
        # Shut down and recreate the pool every POOL_RESET_PERIOD tasks
        if pool is None or not count % POOL_RESET_PERIOD:
            if pool is not None:
                pool.shutdown()
            # pool = ProcessPoolExecutor()
            pool = ThreadPoolExecutor(max_workers=1)
            print('Pool reset')
        futs = []
        for x in range(POOL_TASK_CHUNK):
            try:
                task = next(task_iter)
            except StopIteration:
                break
            else:
                count += 1
                fut = pool.submit(process_file, task)
                # Keep the path alongside its future so failures can be
                # logged with the file name
                futs.append((task, fut))
        print('Submitted %d tasks into pool' % len(futs))
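        # The loop below drains every future from this chunk before the
        # next pool reset, so the pool.shutdown() call above never has to
        # wait on unfinished work.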
        for task, fut in futs:
            try:
                result = fut.result()
            except Exception:
                error_logger.error('FILE: %s', task, exc_info=True)
            else:
                # do something with parsed data
                print('Got result: %s' % result['header'])
        if not futs:
            print('No more tasks in task iterator')
            break


if __name__ == '__main__':
    main()
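
# Note: the commented-out ProcessPoolExecutor line in main() is a drop-in
# swap. With processes instead of threads, process_file() runs in separate
# interpreters, so its arguments and return values must be picklable, and
# resources such as database connections must be opened inside the worker
# rather than shared from the parent. The __main__ guard above is required
# for that mode on platforms that spawn child processes.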