# -*- encoding: utf-8 -*-
import socket, select, time, queue
class stype(object):
def __repr__(self):
return '<' + self.__class__.__name__\
+ ' family=%s, type=%s, proto=%s>'\
%(self._opt['F'], self._opt['T'], self._opt['P'])
def __init__(self, line=None):
self._opt = {'F': socket.AF_INET, 'T': socket.SOCK_STREAM, 'P': socket.SOL_IP}
if line: self.load(line)
def load(self, line):
# F=AF_INET,T=SOCK_STREAM,P=SOL_TCP
if not type(line) is str:
raise TypeError('expected string, not %s' %line.__class__.__name__)
for q in line.split(','):
q = q.split('=')
if len(q) != 2: continue
if not q[0] in self._opt: continue
try: self._opt[q[0]] = int(socket.__getattribute__(q[1]))
except: continue
def socket(self):
return socket.socket(self._opt['F'], self._opt['T'], self._opt['P'])
class sm_socket_client(object):
def __init__(self, parent, id):
if not type(parent) == SrvManager:
raise TypeError('SrvManager expected, not %s' %parent.__class__.__name__)
self._parent = parent
self._id = id
# # так, или лучше через self.__getattribute__ ?
# for q in ['recv', 'send', 'close', 'settimeout',\
# 'gettimeout', 'setsockopt', 'getsockopt',\
# 'settimeout', 'gettimeout', 'fileno']:
# # FIXME: ыыы
# self.__setattr__(q, lambda *a, **ka: self._parent.sockact(self._id, eval('"%s"'%q), *a, **ka))
def __getattribute__(self, attr):
if attr in ['_parent', '_id']:
return super(sm_socket_client, self).__getattribute__(attr)
return self._parent.sockact(self._id, attr)
class SrvManager(object):
def __repr__(self):
return '<'+ self.__class__.__name__\
+ (' active=%s, pause=%s' %(bool(self._active), bool(self._pause)))\
+ ', ss=%s, cs=%s, aq=%s, dq=%s>' %(len(self._slist), len(self._clist),\
self._sadd.qsize(), self._sdel.qsize())
def __init__(self):
self._tlist = dict() #список типов :)
# слушающие
self._sadd = queue.Queue() # очередь на добавление: (type, bind_addr, bind_port)
self._sdel = queue.Queue() # очередь на удаление: (socket)
self._slist = dict() #список слушателей (socket->type)
# клиентские
self._clist = dict() #список клиентов (client->id)
self._ilist = dict() #списк клиентов (id->client)
self._dlist = queue.Queue() # очередь на удаление клиентов
# ---
self._pause = False
self._active = True
self._buffsz = 1024
def setEventList(self, evlist):
self._event = evlist
self._event.put((None, None))
def genId(self, c):
# FIXME: oO
try:
return hex( (int(time.time()) * 10000)\
+ (c[0].fileno() * 1000)\
+ (c[0].family * 100)\
+ (c[0].type * 10) + c[0].proto)
except: return hex(0)
def poll(self, timeout=0, event_limit=-1):
# наверно, былобы лучше разбить ети блоки на методы :)
try:
# возвращаемые списки
# только для клиентский сокетов, судьба "слушателей" никого волновать не должна :)
r_open = [] # только что открытые
r_close = [] # только что закрытые
r_ready = [] # готовые к чтению
# обработка очереди на добавление
act = self.active()
# временная очередь :)
temp_queue = queue.Queue()
while not self._sadd.empty() and act:
a = self._sadd.get()
if type(a) != tuple: continue
try:
# создаем сокет
if a[0] in self._tlist:
try:
s = self._tlist[a[0]].socket()
s.bind(a[1:3])
s.listen(5)
except socket.error:
del s
temp_queue.put(a)
continue
# удачно добавленно ^^
self._slist[s] = a[0]
except IndexError: continue
# переносим содержимое временной очереди
while not temp_queue.empty(): self._sadd.put(temp_queue.get())
# маленькая чистка слушателей
for q in self._slist:
if act:
if q.fileno() == -1: self._sdel.put(q) # кривой дискриптор - отмечаем на удаление
else: self._sdel.put(q) # мы неактивны... закрываемся.
# обработка очереди на удаление
while not self._sdel.empty():
a = self._sdel.get()
if not a in self._slist: continue
try: a.close()
except: pass
del self._slist[a]
# чистка клиентов
for q in self._clist:
if q.fileno() == -1: self._dlist.put(q)
elif not self._clist[q] in self._ilist: # проверяем есть ли у клиента id
self._dlist.put(q)
# проверяем, если ли у id клиент :)
for q in self._ilist:
if not self._ilist[q] in self._clist: self._dlist.put(q)
# обработка очереди на удаление для клиентов
while not self._dlist.empty():
a = self._dlist.get()
id = None
if a in self._ilist:
id = a
a = self._ilist[a]
elif a in self._clist: id = self._clist[a]
else: continue
try: a.close()
except: pass
try: del self._clist[a]
except: pass
try: del self._ilist[id]
except: pass
if id: r_close.append(id)
сс = 0
# делаем селект ^^^
for q in select.select(list(self._slist) + list(self._clist), [], [], timeout)[0]:
if event_limit > 0: event_limit -= 1
elif event_limit == 0: break
if q in self._slist: # если это "слушатель"
if self.paused(): q.accept()[0].close()
c = q.accept()
id = self.genId(c)
# если такой id уже есть, то пропускаем >_<
if id in self._ilist:
c.close()
continue
self._clist[c[0]] = id
self._ilist[id] = c[0]
# добавляем клиента в список только что открытых
r_open.append(id)
elif q in self._clist: # если клиент
# добавляем в список готовых для чтения
r_ready.append(self._clist[q])
else: pass
except: raise
return (r_open, r_close, r_ready)
def active(self): return bool(self._active)
def paused(self): return bool(self._pause)
def setActive(self, active): self._active = bool(active)
def setPause(self, pause): self._pause = bool(pause)
def type(self, name, type):
""" добавляет, удаляет или изменяет тип сокета в списках """
if not name.__class__ == str:
raise TypeError('`name`: need string, not %s' %name.__class__.__name__)
if not type.__class__ in [stype, None]:
raise TypeError('`type`: need stype or None, not %s' %name.__class__.__name__)
if not name: # удаляем
if name in self._tlist: del self._tlist[name]
elif not name in self._tlist: # Добавляем
self._tlist[name] = type
else: # изменяем
# при изменении типа, нужно все, уже открытые, сокеты перелопатить
# те закрыть и открыть с новыми параметрами
pass
def task(self, add=True, lport=0, type=None, laddr='0.0.0.0'):
if add:
if not type in self._tlist: return
self._sadd.put((type, laddr, lport))
else:
# TODO: надо доделать: научиться быстро извлекать нужный сокет из списка ^^
#self._sdel.append((None, type, laddr, lport))
raise RuntimeError('...')
def _getclient(self, id):
return (id in self._clist and id)\
or (id in self._ilist and self._ilist[id])\
or None
# client operations
def send(self, id, data):
""" отправляем информацию сокету """
if not type(id) in [tuple, list]: id = [id]
for q in id:
if not type(data) is bytes:
raise TypeError('expected bytes, not %s' %data.__class.__name__)
s = self._getclient(q)
r = 0
try:
if s: r = s.send(data)
except socket.error: self.close(q)
return r
def recv(self, id, bfsize=-1, timeout=0):
""" принимаем информацию с сокета """
if not type(id) in [tuple, list]: id = [id]
r = []
for q in id:
s = self._getclient(q)
if s and s.fileno() != -1 and select.select([s], [], [], timeout)[0]:
if bfsize > 0: r.append(s.recv(bfsize))
else:
tr = b''
while select.select([s], [], [], 0)[0]:
try: a = s.recv(self._buffsz)
except: a = b''
if not a: break
tr += a
r.append(tr)
if not len(r[-1]): self.close(s, True)
else:
if not s or s.fileno() == -1: self.close(s, True)
r.append(b'')
if len(r) > 1: return r
elif len(r) == 1: return r[0] # параноя... :)
def close(self, id, now=False):
""" закрываем указаный сокет """
if type(id) not in [tuple, list]: id = [id]
for q in id:
if now:
try: self._getclient(q).close()
except AttributeError: pass
self._dlist.put(q)
def sockact(self, id, act):
print('act: %s %s' %(id, act))
if act in ['send', 'recv', 'close']:
return lambda *x, **xx: self.__getattribute__(act)(id, *x, **xx)
else:
s = self._getclient(id)
if s:
return s.__getattribute__(act)
# try:
# a = s.__getattribute__(act)
# return (hasattr(a, '__call__') and a(*arg, **karg)) or a
# except socket.error:
# errno = __import__('sys').exc_info()[1].errno
# if errno == 9: self.close(id) # bad file descriptor
# raise
else: raise AttributeError('client not exists')
if __name__ == '__main__':
p = SrvManager()
p.type('s', stype())
p.task(True,6433, 's')