# Truncated preview of the imports below: from grab import Grab; from pprint import pprint; from concurrent.futures import ...

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
from grab import Grab
from pprint import pprint
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import logging
import os
from glob import glob
# Period, counted in tasks, that main() uses to decide when to shut down the
# current executor pool and create a fresh one.
POOL_RESET_PERIOD = 100
# Maximum number of tasks main() submits to the pool in one batch before it
# waits for that batch's results.
POOL_TASK_CHUNK = 100
def process_file(path):
    """Parse one HTML file and return the data extracted from it.

    Args:
        path: Path of the HTML file to read.

    Returns:
        A dict with a single key, 'header', holding the text that Grab
        returns for the XPath '//h1'.

    Raises:
        Exception: If the extracted header equals 'Header #5' (a deliberate
            failure that exercises the caller's error handling), or if
            extracting the header fails. The failing path is printed before
            the exception is re-raised.
    """
    # Use a context manager so the file handle is closed deterministically
    # instead of lingering until garbage collection.
    with open(path) as inp:
        data = inp.read()
    g = Grab(data)
    try:
        result = {
            'header': g.doc('//h1').text(),
        }
        if result['header'] == 'Header #5':
            raise Exception('Found Header #5')
    except Exception:
        # Report which file failed, then let the caller see the error.
        print('FILE FAILED:', path)
        raise
    # Also you can save result to database inside this process
    return result
def generate_demo_files():
    """Create a fresh set of 1000 demo HTML files under ./html.

    The 'html' directory (relative to the current working directory) is
    created if it does not exist, every existing 'html/*.html' file is
    removed, and files 'html/dump-0.html' .. 'html/dump-999.html' are
    written, each containing '<h1>Header #N</h1>' for its own index N.
    """
    if not os.path.exists('html'):
        os.mkdir('html')
    # Drop leftovers from a previous run so the directory holds exactly the
    # files generated below.
    for path in glob('html/*.html'):
        os.unlink(path)
    # range() instead of xrange(): xrange does not exist on Python 3 and
    # raised NameError there.
    for x in range(1000):
        with open('html/dump-%d.html' % x, 'w') as out:
            out.write('<h1>Header #%d</h1>' % x)
def main():
    """Parse the demo HTML files in batches through a periodically reset pool.

    Steps:
      1. Configure logging; failures are additionally written to 'error.log'
         through the 'error' logger.
      2. Regenerate the demo files with generate_demo_files().
      3. Repeatedly submit up to POOL_TASK_CHUNK files to the executor, wait
         for that batch, and print each parsed header. A task that raises is
         logged together with its file path and does not stop the run.
      4. Shut down and recreate the executor once at least POOL_RESET_PERIOD
         tasks have been submitted to the current one.

    The loop ends when a batch turns out empty, i.e. the task iterator is
    exhausted.
    """
    logging.basicConfig(level=logging.DEBUG)
    error_logger = logging.getLogger('error')
    error_logger.addHandler(logging.FileHandler('error.log', 'w'))

    # GENERATE HTML FILES FOR DEMONSTRATION
    generate_demo_files()

    task_iter = iter(glob('html/*.html'))
    pool = None
    # Tasks submitted to the current pool; drives the periodic reset. Counting
    # real submissions (rather than testing a running total for an exact
    # multiple) keeps the reset reliable when POOL_TASK_CHUNK does not divide
    # POOL_RESET_PERIOD.
    submitted_since_reset = 0
    try:
        while True:
            if pool is None or submitted_since_reset >= POOL_RESET_PERIOD:
                if pool is not None:
                    pool.shutdown()
                # ProcessPoolExecutor() can be used here instead.
                pool = ThreadPoolExecutor(max_workers=1)
                submitted_since_reset = 0
                print('Pool reseted')

            # Keep each path next to its future so failures can be attributed.
            pending = []
            for _ in range(POOL_TASK_CHUNK):
                try:
                    path = next(task_iter)
                except StopIteration:
                    break
                pending.append((path, pool.submit(process_file, path)))
                submitted_since_reset += 1
            print('Submited %d tasks into pool' % len(pending))

            for path, fut in pending:
                try:
                    result = fut.result()
                except Exception as ex:
                    # Pass the path as the format argument; without it the
                    # log line contained a literal '%s'.
                    error_logger.error('URL: %s', path, exc_info=ex)
                else:
                    # do something with parsed data
                    print('Got result: %s' % result['header'])

            if not pending:
                print('No more tasks in task iterator')
                break
    finally:
        # Release the last pool's workers, also when the loop exits on error.
        if pool is not None:
            pool.shutdown()
# Script entry point: run the demo only when this file is executed directly,
# not when it is imported as a module.
if __name__ == '__main__':
    main()