import pycurl from cStringIO import StringIO import time from collections import deque from itertools import islice from urlparse import urlsplit import select # curl_multi_perform -> transfer data on ready sockets & get num of active handlers # curl_multi_fdset -> extract handlers to use in select/poll # curl_multi_info_read -> info about handlers completed their work DEFAULT_POOL_SIZE = 200 # Idea: # task generator (in term of Spider) and task queue are both # just task generators of different kind. They both produces tasks # and therefor should have same interface: # * get_task # * is_active class Crawler(object): def __init__(self, pool_size=DEFAULT_POOL_SIZE): self.pool_size = pool_size self._timer = { 'stat': time.time(), 'watchdog': time.time(), } self._timer_watchdog = time.time() self.init_multi_interface() self.build_task_generators() def build_task_generators(self): self.task_gens = [] if hasattr(self, 'task_generator'): self.task_gens.append(CallbackTaskGenerator(self.task_generator())) def init_multi_interface(self): self.handler_reg = {} self.free_handlers = deque() self.active_handlers = deque() self.multi = pycurl.CurlMulti() for x in xrange(self.pool_size): handler = pycurl.Curl() self.free_handlers.append(handler) self._next_timeout = None def process_handler_data(self, handler, task, body): try: result = body.splitlines()[0] except IndexError: result = 'NA' print task['url'], '-->', result def add_task_to_work(self, task): handler = self.free_handlers.pop() handler.setopt(pycurl.URL, task['url']) handler.setopt(pycurl.FOLLOWLOCATION, True) handler.setopt(pycurl.MAXREDIRS, 5) TIMEOUT = 15 handler.setopt(pycurl.TIMEOUT, TIMEOUT) handler.setopt(pycurl.CONNECTTIMEOUT, 5) buf = StringIO() self.handler_reg[handler] = { 'buffer': buf, 'task': task, 'time_start': time.time(), 'timeout': TIMEOUT, } handler.setopt(pycurl.WRITEFUNCTION, buf.write) self.multi.add_handle(handler) self.active_handlers.append(handler) def trigger_stat_logging(self, now): if now - self._timer['stat'] > 2: self._timer['stat'] = now print '===>>>' print 'Active handlers:' for handler in self.active_handlers: info = self.handler_reg[handler] age = time.time() - info['start_time'] print '* %s --> %s [%.3f]' % (id(handler), info['task']['url'], age) print '<<<===' def trigger_watchdog(self, now): if now - self._timer['watchdog'] > 1: killed_handlers = set() kill_list = [] self._timer['watchdog'] = now for handler in self.active_handlers: info = self.handler_reg[handler] if now - info['time_start'] > info['timeout']: kill_list.append((handler, info)) for handler, info in kill_list: print 'Explicitly kill timed out handler' self.process_completed_handler(handler, -1, 'Watchdog timeout (%d)' % info['timeout']) killed_handlers.add(handler) return killed_handlers def trigger_data_io(self): rlist, wlist, xlist = self.multi.fdset() if rlist or wlist or xlist: timeout = self.multi.timeout() if timeout: select.select(rlist, wlist, xlist, timeout / 1000.0) else: time.sleep(0.1) # Trigger data transfers while True: opcode, active_num = self.multi.perform() if opcode != pycurl.E_CALL_MULTI_PERFORM: break def get_completed_handlers(self): more_count, ok_handlers, fails = self.multi.info_read() for ok_handler in ok_handlers: yield ok_handler, None, None for fail_handler, err_no, err_msg in fails: yield fail_handler, err_no, err_msg def process_completed_handler(self, handler, err_no, err_msg): info = self.handler_reg[handler] if err_no is None: info['task']['callback'](handler, info['task'], info['buffer'].getvalue()) else: print u'ERROR [%s]: %s' % (info['task']['url'], err_msg) self.active_handlers.remove(handler) self.multi.remove_handle(handler) handler.reset() self.free_handlers.append(handler) def has_active_handlers(self): return len(self.active_handlers) > 0 def get_next_task(self): task = None for gen in self.task_gens: if gen.is_active(): task = gen.get_task() if task is not None: break return task def run(self): dump_time = time.time() while self.has_active_handlers() or any(x.is_active() for x in self.task_gens): if self.free_handlers: task = self.get_next_task() if task is not None: self.add_task_to_work(task) now = time.time() self.trigger_stat_logging(now) killed_handlers = self.trigger_watchdog(now) self.trigger_data_io() for handler, err_no, err_msg in self.get_completed_handlers(): if killed_handlers and handler in killed_handlers: pass else: self.process_completed_handler(handler, err_no, err_msg) class CallbackTaskGenerator(object): def __init__(self, gen): self.gen = gen self._active = True def get_task(self): if self._active: try: return self.gen.next() except StopIteration: self._active = False return None def is_active(self): return self._active