import pycurl
from cStringIO import StringIO
import time
from collections import deque
from itertools import islice
from urlparse import urlsplit
import select
# curl_multi_perform -> transfer data on ready sockets & get num of active handlers
# curl_multi_fdset -> extract handlers to use in select/poll
# curl_multi_info_read -> info about handlers completed their work
DEFAULT_POOL_SIZE = 200  # default number of reusable curl handlers per Crawler
# Idea:
# A task generator (in the Spider sense) and a task queue are both
# just task generators of different kinds. They both produce tasks
# and therefore should expose the same interface:
# * get_task
# * is_active
class Crawler(object):
def __init__(self, pool_size=DEFAULT_POOL_SIZE):
self.pool_size = pool_size
self._timer = {
'stat': time.time(),
'watchdog': time.time(),
}
self._timer_watchdog = time.time()
self.init_multi_interface()
self.build_task_generators()
def build_task_generators(self):
self.task_gens = []
if hasattr(self, 'task_generator'):
self.task_gens.append(CallbackTaskGenerator(self.task_generator()))
def init_multi_interface(self):
self.handler_reg = {}
self.free_handlers = deque()
self.active_handlers = deque()
self.multi = pycurl.CurlMulti()
for x in xrange(self.pool_size):
handler = pycurl.Curl()
self.free_handlers.append(handler)
self._next_timeout = None
def process_handler_data(self, handler, task, body):
try:
result = body.splitlines()[0]
except IndexError:
result = 'NA'
print task['url'], '-->', result
def add_task_to_work(self, task):
handler = self.free_handlers.pop()
handler.setopt(pycurl.URL, task['url'])
handler.setopt(pycurl.FOLLOWLOCATION, True)
handler.setopt(pycurl.MAXREDIRS, 5)
TIMEOUT = 15
handler.setopt(pycurl.TIMEOUT, TIMEOUT)
handler.setopt(pycurl.CONNECTTIMEOUT, 5)
buf = StringIO()
self.handler_reg[handler] = {
'buffer': buf,
'task': task,
'time_start': time.time(),
'timeout': TIMEOUT,
}
handler.setopt(pycurl.WRITEFUNCTION, buf.write)
self.multi.add_handle(handler)
self.active_handlers.append(handler)
def trigger_stat_logging(self, now):
if now - self._timer['stat'] > 2:
self._timer['stat'] = now
print '===>>>'
print 'Active handlers:'
for handler in self.active_handlers:
info = self.handler_reg[handler]
age = time.time() - info['start_time']
print '* %s --> %s [%.3f]' % (id(handler), info['task']['url'], age)
print '<<<==='
def trigger_watchdog(self, now):
if now - self._timer['watchdog'] > 1:
killed_handlers = set()
kill_list = []
self._timer['watchdog'] = now
for handler in self.active_handlers:
info = self.handler_reg[handler]
if now - info['time_start'] > info['timeout']:
kill_list.append((handler, info))
for handler, info in kill_list:
print 'Explicitly kill timed out handler'
self.process_completed_handler(handler, -1, 'Watchdog timeout (%d)' % info['timeout'])
killed_handlers.add(handler)
return killed_handlers
def trigger_data_io(self):
rlist, wlist, xlist = self.multi.fdset()
if rlist or wlist or xlist:
timeout = self.multi.timeout()
if timeout:
select.select(rlist, wlist, xlist, timeout / 1000.0)
else:
time.sleep(0.1)
# Trigger data transfers
while True:
opcode, active_num = self.multi.perform()
if opcode != pycurl.E_CALL_MULTI_PERFORM:
break
def get_completed_handlers(self):
more_count, ok_handlers, fails = self.multi.info_read()
for ok_handler in ok_handlers:
yield ok_handler, None, None
for fail_handler, err_no, err_msg in fails:
yield fail_handler, err_no, err_msg
def process_completed_handler(self, handler, err_no, err_msg):
info = self.handler_reg[handler]
if err_no is None:
info['task']['callback'](handler, info['task'], info['buffer'].getvalue())
else:
print u'ERROR [%s]: %s' % (info['task']['url'], err_msg)
self.active_handlers.remove(handler)
self.multi.remove_handle(handler)
handler.reset()
self.free_handlers.append(handler)
def has_active_handlers(self):
return len(self.active_handlers) > 0
def get_next_task(self):
task = None
for gen in self.task_gens:
if gen.is_active():
task = gen.get_task()
if task is not None:
break
return task
def run(self):
dump_time = time.time()
while self.has_active_handlers() or any(x.is_active() for x in self.task_gens):
if self.free_handlers:
task = self.get_next_task()
if task is not None:
self.add_task_to_work(task)
now = time.time()
self.trigger_stat_logging(now)
killed_handlers = self.trigger_watchdog(now)
self.trigger_data_io()
for handler, err_no, err_msg in self.get_completed_handlers():
if killed_handlers and handler in killed_handlers:
pass
else:
self.process_completed_handler(handler, err_no, err_msg)
class CallbackTaskGenerator(object):
    """Adapts a task-producing generator to the task-source interface
    (`get_task` / `is_active`) expected by Crawler.
    """

    def __init__(self, gen):
        # gen: any iterator yielding task dicts.
        self.gen = gen
        self._active = True

    def get_task(self):
        """Return the next task, or None once the generator is exhausted.

        After exhaustion the source is marked inactive and keeps
        returning None on further calls.
        """
        if not self._active:
            return None
        try:
            # Use the next() builtin instead of the Python-2-only
            # .next() method: works with any iterator and is forward
            # compatible with Python 3.
            return next(self.gen)
        except StopIteration:
            self._active = False
            return None

    def is_active(self):
        """True while the underlying generator may still produce tasks."""
        return self._active