import time from pprint import pprint import logging import sys from c

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import time
from pprint import pprint
import logging
import sys
from collections import defaultdict
from threading import Thread, Event
from queue import Queue, Empty
from bot.stat import Stat
class Bot(object):
    """Run named worker methods in daemon threads and supervise them.

    The instance owns three queues (``taskq``, ``resultq``, ``fatalq``), a
    stop event (``evt_stop``) and a ``Stat`` object.  ``run()`` starts the
    threads, polls them once per second and shuts everything down as soon
    as a thread dies, a fatal error is queued or the stop event is set.

    NOTE(review): ``start_threads`` reads ``self.threads_config`` (a mapping
    of method name -> config), which this class does not define; presumably
    subclasses provide it together with the worker methods -- confirm.
    """

    def __init__(self, concurrency=None):
        """Create the queues, the stop event and the thread registry.

        Args:
            concurrency: optional mapping of thread name -> number of
                instances to start; names that are missing default to 1.
        """
        if concurrency is None:
            concurrency = {}
        self.concurrency = concurrency
        self.taskq = Queue()
        self.resultq = Queue()
        self.fatalq = Queue()
        self.evt_stop = Event()
        self.stat = Stat()
        # Registry: thread name -> {'config': ..., 'instances': [Thread, ...]}
        self.threads = defaultdict(lambda: {
            'config': None,
            'instances': [],
        })

    def wrapper(self, func):
        """Wrap ``func`` so an exception is logged and queued, not raised.

        The returned callable ignores its first positional argument (the
        bot passed via ``Thread(args=[self])``) and forwards the rest to
        ``func``.  On failure the ``sys.exc_info()`` triple is put on
        ``self.fatalq`` and ``None`` is returned.
        """
        def func_wrapper(bot, *args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception:
                logging.exception('')
                self.fatalq.put(sys.exc_info())
        return func_wrapper

    def start_threads(self):
        """Start the configured number of daemon threads for each name."""
        for th_name, config in self.threads_config.items():
            func = getattr(self, th_name)
            for x in range(self.concurrency.get(th_name, 1)):
                logging.debug('Starting thread %s #%d', th_name, x + 1)
                th = Thread(
                    target=self.wrapper(func),
                    args=[self]
                )
                th.daemon = True
                th.start()
                self.threads[th_name]['config'] = config
                self.threads[th_name]['instances'].append(th)

    def _find_dead_thread(self):
        """Return the name of the first thread group with a dead thread."""
        for th_name, th_state in self.threads.items():
            if any(not th.is_alive() for th in th_state['instances']):
                return th_name
        return None

    def run(self):
        """Start the threads and supervise them until it is time to stop.

        The loop ends when the stop event is set, any thread is no longer
        alive, or ``fatalq`` is non-empty.  ``shutdown()`` always runs
        afterwards, including when the loop is interrupted by an exception.
        """
        self.start_threads()
        try:
            while not self.evt_stop.is_set():
                logging.debug('manager')
                dead_name = self._find_dead_thread()
                if dead_name is not None:
                    logging.debug('Thread %s is dead', dead_name)
                    break
                if self.fatalq.qsize():
                    break
                time.sleep(1)
        finally:
            # shutdown() sets evt_stop itself, so no separate set() is needed.
            self.shutdown()

    def shutdown(self):
        """Signal stop, join the threads in shutdown order and log counters.

        Thread groups are joined in ascending ``shutdown_order`` taken from
        their config (default 1).  A missing (``None``) config is treated as
        empty so shutdown cannot fail before the threads are joined.
        """
        self.evt_stop.set()

        def shutdown_order(item):
            config = item[1]['config'] or {}
            return config.get('shutdown_order', 1)

        for th_name, th_state in sorted(self.threads.items(),
                                        key=shutdown_order):
            logging.debug('Joining %s threads', th_name)
            for th in th_state['instances']:
                th.join()
        logging.debug('Stat:')
        for key, count in self.stat.counters.items():
            logging.debug(' - %s: %d', key, count)