Python
27 Oct 2011
 
 
 
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
#!/usr/bin/env python
from grab import Grab, multi_fetch
import time
from tools.common import make_work
import sys
import urllib
from StringIO import StringIO
def timer(func):
    """
    Display time taken to execute the decorated function.

    The returned wrapper calls ``func`` with the arguments it receives,
    prints the elapsed wall-clock time to stdout as ``Time: <seconds> sec.``
    and returns whatever ``func`` returned.  If ``func`` raises, the
    exception propagates and no time is printed.
    """
    # Function-scope import, like the other local imports in this file.
    import functools

    # functools.wraps copies __name__ and __doc__ from ``func`` onto the
    # wrapper so decorated functions stay recognisable in tracebacks and
    # in help().
    @functools.wraps(func)
    def inner(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        total = time.time() - start
        # A single parenthesised argument prints the same text under the
        # Python 2 print statement and the Python 3 print function.
        print('Time: %.2f sec.' % total)
        return result
    return inner
def generator(task_number, document):
    """
    Generate test urls.

    Returns an iterator that produces the url
    ``http://load.local/<document>`` exactly ``task_number`` times
    (nothing at all when ``task_number`` is zero or negative).
    """
    # Function-scope import, like the other local imports in this file.
    import itertools

    # Every task requests the same document, so the url is formatted once
    # and repeated instead of being rebuilt on each iteration.
    url = 'http://load.local/%s' % document
    return itertools.repeat(url, task_number)
@timer
def sync(task_number, thread_number, document, module):
    """
    Synchronous requests.

    Prints the run parameters, then hands a blocking ``worker`` function,
    the test urls and ``thread_number`` to ``make_work`` (imported from
    ``tools.common``; presumably it runs the worker over the urls in that
    many threads -- confirm against its definition).  Whatever
    ``make_work`` yields is consumed and discarded.

    ``module`` selects the worker: ``'grab'`` downloads with ``Grab.go``;
    any other value downloads with ``urllib.urlopen`` and passes the body
    to ``Grab.fake_response``.
    """
    print('Tasks: %d, workers: %d, document: %s, module=%s' % (
        task_number, thread_number, document, module))

    if module == 'grab':
        def worker(url):
            g = Grab()
            g.go(url)
            # Uncomment to add document parsing to the measured work:
            #g.css_list('title')
    else:
        def worker(url):
            g = Grab()
            response = urllib.urlopen(url)
            try:
                data = response.read()
            finally:
                # Close explicitly so the connection is released as soon
                # as the body is read, rather than whenever the response
                # object happens to be garbage-collected.
                response.close()
            g.fake_response(data)
            # Uncomment to add document parsing to the measured work:
            #g.css_list('title')

    # Only the time taken matters here, so the results are dropped.
    for res in make_work(worker, generator(task_number, document), thread_number):
        pass
@timer
def async(task_number, thread_number, document):
"""
Asynchronious requests.
"""
print 'Tasks: %d, workers: %d, document: %s' % (
task_number, thread_number, document)
for res in multi_fetch(generator(task_number, document), thread_number):
#res['grab'].css_list('title')
#yield {'ok': True, 'grab': curl.grab.clone(),
#'url': curl._meta['url']}
pass
@timer
def async_curl(task_number, thread_number, document):
    """
    Asynchronous requests made with pycurl's CurlMulti interface directly.

    Adapted from the pycurl ``retriever-multi.py`` example:
    http://pycurl.cvs.sourceforge.net/viewvc/pycurl/pycurl/examples/retriever-multi.py?revision=1.29&content-type=text%2Fplain

    Requests the test url ``task_number`` times using at most
    ``thread_number`` curl handles at once.  Response bodies are written
    to in-memory buffers and thrown away.  Failed transfers are counted as
    processed and otherwise ignored.
    """
    import collections
    import pycurl

    # We should ignore SIGPIPE when using pycurl.NOSIGNAL - see
    # the libcurl tutorial for more info.
    try:
        import signal
        # Importing the names is what raises ImportError on platforms
        # that do not define SIGPIPE, which skips the call below.
        from signal import SIGPIPE, SIG_IGN
        signal.signal(SIGPIPE, SIG_IGN)
    except ImportError:
        pass

    # Urls still waiting for a free curl object.  A deque pops from the
    # left in O(1); list.pop(0) shifts the whole list every time.
    queue = collections.deque(generator(task_number, document))
    task_number = len(queue)

    # Honour the caller's thread_number (it used to be overwritten with a
    # hard-coded 10).  There is no point in more handles than tasks, and
    # at least one handle is needed for the main loop to make progress.
    handle_count = max(1, min(thread_number, task_number))

    m = pycurl.CurlMulti()
    m.handles = []
    try:
        # Pre-allocate a list of curl objects
        for _ in range(handle_count):
            c = pycurl.Curl()
            c.fp = None
            c.setopt(pycurl.FOLLOWLOCATION, 1)
            c.setopt(pycurl.MAXREDIRS, 5)
            c.setopt(pycurl.CONNECTTIMEOUT, 30)
            c.setopt(pycurl.TIMEOUT, 300)
            c.setopt(pycurl.NOSIGNAL, 1)
            m.handles.append(c)

        # Main loop
        freelist = m.handles[:]
        num_processed = 0
        while num_processed < task_number:
            # If there is an url to process and a free curl object, add to
            # multi stack
            while queue and freelist:
                url = queue.popleft()
                c = freelist.pop()
                c.fp = StringIO()
                c.setopt(pycurl.URL, url)
                c.setopt(pycurl.WRITEFUNCTION, c.fp.write)
                m.add_handle(c)
                # store some info (only used by the debug prints below)
                c.url = url

            # Run the internal curl state machine for the multi stack
            while 1:
                ret, num_handles = m.perform()
                if ret != pycurl.E_CALL_MULTI_PERFORM:
                    break

            # Check for curl objects which have terminated, and add them to
            # the freelist
            while 1:
                num_q, ok_list, err_list = m.info_read()
                for c in ok_list:
                    c.fp.close()
                    c.fp = None
                    m.remove_handle(c)
                    #print "Success:", c.url, c.getinfo(pycurl.EFFECTIVE_URL)
                    freelist.append(c)
                for c, errno, errmsg in err_list:
                    c.fp.close()
                    c.fp = None
                    m.remove_handle(c)
                    #print "Failed: ", c.url, errno, errmsg
                    freelist.append(c)
                num_processed = num_processed + len(ok_list) + len(err_list)
                if num_q == 0:
                    break

            # Currently no more I/O is pending, could do something in the
            # meantime (display a progress bar, etc.).
            # We just call select() to sleep until some more data is
            # available.
            m.select(1.0)
    finally:
        # Cleanup runs even when the loop above raises, so curl handles and
        # buffers are not left open.
        for c in m.handles:
            if c.fp is not None:
                c.fp.close()
                c.fp = None
            c.close()
        m.close()
if __name__ == '__main__':
print 'Sync. tests'
task_count = 1000
thread_count = 200
print 'Grab sync.'
sync(task_count, thread_count, '5b.html', 'grab')
sync(task_count, thread_count, '28k.html', 'grab')
print 'urllib sync.'
sync(task_count, thread_count, '5b.html', 'urllib')
sync(task_count, thread_count, '28k.html', 'urllib')
print 'Grab(multicurl) async.'
async(task_count, thread_count, '5b.html')
async(task_count, thread_count, '28k.html')
print 'multicurl async.'
async_curl(task_count, thread_count, '5b.html')
async_curl(task_count, thread_count, '28k.html')