import pycurl
from cStringIO import StringIO
import time
from collections import deque
from itertools import islice
import select
# curl_multi_perform   -> transfer data on ready sockets & get number of active handlers
# curl_multi_fdset     -> get the file descriptor sets to use in select/poll
# curl_multi_info_read -> get info about handlers that have completed their work
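#
# The Crawler class below ties these calls together into one event loop:
# feed tasks to free curl handlers, wait on their sockets with select(),
# let libcurl transfer data, then collect finished handlers and reuse them.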
DEFAULT_POOL_SIZE = 200
class NoMoreTask(Exception):
pass
class Crawler(object):
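    # Subclasses must define a task_generator() method that yields task
    # dicts with at least 'url' and 'callback' keys.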
def __init__(self, pool_size=DEFAULT_POOL_SIZE):
self.pool_size = pool_size
self._stat_timer = time.time()
self.init_multi_interface()
self.task_generator_object = self.task_generator()
    def init_multi_interface(self):
        # Per-handler state: response buffer, task and start time
        self.handler_reg = {}
        self.free_handlers = deque()
        self.active_handlers = deque()
        self.multi = pycurl.CurlMulti()
        # Pre-create a fixed pool of reusable curl handlers
        for x in xrange(self.pool_size):
            handler = pycurl.Curl()
            self.free_handlers.append(handler)
def process_handler_data(self, handler, task, body):
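        # Sample callback: print the URL and the first line of the response body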
try:
result = body.splitlines()[0]
except IndexError:
result = 'NA'
print task['url'], '-->', result
def get_next_task(self):
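        # Translate generator exhaustion (StopIteration) into NoMoreTask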
try:
return self.task_generator_object.next()
except StopIteration:
raise NoMoreTask
    def add_task_to_work(self, task):
        # Take a free handler and configure it for the task's URL
        handler = self.free_handlers.pop()
        handler.setopt(pycurl.URL, task['url'])
        handler.setopt(pycurl.FOLLOWLOCATION, True)
        handler.setopt(pycurl.MAXREDIRS, 5)
        handler.setopt(pycurl.TIMEOUT, 15)
        handler.setopt(pycurl.CONNECTTIMEOUT, 5)
        # Collect the response body into a per-handler buffer
        buf = StringIO()
        handler.setopt(pycurl.WRITEFUNCTION, buf.write)
        self.handler_reg[handler] = {
            'buffer': buf,
            'task': task,
            'start_time': time.time(),
        }
        # Hand the handler over to the multi stack
        self.multi.add_handle(handler)
        self.active_handlers.append(handler)
def trigger_stat_logging(self):
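        # Every ~2 seconds dump the handlers that are currently in flight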
if time.time() - self._stat_timer > 2:
self._stat_timer = time.time()
print '===>>>'
print 'Active handlers:'
for handler in self.active_handlers:
info = self.handler_reg[handler]
age = time.time() - info['start_time']
print '* %s --> %s [%.3f]' % (id(handler), info['task']['url'], age)
print '<<<==='
    def trigger_data_io(self):
        # Wait until libcurl reports that some of its sockets are ready
        rlist, wlist, xlist = self.multi.fdset()
        if rlist or wlist or xlist:
            timeout = self.multi.timeout()
            if timeout < 0:
                # libcurl has no timeout of its own, pick a reasonable one
                timeout = 1.0
            else:
                timeout = timeout / 1000.0
            if timeout:
                select.select(rlist, wlist, xlist, timeout)
        else:
            # No sockets to watch yet (e.g. connections are still being set up)
            time.sleep(0.1)
        # Let libcurl transfer data on the sockets that are ready
        while True:
            opcode, active_num = self.multi.perform()
            if opcode != pycurl.E_CALL_MULTI_PERFORM:
                break
def get_handler_results(self):
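        # info_read() returns (messages still queued, handlers finished OK,
        # failed handlers as (handler, errno, errmsg) tuples); anything left
        # in the queue is picked up on the next pass of the main loop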
more_count, ok_handlers, fails = self.multi.info_read()
for ok_handler in ok_handlers:
yield ok_handler, None, None
for fail_handler, err_no, err_msg in fails:
yield fail_handler, err_no, err_msg
def process_done_handler(self, handler, err_no, err_msg):
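        # On success run the task's callback with the downloaded body,
        # on failure just report the error; in both cases detach the handler
        # from the multi stack, reset it and put it back into the free pool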
info = self.handler_reg[handler]
if err_no is None:
info['task']['callback'](handler, info['task'], info['buffer'].getvalue())
else:
print u'ERROR [%s]: %s' % (info['task']['url'], err_msg)
self.active_handlers.remove(handler)
self.multi.remove_handle(handler)
handler.reset()
self.free_handlers.append(handler)
def run(self):
while True:
no_more_task = False
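            # Feed a new task into the pool if a free handler is available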
if self.free_handlers:
try:
task = self.get_next_task()
except NoMoreTask:
no_more_task = True
else:
self.add_task_to_work(task)
            self.trigger_stat_logging()
            # Wait for socket activity and let libcurl transfer data
            self.trigger_data_io()
            # Process handlers that have finished, successfully or not
            for handler, err_no, err_msg in self.get_handler_results():
                self.process_done_handler(handler, err_no, err_msg)
            # Stop when the task generator is exhausted and nothing is in flight
            if not self.active_handlers and no_more_task:
                break
class TestCrawler(Crawler):
def task_generator(self):
#for line in islice(open('/web/gev/var/top-1m.csv'), 100):
#url = 'http://%s/robots.txt' % line.strip()
#yield {'url': url}
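        # Assumes a test HTTP server is listening on localhost:9000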
for x in xrange(1000):
yield {'url': 'http://localhost:9000/?delay=0', 'callback': self.task_page}
def task_page(self, handler, task, body):
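        # Print the URL and the first line of the response body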
try:
result = body.decode('utf-8', 'ignore').splitlines()[0]
except IndexError:
result = 'NA'
print u'%s --> %s' % (task['url'], result)
def main():
crawler = TestCrawler()
crawler.run()
if __name__ == '__main__':
main()