import pycurl
from cStringIO import StringIO
import time
from collections import deque
from itertools import islice
from urlparse import urlsplit
import select

# curl_multi_perform -> transfer data on ready sockets & get number of active handlers
# curl_multi_fdset -> extract file descriptors to use in select/poll
# curl_multi_info_read -> info about handlers that completed their work

DEFAULT_POOL_SIZE = 200


class NoMoreTask(Exception):
    pass


class Crawler(object):
    def __init__(self, pool_size=DEFAULT_POOL_SIZE):
        self.pool_size = pool_size
        self._stat_timer = time.time()
        self.init_multi_interface()
        self.task_generator_object = self.task_generator()

    def init_multi_interface(self):
        # One CurlMulti object drives a fixed pool of reusable Curl handlers.
        self.handler_reg = {}
        self.free_handlers = deque()
        self.active_handlers = deque()
        self.multi = pycurl.CurlMulti()
        for x in xrange(self.pool_size):
            handler = pycurl.Curl()
            self.free_handlers.append(handler)
        self._next_timeout = None

    def process_handler_data(self, handler, task, body):
        # Default callback: print the first line of the response body.
        try:
            result = body.splitlines()[0]
        except IndexError:
            result = 'NA'
        print task['url'], '-->', result

    def get_next_task(self):
        try:
            return self.task_generator_object.next()
        except StopIteration:
            raise NoMoreTask

    def add_task_to_work(self, task):
        # Take a free handler, configure it for the task and register it with
        # the multi object so that perform() starts the transfer.
        handler = self.free_handlers.pop()
        handler.setopt(pycurl.URL, task['url'])
        handler.setopt(pycurl.FOLLOWLOCATION, True)
        handler.setopt(pycurl.MAXREDIRS, 5)
        handler.setopt(pycurl.TIMEOUT, 15)
        handler.setopt(pycurl.CONNECTTIMEOUT, 5)
        buf = StringIO()
        self.handler_reg[handler] = {
            'buffer': buf,
            'task': task,
            'start_time': time.time(),
        }
        handler.setopt(pycurl.WRITEFUNCTION, buf.write)
        self.multi.add_handle(handler)
        self.active_handlers.append(handler)

    def trigger_stat_logging(self):
        # Every two seconds dump the list of in-flight handlers and their age.
        if time.time() - self._stat_timer > 2:
            self._stat_timer = time.time()
            print '===>>>'
            print 'Active handlers:'
            for handler in self.active_handlers:
                info = self.handler_reg[handler]
                age = time.time() - info['start_time']
                print '* %s --> %s [%.3f]' % (id(handler), info['task']['url'], age)
            print '<<<==='

    def trigger_data_io(self):
        # Wait until some of the sockets managed by libcurl become ready,
        # then let perform() move data on them.
        rlist, wlist, xlist = self.multi.fdset()
        if rlist or wlist or xlist:
            timeout = self.multi.timeout()
            if timeout > 0:
                # timeout() gives milliseconds (-1 means "no preference"),
                # select() expects seconds.
                select.select(rlist, wlist, xlist, timeout / 1000.0)
        else:
            # No sockets to wait on yet (e.g. connections still being set up).
            time.sleep(0.1)
        # Trigger data transfers
        while True:
            opcode, active_num = self.multi.perform()
            if opcode != pycurl.E_CALL_MULTI_PERFORM:
                break

    def get_handler_results(self):
        # info_read() returns (number of queued messages, finished handlers,
        # failed handlers with their error code and message).
        more_count, ok_handlers, fails = self.multi.info_read()
        for ok_handler in ok_handlers:
            yield ok_handler, None, None
        for fail_handler, err_no, err_msg in fails:
            yield fail_handler, err_no, err_msg

    def process_done_handler(self, handler, err_no, err_msg):
        # Dispatch the result to the task callback, then return the handler
        # to the free pool so it can be reused for the next task.
        info = self.handler_reg[handler]
        if err_no is None:
            info['task']['callback'](handler, info['task'],
                                     info['buffer'].getvalue())
        else:
            print u'ERROR [%s]: %s' % (info['task']['url'], err_msg)
        self.active_handlers.remove(handler)
        self.multi.remove_handle(handler)
        handler.reset()
        self.free_handlers.append(handler)

    def run(self):
        # Main event loop: feed tasks to free handlers, pump I/O, collect
        # finished transfers, and stop when both sides are exhausted.
        while True:
            no_more_task = False
            if self.free_handlers:
                try:
                    task = self.get_next_task()
                except NoMoreTask:
                    no_more_task = True
                else:
                    self.add_task_to_work(task)
            self.trigger_stat_logging()
            self.trigger_data_io()
            for handler, err_no, err_msg in self.get_handler_results():
                self.process_done_handler(handler, err_no, err_msg)
            if not self.active_handlers and no_more_task:
                break


class TestCrawler(Crawler):
    def task_generator(self):
        #for line in islice(open('/web/gev/var/top-1m.csv'), 100):
        #    url = 'http://%s/robots.txt' % line.strip()
        #    yield {'url': url}
        for x in xrange(1000):
            yield {'url': 'http://localhost:9000/?delay=0',
                   'callback': self.task_page}

    def task_page(self, handler, task, body):
        try:
            result = body.decode('utf-8', 'ignore').splitlines()[0]
        except IndexError:
            result = 'NA'
        print u'%s --> %s' % (task['url'], result)


def main():
    crawler = TestCrawler()
    crawler.run()


if __name__ == '__main__':
    main()
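

# ---------------------------------------------------------------------------
# TestCrawler points at http://localhost:9000/?delay=0, i.e. it assumes some
# local test server that accepts a "delay" query parameter. Below is a minimal
# sketch of such a server (an assumption, not part of the crawler itself);
# save it as a separate script, e.g. delay_server.py, and start it before
# running the crawler.
# ---------------------------------------------------------------------------
# import time
# from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
# from urlparse import urlsplit, parse_qs
#
# class DelayHandler(BaseHTTPRequestHandler):
#     def do_GET(self):
#         # Sleep for ?delay=N seconds, then answer with a one-line body.
#         query = parse_qs(urlsplit(self.path).query)
#         delay = float(query.get('delay', ['0'])[0])
#         time.sleep(delay)
#         self.send_response(200)
#         self.send_header('Content-Type', 'text/plain')
#         self.end_headers()
#         self.wfile.write('OK delay=%s\n' % delay)
#
#     def log_message(self, fmt, *args):
#         # Stay quiet so the crawler's own logging remains readable.
#         pass
#
# if __name__ == '__main__':
#     HTTPServer(('localhost', 9000), DelayHandler).serve_forever()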