Id proxy py 2006 02 05 14 53 03 lightdruid Exp import socket import sy

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#
# $Id: proxy.py,v 1.6 2006/02/05 14:53:03 lightdruid Exp $
#
import socket
import sys,string
#sys.path.insert(0, '../..')
import struct
from base64 import b64encode, b64decode
from isocket import ISocket
#from utils import *
#from logger import log
class Proxy(ISocket):
def __init__(self, host, port, default_charset = 'cp1251'):
ISocket.__init__(self, host, port, default_charset)
class HttpsProxy(Proxy):
def connect(self, host, port, userName = None, password = None):
raise NotImplementedError("Https4Proxy")
class HttpProxy(Proxy):
def connect(self, host, port, userName = None, password = None):
ISocket.connect(self)
request = "CONNECT %s:%d HTTP/1.1\r\nHost: %s:%d\r\n" %\
(host, port, host, port)
if userName is not None:
r = b64encode("%s:%d" % (userName, password))
raise 1
request += "\r\n"
ISocket.send(self, request)
response = ISocket.read(self, 1024)
status = response.split()[1]
if status not in ["200"]:
raise Exception(response)
class Socks4Proxy(Proxy):
def connect(self, host, port, userName = None, password = None):
ISocket.connect(self)
buf = chr(0x04) #version
buf += chr(0x01) #cmd connect
buf += struct.pack('!H', port)
buf += socket.inet_aton(socket.gethostbyname(host))
buf += chr(0x00)
ISocket.send(self, buf)
buf = ISocket.read(self, 100)
if ord(buf[0])!= 0 or ord(buf[1])!= 90:
raise Exception("Bad answer from SOCKS4 proxy")
class Socks5Proxy(Proxy):
def connect(self, host, port, userName = None, password = None):
ISocket.connect(self)
buf = chr(0x05)
if userName is not None:
buf += chr(0x02) # two methods
buf += chr(0x00) # no authentication
buf += chr(0x02) # username/password authentication
else:
buf += chr(0x01)
buf += chr(0x00)
ISocket.send(self, buf)
buf = ISocket.read(self, 2)
if ord(buf[0]) != 0x05 or ord(buf[1]) == 0xff:
raise Exception("Bad answer from SOCKS5 proxy")
if userName is not None:
buf = '\x01' + struct.pack('!B', len(userName)) + userName
buf += struct.pack('!B', len(password)) + password
ISocket.send(self, buf)
buf = ISocket.read(self, 2)
print len(buf)
if len(buf) != 2 or ord(buf[0]) != 0x01 or ord(buf[1]) != 0x00:
raise Exception("Bad answer from SOCKS5 proxy")
# Version, CONNECT, reserved, address type: host name
buf = '\x05\x01\x00\x03'
# Then actual data
buf += struct.pack('!B', len(host)) + host
buf += struct.pack('!H', port)
ISocket.send(self, buf)
buf = ISocket.read(self, 100)
if ord(buf[0]) != 0x05 or ord(buf[1]) != 0x00:
raise Exception("Bad answer from SOCKS5 proxy")
#class for abstracting proxy type
#format initStr host:port@type
class ProxyFabrick:
def __init__(self,initStr):
self.host,tmp = initStr.split(':')
self.port,self.type = tmp.split('@')
self.port = int(self.port)
if self.type.find("SOCKS4")!=-1:
self.sock = Socks4Proxy(self.host,self.port)
else :
if self.type.find("SOCKS5")!=-1:
self.sock = Socks5Proxy(self.host,self.port)
else :
raise Exception("Unknow proxy type")
def connect (self, host, port, userName = None, password = None):
self.sock.connect(host, port, userName, password)
def read (self,chunk_size):
return self.sock.read(chunk_size)
def send(self,data):
return self.sock.send(data)
def disconnect(self):
self.sock.disconnect()
def _test():
# export http_proxy="http://user:passwd@your.proxy.server:port/"
# p = Socks5Proxy('localhost', 1080)
p = HttpProxy('localhost', 3128)
p.connect('login.icq.com', 5190)
# ---