import os from argparse import ArgumentParser import pika import Defin

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
import os
from argparse import ArgumentParser
import pika
import Definition_h
import mq.blocking.publisher
import cslogging
from mq.message import Message
import random
import time
import threading
# Log destination for this test run, as returned by the project's cslogging
# helper (presumably a file path -- confirm against cslogging).
testLog = cslogging.getLogFilename("Test_rmq_connection")
# Logger named "test" bound to the destination above; used by log(),
# logException() and handed to the MQ publisher.
logger = cslogging.getLogger("test", testLog)
# Set to True by exitTimer() to tell the publishing loop in main() to stop
# starting new attempts.
STOP_FLAG = False
# Longest duration, in seconds, of a single connectAndPublish() call so far.
MAX_ELAPSED = 0
# Number of publish attempts started so far.
MSG_COUNTER = 0
def log(msg):
    """Echo *msg* to stdout and record it through the module logger at INFO."""
    # The parenthesised form produces the same output as the Python 2 print
    # statement for a single argument.
    print(msg)
    logger.info(msg)
def logException(e):
    """Report a caught exception on stdout and through the module logger.

    Intended to be called from inside an ``except`` block; ``logger.exception``
    is expected to record the traceback of the exception being handled
    (assumes ``logger`` behaves like a stdlib ``logging.Logger`` -- confirm
    against cslogging).

    Args:
        e: the caught exception instance.
    """
    # ``e.message`` is empty when the exception was built with zero or several
    # arguments and does not exist at all on Python 3.  Fall back to str(e),
    # then repr(e), so the console line always identifies the failure and the
    # handler itself can never raise AttributeError.
    detail = getattr(e, 'message', None) or str(e) or repr(e)
    print("ERROR!: {}".format(detail))
    logger.exception('During message Publishing')
def exitTimer(highestInterval):
    """Stop the test, wait for in-flight work, print the summary and exit.

    Runs on the ``threading.Timer`` thread started by ``main``.  It raises
    ``STOP_FLAG`` so the publishing loop stops starting new attempts, waits
    twice the highest inter-attempt interval, logs the totals and terminates
    the whole process with exit status 0.

    Args:
        highestInterval: upper bound, in seconds, of the pause between
            attempts; the wait before exiting is twice this value.
    """
    global STOP_FLAG
    import sys  # local import: only needed for the final flush below

    log("Exiting the Test")
    waitTimeout = highestInterval * 2
    log("Waiting for {} to finish connections in flight.".format(waitTimeout))
    STOP_FLAG = True
    time.sleep(waitTimeout)
    log("Attempts Count: {}. With Maximum elapsed time {}.".format(MSG_COUNTER, MAX_ELAPSED))
    log("Exit")
    # os._exit() terminates immediately without flushing stdio buffers, so
    # flush explicitly; otherwise the summary can be lost when stdout is piped
    # or redirected to a file.
    sys.stdout.flush()
    # os._exit (not sys.exit) because this runs on a non-main thread and the
    # main thread is still looping; sys.exit would only end this thread.
    os._exit(0)
def drainQueue(queue):
    """Delete *queue* on the broker so a new run starts from a clean state.

    Opens a dedicated blocking connection using the settings in
    ``Definition_h``, deletes the queue and closes the connection again.
    Any failure is reported through ``logException`` and otherwise ignored
    (best effort), so the caller can carry on either way.

    Args:
        queue: name of the queue to delete.
    """
    connection = None
    try:
        credentials = pika.PlainCredentials(Definition_h.JHUB_MQ_USER,
                                            Definition_h.JHUB_MQ_PASS)
        parameters = pika.ConnectionParameters(
            credentials=credentials,
            host=Definition_h.JHUB_MQ_HOST,
            port=int(Definition_h.JHUB_MQ_PORT),
            virtual_host=Definition_h.JHUB_MQ_VIRTUAL_HOST,
            connection_attempts=Definition_h.RMQ_RECONNECTION_MAX_ATTEMPTS,
            retry_delay=Definition_h.RMQ_RECONNECTION_DELAY,
            heartbeat_interval=Definition_h.RMQ_HEARTBEAT_INTERVAL
        )
        connection = pika.BlockingConnection(parameters)
        connection.channel().queue_delete(queue=queue)
    except Exception as e:
        logException(e)
    finally:
        # The original never closed this connection, leaving it open on the
        # broker for the whole test run.  Close it whether or not the delete
        # succeeded; a failure to close is logged, not raised.
        if connection is not None and connection.is_open:
            try:
                connection.close()
            except Exception as e:
                logException(e)
def connectAndPublish(queue):
    """Obtain a publisher for *queue* and send one empty message through it.

    The message has an empty header dict and an empty string payload.  Any
    exception raised while obtaining the publisher or publishing is handed to
    ``logException`` and not propagated.
    """
    try:
        # The publisher is obtained first, then the message is built and sent.
        mq.blocking.publisher.getMqPublisher(logger, queue).publish(Message({}, ''))
    except Exception as err:
        logException(err)
def main(secondsToRun, lowestInterval, highestInterval):
    """Run the connection load test until the exit timer ends the process.

    Deletes the test queue, arms a timer that calls ``exitTimer`` after
    ``secondsToRun`` seconds, then repeatedly connects and publishes, pausing
    a random time between attempts.  Updates the module counters
    ``MSG_COUNTER`` and ``MAX_ELAPSED``.  This function does not return: the
    loop has no exit and the process is ended by ``exitTimer``.

    Args:
        secondsToRun: seconds to wait before ``exitTimer`` is triggered.
        lowestInterval: lower bound, in seconds, of the pause between attempts.
        highestInterval: upper bound, in seconds, of the pause between
            attempts; also passed on to ``exitTimer``.
    """
    global MSG_COUNTER
    global MAX_ELAPSED

    log("Draining the queue from previous test run")
    queue = "test-fragile-connection"
    drainQueue(queue)
    log("Starting Performance test")
    threading.Timer(secondsToRun, exitTimer, args=(highestInterval,)).start()

    while True:
        if STOP_FLAG:
            # Shutdown is in progress on the timer thread.  Idle with a short
            # sleep instead of spinning: the original looped with no pause
            # here, pinning a CPU core and contending for the GIL for the
            # whole grace period.
            time.sleep(0.1)
            continue

        MSG_COUNTER += 1
        log("[{}] Start".format(MSG_COUNTER))
        t1 = time.time()
        connectAndPublish(queue)
        t2 = time.time()
        delta = t2 - t1
        MAX_ELAPSED = max(MAX_ELAPSED, delta)
        log("[{}] Finish. Elapsed: {} sec".format(MSG_COUNTER, delta))
        interval = random.uniform(lowestInterval, highestInterval)
        log('[{}] sleeping before running a new connection {}'.format(MSG_COUNTER, interval))
        time.sleep(interval)
if __name__ == '__main__':
    # Command-line entry point: parse the run length and the bounds of the
    # pause between attempts, then hand over to main().
    try:
        parser = ArgumentParser(description="Simulating Load on RMQ")
        # Let argparse do the numeric conversion so that bad input produces a
        # usage error instead of a raw ValueError traceback.
        parser.add_argument("-s", '--seconds', type=int, help="number of seconds to run test", default=100)
        parser.add_argument("-li", '--lowest-interval', type=float, help="lowest interval between initiating connection in seconds", default=0.01)
        parser.add_argument("-hi", '--highest-interval', type=float, help="highest interval between initiating connection in seconds", default=1.0)
        args = parser.parse_args()
        main(args.seconds, args.lowest_interval, args.highest_interval)
    except KeyboardInterrupt:
        import sys  # local import: only needed for the flush below

        log("terminated forcibly")
        # os._exit() skips stdio flushing, so flush first to keep the message
        # when stdout is piped or redirected.
        sys.stdout.flush()
        # Hard exit, presumably because the timer thread started by main() is
        # non-daemon and would otherwise keep the process alive.
        os._exit(1)