import os
from argparse import ArgumentParser
import pika
import Definition_h
import mq.blocking.publisher
import cslogging
from mq.message import Message
import random
import time
import threading
# Log file name obtained from cslogging, and the logger that every helper in
# this script writes to.
testLog = cslogging.getLogFilename("Test_rmq_connection")
logger = cslogging.getLogger("test", testLog)
# Set to True by exitTimer(); main() checks it before starting a new attempt.
STOP_FLAG = False
# Longest wall-clock duration (seconds) of a single connectAndPublish() call.
MAX_ELAPSED = 0
# Number of publish attempts started so far.
MSG_COUNTER = 0
def log(msg):
    """Echo *msg* to stdout and record it at INFO level in the test log.

    :param msg: text to report.
    """
    # Parenthesised single-argument form prints identically on Python 2 and
    # is also valid syntax on Python 3.
    print(msg)
    logger.info(msg)
def logException(e):
    """Report a publishing failure on stdout and in the test log.

    Must be called from inside an ``except`` block: ``logger.exception``
    attaches the traceback of the exception currently being handled.

    :param e: the exception that was caught.
    """
    # Format the exception itself rather than ``e.message``: that attribute is
    # deprecated since Python 2.6, absent on Python 3, and empty for
    # exceptions raised with more than one argument.
    print("ERROR!: {}".format(e))
    logger.exception('During message Publishing')
def exitTimer(highestInterval):
    """Stop the test, wait for in-flight work, print a summary and exit.

    Raises STOP_FLAG so that main() starts no further attempts, sleeps for
    twice the highest interval, logs the attempt count and the maximum
    elapsed time, then terminates the process with status 0.

    :param highestInterval: upper bound (seconds) of the pause between
        attempts; the grace period is twice this value.
    """
    global STOP_FLAG

    log("Exiting the Test")
    gracePeriod = highestInterval * 2
    log("Waiting for {} to finish connections in flight.".format(gracePeriod))

    STOP_FLAG = True
    time.sleep(gracePeriod)

    summary = "Attempts Count: {}. With Maximum elapsed time {}.".format(
        MSG_COUNTER, MAX_ELAPSED)
    log(summary)
    log("Exit")
    # Hard exit: ends the whole process from this timer thread, including the
    # main thread that is still looping in main().
    os._exit(0)
def drainQueue(queue):
    """Delete *queue* on the broker so a new run starts from a clean state.

    Despite the name, the queue is removed (``queue_delete``), not merely
    emptied. This is best effort: any failure is reported through
    logException and not propagated.

    :param queue: name of the queue to delete.
    """
    connection = None
    try:
        credentials = pika.PlainCredentials(Definition_h.JHUB_MQ_USER,
                                            Definition_h.JHUB_MQ_PASS)
        parameters = pika.ConnectionParameters(
            credentials=credentials,
            host=Definition_h.JHUB_MQ_HOST,
            port=int(Definition_h.JHUB_MQ_PORT),
            virtual_host=Definition_h.JHUB_MQ_VIRTUAL_HOST,
            connection_attempts=Definition_h.RMQ_RECONNECTION_MAX_ATTEMPTS,
            retry_delay=Definition_h.RMQ_RECONNECTION_DELAY,
            heartbeat_interval=Definition_h.RMQ_HEARTBEAT_INTERVAL
        )
        connection = pika.BlockingConnection(parameters)
        connection.channel().queue_delete(queue=queue)
    except Exception as e:
        logException(e)
    finally:
        # Release the socket whether or not the delete succeeded; the
        # connection was previously left open for the rest of the run.
        if connection is not None and connection.is_open:
            try:
                connection.close()
            except Exception as e:
                logException(e)
def connectAndPublish(queue):
    """Obtain a publisher for *queue* and publish one empty message.

    The message has an empty header dict and an empty string body. Any
    exception raised while obtaining the publisher or publishing is reported
    through logException and swallowed, so the caller's loop keeps running.

    :param queue: name of the destination queue.
    """
    try:
        mqPublisher = mq.blocking.publisher.getMqPublisher(logger, queue)
        mqPublisher.publish(Message({}, ''))
    except Exception as err:
        logException(err)
def main(secondsToRun, lowestInterval, highestInterval):
    """Run the connection load test until the exit timer ends the process.

    Deletes the test queue left by a previous run, schedules exitTimer to
    fire after *secondsToRun* seconds, then repeatedly publishes one message
    per attempt, pausing a random time between attempts. MSG_COUNTER and
    MAX_ELAPSED are updated for the final summary. This function does not
    return normally; the process is terminated by exitTimer.

    :param secondsToRun: how long to keep starting new attempts, in seconds.
    :param lowestInterval: lower bound of the pause between attempts (sec).
    :param highestInterval: upper bound of the pause between attempts (sec).
    """
    global MSG_COUNTER
    global MAX_ELAPSED

    log("Draining the queue from previous test run")
    queue = "test-fragile-connection"
    drainQueue(queue)

    log("Starting Performance test")
    threading.Timer(secondsToRun, exitTimer, args=(highestInterval,)).start()

    while True:
        if STOP_FLAG:
            # Shutdown is in progress: exitTimer will end the process. Idle
            # cheaply rather than spinning this loop at full CPU.
            time.sleep(0.1)
            continue

        MSG_COUNTER += 1
        log("[{}] Start".format(MSG_COUNTER))
        t1 = time.time()
        connectAndPublish(queue)
        t2 = time.time()
        delta = t2 - t1
        MAX_ELAPSED = max(MAX_ELAPSED, delta)
        log("[{}] Finish. Elapsed: {} sec".format(MSG_COUNTER, delta))

        interval = random.uniform(lowestInterval, highestInterval)
        log('[{}] sleeping before running a new connection {}'.format(MSG_COUNTER, interval))
        time.sleep(interval)
# Script entry point: parse the command line and run the load test.
if __name__ == '__main__':
    try:
        parser = ArgumentParser(description="Simulating Load on RMQ")
        # type= lets argparse reject non-numeric input with a usage message
        # where a raw ValueError traceback was raised before. Defaults are
        # already of the target type, because argparse only converts string
        # values.
        parser.add_argument("-s", '--seconds', type=int, default=100,
                            help="number of seconds to run test")
        parser.add_argument("-li", '--lowest-interval', type=float, default=0.01,
                            help="lowest interval between initiating connection in seconds")
        parser.add_argument("-hi", '--highest-interval', type=float, default=1.0,
                            help="highest interval between initiating connection in seconds")
        args = parser.parse_args()
        main(args.seconds, args.lowest_interval, args.highest_interval)
    except KeyboardInterrupt:
        log("terminated forcibly")
        # Hard exit so the pending exit-timer thread started by main() cannot
        # keep the process alive after Ctrl-C.
        os._exit(1)