import time
from pprint import pprint
from threading import Thread
import json
import itertools
from kombu import Connection, Consumer, Queue, Exchange, Message
from ioweb.stat import Stat
def thread_producer(config, stat):
    """Publish timestamp messages to the ``z`` queue in an endless loop.

    Meant to be used as a thread target: the loop never terminates on its
    own, so the function only exits when connecting or publishing raises.

    Args:
        config: Parsed configuration mapping. Currently unused; it was only
            read (``config['rabbitmq_credentials']``) by the AMQP broker URL
            that has been disabled in favour of the Redis transport.
        stat: Object exposing ``inc(key)``; ``inc('msg-sent')`` is called
            once after every published message.
    """
    # The context manager releases the connection when an exception escapes
    # the loop; without it the connection was never closed.
    with Connection('redis://') as con:
        con.connect()
        prod = con.Producer()
        taskq = Queue('z')
        while True:
            msg = {'time': str(time.time())}
            prod.publish(
                msg,
                # Empty exchange name, with the queue name as routing key.
                exchange='',
                routing_key=taskq.name,
                declare=[taskq],
            )
            stat.inc('msg-sent')
def setup_arg_parser(parser):
    """Add the ``-w`` / ``--workers`` integer option (default 1) to *parser*."""
    option_flags = ('-w', '--workers')
    parser.add_argument(*option_flags, type=int, default=1)
def main(workers, **kwargs):
    """Start *workers* producer threads and wait on them.

    Reads ``var/config.json``, creates a shared ``Stat`` tracking the
    ``msg-sent`` and ``msg-rcv`` keys, then runs ``thread_producer`` in
    *workers* daemon threads and joins each of them. Because the producers
    loop forever, the joins block until a thread dies or the process is
    interrupted.

    Args:
        workers: Number of producer threads to start.
        **kwargs: Accepted and ignored.
    """
    # JSON text is UTF-8 by specification; do not rely on the locale default.
    with open('var/config.json', encoding='utf-8') as inp:
        config = json.load(inp)
    stat = Stat(speed_keys=['msg-sent', 'msg-rcv'])
    pool = []
    for _ in range(workers):
        th = Thread(target=thread_producer, args=(config, stat), daemon=True)
        th.start()
        pool.append(th)
    # Plain loop: join() is called for its side effect only, so there is no
    # reason to build a throwaway list of None values.
    for th in pool:
        th.join()