coding utf8 import logging import importlib import time from functools

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# coding:utf8
import logging
import importlib
import time
from functools import wraps
from captcha_solver.const import SOLVER_BACKEND_ALIAS, TRANSPORT_BACKEND_ALIAS
from captcha_solver.error import SolutionNotReady, SolutionTimeoutError, \
ServiceTooBusy
logger = logging.getLogger('captcha_solver')
def import_string(path):
module_path, cls_name = path.rsplit('.', 1)
return getattr(importlib.import_module(module_path), cls_name)
def repeat(common_time, delay, catch_excs, raise_exc, f):
@wraps(f)
def wrapped(*args, **kwargs):
for _ in range(0, int(common_time / delay), delay):
try:
return f(*args, **kwargs)
except catch_excs:
time.sleep(delay)
raise raise_exc
return wrapped
class CaptchaSolver(object):
"""
This class implements API to communicate with
remote captcha solving service.
"""
def __init__(self, captcha_backend, network_backend='urllib', **kwargs):
captcha_backend_path = SOLVER_BACKEND_ALIAS.get(captcha_backend,
captcha_backend)
network_backend_path = TRANSPORT_BACKEND_ALIAS.get(network_backend,
network_backend)
self.captcha_backend = import_string(captcha_backend_path)()
self.network_backend = import_string(network_backend_path)()
self.captcha_backend.setup(**kwargs)
def submit_captcha(self, image_data, **kwargs):
logger.debug('Submiting captcha')
data = self.captcha_backend.get_submit_captcha_request_data(image_data,
**kwargs)
response = self.network_backend.request(data['url'], data['post_data'])
return self.captcha_backend.parse_submit_captcha_response(response)
def check_solution(self, captcha_id):
"""
Raises:
* SolutionNotReady
* ServiceTooBusy
"""
data = self.captcha_backend.get_check_solution_request_data(captcha_id)
response = self.network_backend.request(data['url'], data['post_data'])
return self.captcha_backend.parse_check_solution_response(response)
def solve_captcha(self, data, submiting_time=30, submiting_delay=3,
recognition_time=120, recognition_delay=5, **kwargs):
captcha_id = repeat(submiting_time,
submiting_delay,
(ServiceTooBusy,),
SolutionTimeoutError("Service has no slots "
"available after %s seconds"
% submiting_time),
self.submit_captcha)(image_data=data)
return repeat(recognition_time,
recognition_delay,
(SolutionNotReady,),
SolutionTimeoutError("Captcha is not ready after "
"%s seconds" % submiting_time),
self.check_solution)(captcha_id)