import aiohttp import asyncio from asyncio import Queue QueueEmpty imp

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
import aiohttp
import asyncio
from asyncio import Queue, QueueEmpty
import logging
import re
# Captures the text between <title> and </title> in a page body.
# re.I makes the tag match case-insensitive; re.S only affects how '.'
# matches, and this pattern contains no '.'.
RE_TITLE = re.compile(r'<title>([^<]+)</title>', re.S | re.I)
class Request:
    """A single URL to fetch, plus an optional handler for its response.

    Attributes:
        url: The address to request.
        callback: Optional callable invoked as ``callback(request, response)``
            once the body has been read; ``None`` means no handler is called.
    """

    def __init__(self, url, callback=None):
        self.url = url
        self.callback = callback
class Response:
    """Container for the result of a finished request.

    Attributes:
        body: The response body as handed to the constructor.
    """

    def __init__(self, body):
        self.body = body
class Reactor:
    """Fetch requests produced by ``task_generator`` with bounded concurrency.

    A background task feeds ``task_queue`` from ``task_generator()``, while
    ``run_loop`` takes queued tasks and runs ``perform_request`` for each,
    keeping at most ``concurrency`` requests in flight.
    """

    # File read by task_generator(): one host name per line.
    DOMAINS_FILE = 'var/domains.txt'
    # Maximum number of requests task_generator() yields.
    TASK_LIMIT = 10

    def __init__(self, concurrency=10, poll_interval=0.1):
        """Set up the event loop and the bounded task queue.

        Args:
            concurrency: Maximum number of requests in flight at once. The
                queue holds at most twice this many waiting tasks.
            poll_interval: Seconds ``run_loop`` sleeps between scheduling
                passes.
        """
        try:
            self.loop = asyncio.get_event_loop()
        except RuntimeError:
            # Newer Python versions raise instead of implicitly creating a
            # loop when none is set for the current thread.
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)
        self.task_queue = asyncio.Queue(maxsize=concurrency * 2)
        self.concurrency = concurrency
        self.poll_interval = poll_interval
        # Initialised here so the flags exist before run_loop() is entered.
        self.task_generator_enabled = False
        self.run_loop_enabled = False
        # Strong references keep background tasks from being collected.
        self._generator_task = None
        self._pending_puts = set()

    def run(self):
        """Run ``run_loop`` on ``self.loop`` until it finishes."""
        self.loop.run_until_complete(self.run_loop())

    async def task_generator_processor(self):
        """Move every task from ``task_generator()`` into the queue.

        ``task_generator_enabled`` is cleared even if the generator raises,
        otherwise ``run_loop`` would wait forever for tasks that never come.
        """
        try:
            for task in self.task_generator():
                await self.task_queue.put(task)
        except Exception as ex:
            logging.error('', exc_info=ex)
        finally:
            self.task_generator_enabled = False

    def start_task_generator(self):
        """Mark the generator as active and start feeding the queue."""
        self.task_generator_enabled = True
        self._generator_task = self.loop.create_task(
            self.task_generator_processor()
        )

    async def perform_request(self, task):
        """GET ``task.url`` and pass the body to ``task.callback``.

        A failed request is logged and the callback is skipped. A failure
        while reading the body is logged and the callback receives an empty
        body instead.
        """
        print('Requesting {}'.format(task.url))
        try:
            # The context manager releases the response once the body has
            # been read.
            async with aiohttp.request('get', task.url) as io_res:
                try:
                    body = await io_res.text()
                except Exception as ex:
                    logging.error('', exc_info=ex)
                    body = ''
        except Exception as ex:
            logging.error('', exc_info=ex)
            return
        if task.callback:
            task.callback(task, Response(body=body))

    async def run_loop(self):
        """Schedule queued tasks until there is nothing left to do.

        Each pass drops finished workers and fills every free slot from the
        queue. The loop ends once the queue is empty, no worker is running,
        the task generator has finished and no deferred ``add_task`` put is
        outstanding.
        """
        self.start_task_generator()
        workers = []
        self.run_loop_enabled = True
        while self.run_loop_enabled:
            workers = [w for w in workers if not w.done()]
            while len(workers) < self.concurrency:
                try:
                    task = self.task_queue.get_nowait()
                except QueueEmpty:
                    idle = not workers and not self._pending_puts
                    if idle and not self.task_generator_enabled:
                        self.run_loop_enabled = False
                    break
                workers.append(
                    self.loop.create_task(self.perform_request(task))
                )
            if self.run_loop_enabled:
                # Yield to the event loop so workers and the generator run.
                await asyncio.sleep(self.poll_interval)

    def add_task(self, task):
        """Queue ``task`` from synchronous code such as a response callback.

        ``Queue.put`` is a coroutine, so calling it without awaiting would
        never enqueue anything. The task is inserted immediately when there
        is room; otherwise the put is scheduled on the loop and completes
        once a slot frees up.
        """
        try:
            self.task_queue.put_nowait(task)
        except asyncio.QueueFull:
            pending = self.loop.create_task(self.task_queue.put(task))
            self._pending_puts.add(pending)
            pending.add_done_callback(self._pending_puts.discard)

    def task_generator(self):
        """Yield a ``Request`` for each non-empty line of ``DOMAINS_FILE``.

        Stops after ``TASK_LIMIT`` requests. Each request uses
        ``process_response`` as its callback.
        """
        count = 0
        with open(self.DOMAINS_FILE) as hosts:
            for host in hosts:
                host = host.strip()
                if not host:
                    continue
                yield Request('http://%s/' % host, callback=process_response)
                count += 1
                if count >= self.TASK_LIMIT:
                    break
def process_response(req, res):
    """Print the URL of a finished request and the title found in its body.

    Args:
        req: The request that was performed; its ``url`` is printed.
        res: The response; ``res.body`` is searched with ``RE_TITLE``.

    Prints ``N/A`` as the title when the body contains no match.
    """
    print('Result of request to {}'.format(req.url))
    # Test the match explicitly so that only a missing title yields 'N/A';
    # unrelated attribute errors are no longer hidden.
    match = RE_TITLE.search(res.body)
    title = match.group(1) if match else 'N/A'
    print('Title: {}'.format(title))
def main():
    """Build a Reactor with 20 concurrent slots and run it to completion."""
    reactor = Reactor(concurrency=20)
    try:
        reactor.run()
    finally:
        # Release the event loop's resources once the crawl is over.
        reactor.loop.close()
# Run the crawler only when the file is executed as a script, not on import.
if __name__ == '__main__':
    main()