import time from pprint import pprint from threading import Thread imp

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import time
from pprint import pprint
from threading import Thread
import json
import itertools
from kombu import Connection, Consumer, Queue, Exchange, Message
from ioweb.stat import Stat
def thread_producer(config, stat):
con = Connection(
#'amqp://%s@localhost:5672//' % config['rabbitmq_credentials'],
'redis://',
#transport_options={'confirm_publish': True},
)
con.connect()
prod = con.Producer()
#exc = Exchange('z', delivery_mode='persistent')
taskq = Queue('z')# exc)
while True:
msg = {'time': str(time.time())}
res = prod.publish(
#{'time': str(time.time())},
msg,
exchange='',#taskq.exchange,
routing_key=taskq.name,#taskq.routing_key,
declare=[taskq],
)
stat.inc('msg-sent')
def setup_arg_parser(parser):
parser.add_argument('-w', '--workers', type=int, default=1)
def main(workers, **kwargs):
with open('var/config.json') as inp:
config = json.load(inp)
stat = Stat(speed_keys=['msg-sent', 'msg-rcv'])
pool = []
for x in range(workers):
th = Thread(target=thread_producer, args=[config, stat])
th.daemon = True
th.start()
pool.append(th)
[x.join() for x in pool]