import asyncio class ClientSock def __init__ self ids self ids ids asy

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import asyncio
class ClientSock:
def __init__(self, ids):
self.ids = ids
async def send(self, msg):
await asyncio.sleep(.1) # иммитация работы с сетью
print('Отправлено сообщение {} клиенту {}'.format(msg, self.ids))
c_socks = [ClientSock(i) for i in range(10)] # генерируем клиентов
class Sender:
def __init__(self):
self.events = asyncio.Queue() # очередь с евентами
self.new_messages = asyncio.Queue() # очередь с предстоящими к отправке
# сообщениями
async def events_handler(self):
while True:
# ждем, пока в очереди появится событие
event = await self.events.get()
for c_sock in c_socks:
# проходимся по всем подписчикам и кладем для всех в очередь событие
await self.new_messages.put([c_sock, event])
async def new_message_getter(self):
while True:
# ждем предстоящее к отправке сообщение (пара [c_sock, event])
yield await self.new_messages.get()
async def sender_worker(self):
'''Наш воркер, которых будет несколько.
Следит за готовыми к отправке сообщениями.
Всего лишь берет клиентский сокет и запихивает туда евент'''
async for c_sock, event in self.new_message_getter():
await c_sock.send(event)
sender = Sender()
async def main():
for new_event in range(10*10**10):
# параллельно генерируем события
await asyncio.sleep(.2) # тикрейт
await sender.events.put(new_event)
loop = asyncio.get_event_loop()
loop.create_task(main())
for _ in range(20):
# добавляем воркеров
loop.create_task(sender.sender_worker())
loop.create_task(sender.events_handler())
loop.run_forever()