#-*- coding:utf-8 -*-
from __future__ import print_function
import time
from random import uniform
from abc import ABCMeta, abstractmethod, abstractproperty
from collections import Iterable, namedtuple
from operator import methodcaller, not_, isCallable
from decimal import Decimal, getcontext
from ast import literal_eval
from functools import partial
# Limit arithmetic in the current decimal context to 8 significant digits.
getcontext().prec = 8
# Amount by which Timer.start_simulation advances the simulation clock on
# every pass of its loop.
SIMULATION_STEP = Decimal("0.1")
# Status value that run() places first in its result tuple when it fails.
ERROR_CODE = 1
class SimulationError(Exception):
    """Raised when the simulation process breaks.

    The text passed as *message* is appended, on a new line, to a fixed
    prefix whenever the exception is rendered with ``str()`` or ``repr()``.
    """

    def __init__(self, message=""):
        """Remember the description of the failure.

        message -- free-form text describing what went wrong (default: "").
        """
        # Start the MRO lookup from this class and hand over the message, so
        # that ``args`` holds the text instead of the exception itself.
        super(SimulationError, self).__init__(message)
        self.msg = "\n" + message

    def __str__(self):
        """Return the fixed prefix followed by the stored message."""
        return "Simulation error -- check simulation parameters!" + self.msg

    __repr__ = __str__
# Record that pairs a computer with its queue.
ComputationBlock = namedtuple("ComputationBlock", "computer queue")


def is_computation_block(obj):
    """Tell whether *obj* is a ComputationBlock instance."""
    matches = isinstance(obj, ComputationBlock)
    return matches
class BernoulliDistribution(object):
    """Callable modelling a Bernoulli trial with a given success chance.

    The chance ("variety") is stored in ``variety`` as a fraction. It may be
    supplied either as a fraction below 1 or as a percentage from 1 to 100.
    """

    def __init__(self, variety):
        """Validate and normalise the success chance.

        variety -- anything ``float()`` accepts; values below 1 are taken
                   as fractions, values from 1 to 100 as percentages.

        Raises ValueError when the value lies outside 0..100 or is NaN
        (``float()`` itself may also raise for unconvertible input).
        """
        variety = float(variety)
        # Written as a positive range test so that NaN, which fails every
        # comparison, is rejected as well instead of slipping through.
        if not 0. <= variety <= 100.:
            raise ValueError("Variety must be in (0,1) or (0,100) range!")
        # NOTE: a value of exactly 1 is treated as 1 percent, not as certainty.
        self.variety = variety if variety < 1. else variety / 100.

    def __call__(self):
        """Run one trial; return True with probability ``variety``."""
        return uniform(0, 1) < self.variety

    def __str__(self):
        """Return a short description including the stored chance."""
        return "Bernoulli distribution(variety={0})".format(self.variety)
class TimerControlledObject(object):
    """Abstract interface of the objects that the simulation timer drives."""

    __metaclass__ = ABCMeta

    @abstractmethod
    def __init__(self, *args, **kwargs):
        """Set up the object; concrete classes define their own arguments."""

    @abstractmethod
    def tick(self, current_time):
        """React to the simulation clock reaching *current_time*."""

    @abstractproperty
    def is_done(self):
        """Tell whether the object currently has no work left."""
class TimerControlledProcessor(TimerControlledObject):
    """Timer-controlled object that holds at most one query at a time."""

    @abstractmethod
    def event_gen(self):
        """Return the simulation time of this processor's next event."""

    def process_query(self, query):
        """Take *query* into work and schedule the next event.

        Raises SimulationError when a query is already being processed.
        """
        busy = self.current_query is not None
        if busy:
            owner = self.__class__.__name__
            raise SimulationError(
                "{0} recieved query when busy!".format(owner))
        self.current_query = query
        self.next_event = self.event_gen()
class Timer(object):
    """Simulation timer that drives every registered controlled object."""

    def __init__(self, quantum):
        """Create a stopped timer.

        quantum -- real-time pause, in seconds, made after every simulation
                   step (converted with ``float()``).
        """
        self.quantum = float(quantum)
        self.controlled_objects = []
        # Start one 0.1 step below zero: start_simulation increments the
        # clock before the first round of ticks.
        self.simulation_time = Decimal("-0.1")

    def check_is_controlled_object(self, item):
        """Tell whether *item* is a TimerControlledObject instance."""
        return isinstance(item, TimerControlledObject)

    def register(self, obj):
        """Register one object, or an iterable of objects, for ticking.

        Unsupported objects are skipped silently instead of raising.
        Returns the total number of registered objects.
        """
        if isinstance(obj, Iterable):
            self.controlled_objects.extend(
                item for item in obj if self.check_is_controlled_object(item))
        elif self.check_is_controlled_object(obj):
            self.controlled_objects.append(obj)
        return len(self.controlled_objects)

    def start_simulation(self):
        """Advance the clock until every registered object is done.

        Each pass adds SIMULATION_STEP to the clock, ticks all objects in
        registration order and then sleeps for ``quantum`` seconds.
        Returns the simulation time at which the loop stopped.
        """
        while any(not obj.is_done for obj in self.controlled_objects):
            self.simulation_time += SIMULATION_STEP
            now = self.simulation_time
            # Explicit loop: tick() is called for its side effects, so it
            # must run eagerly regardless of how map() behaves.
            for obj in self.controlled_objects:
                obj.tick(now)
            time.sleep(self.quantum)
        return self.simulation_time

    def get_current_time(self):
        """Return the current simulation time."""
        return self.simulation_time

    def __str__(self):
        """Return a short description including the quantum."""
        return "Timer (quantum={0})".format(self.quantum)

    __repr__ = __str__
class Query(object):
    """A single query travelling through the simulated system.

    ``start_time`` is the moment the query was generated; ``end_time`` is
    filled in when the query object is called and stays None until then.
    """

    def __init__(self, start_time):
        """Create a query generated at *start_time*."""
        self.start_time = start_time
        # Defined up front so that printing an unfinished query works
        # instead of raising AttributeError.
        self.end_time = None

    def __call__(self, current_time):
        """Record *current_time* as the end time and return the query."""
        self.end_time = current_time
        return self

    def __str__(self):
        """Return the start and end times as text."""
        return "Start time={0}, end time={1}".format(self.start_time,
                                                     self.end_time)

    __repr__ = __str__
class QueryController(TimerControlledProcessor):
    """Control query processing: route each query to one of two blocks."""

    def __init__(self, process_time, distribution, computation_block_list):
        """Create an idle controller.

        process_time -- time the controller holds a query before routing it
                        (converted through ``str()`` to Decimal).
        distribution -- callable returning a truthy value to pick the first
                        block and a falsy value to pick the second one.
        computation_block_list -- exactly two ComputationBlock items.

        Raises TypeError when *distribution* is not callable and ValueError
        when the block list is not exactly two ComputationBlock items.
        """
        if not callable(distribution):
            raise TypeError("Distribution must be callable object!")
        self.distribution = distribution
        blocks = list(computation_block_list)
        # Both the length and every item are verified; merely counting the
        # valid items would let foreign objects through next to two blocks.
        if len(blocks) != 2 or not all(is_computation_block(item)
                                       for item in blocks):
            raise ValueError("Need two and only two computer ad queue!")
        self.first_block, self.second_block = blocks
        self.process_time = Decimal(str(process_time))
        self.next_event = Decimal("-0.1")
        self.current_query = None

    def event_gen(self):
        """Return the time at which the current query is to be routed."""
        return self.current_query.start_time + self.process_time

    def tick(self, current_time):
        """Route the held query when *current_time* is the scheduled time.

        The distribution picks the block; the query goes straight to that
        block's computer when it is idle, otherwise into the block's queue.
        """
        if current_time == self.next_event:
            current_block = (self.first_block if self.distribution()
                             else self.second_block)
            if current_block.computer.is_done:
                current_block.computer.process_query(self.current_query)
            else:
                current_block.queue.append(self.current_query)
            self.current_query = None

    @property
    def is_done(self):
        """True when the controller holds no query."""
        return self.current_query is None

    def __str__(self):
        """Return a short description of the controller."""
        return "Query controller with {0} and process time={1}".format(
            self.distribution, self.process_time)

    __repr__ = __str__
class QueryGenerator(TimerControlledObject):
    """Simple query generator (not a generator object)."""

    def __init__(self, quantity, delay, query_controller, ):
        """Create a generator whose first query is due at time 0.0.

        quantity -- number of queries to produce (converted with ``int()``).
        delay -- base delay between queries; every actual delay is this
                 value plus a random offset drawn from -3..3.
        query_controller -- object with a ``process_query`` method that
                            receives each new query.

        Raises TypeError when *query_controller* has no ``process_query``.
        """
        self.quantity = int(quantity)
        self.delay = Decimal(str(delay))
        if not hasattr(query_controller, 'process_query'):
            raise TypeError("Unsupported type of Query Controller")
        self.query_controller = query_controller
        self.next_event = Decimal("0.0")
        self.delay_gen = lambda: self.delay + Decimal(
            "%1.1f" % uniform(-3, 3))

    @property
    def is_done(self):
        """True when no more queries remain to be generated."""
        return self.quantity <= 0

    def tick(self, current_time):
        """Emit one query when *current_time* is the scheduled time."""
        # ``not self.is_done`` keeps this in line with the property, so a
        # negative quantity never produces queries.
        if current_time == self.next_event and not self.is_done:
            self.quantity -= 1
            self.query_controller.process_query(Query(current_time))
            # The random offset can make the delay zero or negative; the
            # next event would then never be reached by the clock, so the
            # delay is kept to at least one simulation step.
            self.next_event = current_time + max(self.delay_gen(),
                                                 SIMULATION_STEP)

    def __str__(self):
        """Return a short description with the remaining count and delay."""
        return "Query generator, count={0}, delay={1}".format(
            self.quantity, self.delay)

    __repr__ = __str__
class QueryQueue(list):
    """List-based FIFO queue that counts how many items were appended.

    ``query_count`` grows with every ``append`` call and is never reduced;
    items supplied to the constructor are not counted.
    """

    def __init__(self, *args, **kwargs):
        """Create the queue, optionally filled from an iterable."""
        # ``self`` is already bound by super(); passing it again would be
        # taken as the iterable and break construction with arguments.
        super(QueryQueue, self).__init__(*args, **kwargs)
        self.query_count = 0

    def deque(self):
        """Remove and return the oldest item (IndexError when empty)."""
        return self.pop(0)

    def append(self, obj):
        """Add *obj* to the end of the queue and count it."""
        super(QueryQueue, self).append(obj)
        self.query_count += 1
class Computer(TimerControlledProcessor):
    """Processor that answers queries one at a time, fed from a queue."""

    def __init__(self, timer, queue, answer_time, transfer_time="0.0"):
        """Create an idle computer.

        timer -- Timer instance used to read the current simulation time.
        queue -- QueryQueue from which waiting queries are taken.
        answer_time -- base answer time; every actual answer time is this
                       value plus a random offset drawn from -2..2.
        transfer_time -- extra time added to every answer (default "0.0").

        Raises TypeError when *timer* is not a Timer or *queue* is not a
        QueryQueue.
        """
        if not isinstance(timer, Timer):
            raise TypeError("timer argument must be Timer instance!")
        self.timer = timer
        if not isinstance(queue, QueryQueue):
            raise TypeError(
                "For control query count, queue must be QueryQueue")
        self.queue = queue
        self.answer_time = Decimal(str(answer_time))
        self.answer_gen = lambda: self.answer_time + Decimal(
            "%1.1f" % uniform(-2, 2))
        self.transfer_time = Decimal(str(transfer_time))
        self.current_query = None
        self.answered_query = []
        self.next_event = Decimal("-0.1")

    def event_gen(self):
        """Return the time at which the current query will be answered."""
        service_time = self.answer_gen() + self.transfer_time
        # The random offset can make the service time zero or negative; an
        # event at or before the current time may never be reached by the
        # clock, so at least one simulation step is always scheduled.
        return self.timer.get_current_time() + max(service_time,
                                                   SIMULATION_STEP)

    def tick(self, current_time):
        """Finish the current query when *current_time* is its due time.

        The finished query is stamped with the timer's current time and
        stored in ``answered_query``; the next queued query, if any, is
        then taken into work.
        """
        if current_time == self.next_event:
            self.answered_query.append(self.current_query(
                self.timer.get_current_time()))
            self.current_query = None
            if len(self.queue):
                self.process_query(self.queue.deque())

    @property
    def is_done(self):
        """True when the computer is not processing any query."""
        return self.current_query is None

    def __str__(self):
        """Return a short description with answer and transfer times."""
        return "Computer with answer time={0}, transfer time={1}".format(
            self.answer_time, self.transfer_time)

    __repr__ = __str__
def run(params, count, out=None):
    """Build the simulation model, run it once and print a report.

    params -- sequence of exactly seven values, in this order: timer
              quantum, Bernoulli variety, generation delay, controller
              processing time, query count, answer time, transfer time.
    count -- number of the experiment; the model description is printed
             only when it equals 1.
    out -- file-like object that receives the report; None leaves the
           choice to ``print`` (standard output).

    Returns ``(0, message)`` on success and ``(ERROR_CODE, text)`` when
    building the model raises ValueError/TypeError or the simulation
    raises SimulationError. A *params* of the wrong length raises
    ValueError.
    """
    # Unpacked in the body: tuple parameters in the signature are not
    # accepted by newer interpreters.
    (quant, var, delay, proc_time, q_count, ans_time, trns_time) = params
    spec_print = partial(print, file=out)
    try:
        timer = Timer(quant)
        bernoulli = BernoulliDistribution(var)
        first_queue = QueryQueue()
        second_queue = QueryQueue()
        # Only the second computer is given the transfer time.
        first_computer = Computer(timer, first_queue, ans_time)
        second_computer = Computer(timer, second_queue, ans_time, trns_time)
        block = (ComputationBlock(first_computer, first_queue),
                 ComputationBlock(second_computer, second_queue))
        query_controller = QueryController(proc_time, bernoulli, block)
        query_gen = QueryGenerator(q_count, delay, query_controller)
        timer.register((query_gen, query_controller, first_computer,
                        second_computer))
    except (ValueError, TypeError) as exc:
        # str(exc) instead of the deprecated ``message`` attribute.
        return (ERROR_CODE, str(exc))
    if count == 1:
        spec_print(timer)
        spec_print(query_gen)
        spec_print(query_controller)
        spec_print(first_computer)
        spec_print(second_computer)
        spec_print("_"*50)
    try:
        result = timer.start_simulation()
    except SimulationError as exc:
        return (ERROR_CODE, str(exc))
    spec_print("Expirement #{0} :".format(count))
    spec_print("Result = {0}".format(result))
    for ind, item in enumerate(block):
        spec_print("{0} computer results:".format(ind+1))
        spec_print("All query: {0}".format(len(item.computer.answered_query)))
        spec_print("Query from queue:{0}".format(
            item.computer.queue.query_count))
    spec_print("_"*50)
    return (0, "Success simulation! Check result")
if __name__ == '__main__':
    # Interactive entry point: ask for an optional report file and the
    # simulation parameters, then repeat the experiment while the user
    # answers "Y"/"y".
    file_name = raw_input("Enter filename for result, or skip this step ")
    file_out = open(file_name, "w") if file_name.strip() else None
    try:
        key = "Y"
        default_param = ("0", "0.55", "9", "2.5", "460", "17", "3.6") #FIXIT!
        welcome_text = """ Welcome to the simulation script!
Enter values of parametres, or press enter to apply default values.
(Quant={0}, variety={1}, generation delay={2}, processing time={3},
query count={4}, answer time={5}, transfer time={6}).
""".format(*default_param)
        input_str = raw_input(welcome_text)
        if input_str.strip():
            try:
                param = literal_eval(input_str)
            except (ValueError, SyntaxError):
                # Text that is not a Python literal is treated like any
                # other wrong input instead of ending in a traceback.
                param = None
            # The type is checked first: len() of a non-sequence literal
            # (for example a single number) would raise TypeError.
            if not (isinstance(param, tuple)
                    and len(param) == len(default_param)):
                print("You entered wrong characters!")
                key = "N"
        else:
            param = default_param
        count = 1
        while key in ("Y", "y"):
            res = run(param, count, file_out)
            if res[0]:
                print("Error occured:")
                print(res[1])
                key = "N"
            else:
                print(res[1])
                key = raw_input("Enter Y for continue, other for exit ")
                count += 1
    finally:
        # Close the report file even when the session ends with an error.
        if file_out:
            file_out.close()