from Queue import Empty, Queue
import time
import sys
from libpy.test_lock import test_lock
from libpy.threads.proxy import Proxy
from libpy.threads.watcher import Watcher
class Master(object):
    """
    Master controls a number of slaves.

    It loads jobs from a storage object, hands each job to a freshly started
    slave and (when ``master_save`` is true) saves the finished jobs that
    slaves put into the ``done_jobs`` queue.
    """

    def __init__(self, Storage, Slave, slave_count, jobs_count=100, proxy='',
                 proxy_file=None, daemon=False, lock_file=None,
                 master_save=True):
        """
        Args:
            * Storage - class of the object that knows how to load and save
              jobs; it is instantiated here without arguments
            * Slave - slave class; instances are driven through ``start()``,
              ``isAlive()``, ``getName()`` and ``join()``
            * slave_count - maximum number of slaves running at one time
            * jobs_count - number of jobs requested from the storage per load
            * proxy - value passed as ``proxy`` to every slave when no
              ``proxy_file`` is given
            * proxy_file - if set, a ``Proxy`` object is built from this file
              and proxies are acquired/released/banned through it
            * daemon - if True the master keeps polling for jobs instead of
              exiting when the storage returns none
            * lock_file - required; checked with ``test_lock`` when ``work()``
              starts
            * master_save - if True the master saves jobs from ``done_jobs``,
              otherwise slaves receive the ``Storage`` class and save
              finished jobs themselves

        Raises:
            Exception: if ``lock_file`` is not provided.
        """
        # Validate first so that a bad call does not instantiate the storage.
        if not lock_file:
            raise Exception('You should provide lock_file argument')
        self.lock_file = lock_file

        self.Storage = Storage
        self.Slave = Slave
        self.slave_count = slave_count
        self.jobs_count = jobs_count
        self.daemon = daemon
        self.master_save = master_save
        self.storage = Storage()

        if proxy_file:
            self.proxy = Proxy(file=proxy_file)
        else:
            self.proxy = proxy

        # Jobs loaded from the storage that are not yet given to a slave.
        self.jobs = []
        # Ids of loaded jobs whose slave has not been seen dead yet.
        # Created here so update_slaves() is usable before work() runs.
        self.active_jobs_ids = set()
        if self.master_save:
            self.done_jobs = Queue()
        self.banned_proxies = Queue()
        self.slaves = []

    def run(self):
        """Run ``work()``; on KeyboardInterrupt report the slaves and return."""
        # NOTE(review): Watcher comes from libpy.threads.watcher; presumably it
        # makes Ctrl-C reach the master despite running slaves - confirm.
        Watcher()
        try:
            self.work()
        except KeyboardInterrupt:
            for slave in self.slaves:
                print('%s must die!' % slave.getName())
                #slave.terminate()
            print('KeyboardInterrupt killed me!')

    def work(self):
        """
        Main loop: load jobs, keep slaves busy, save results.

        Once the storage returns no jobs, a non-daemon master waits for the
        running slaves, saves what they produced and returns. A daemon master
        sleeps and polls again.
        """
        self.check_lock_file()
        self.active_jobs_ids = set()
        while True:
            # If there are no jobs then save done_jobs (if they exist)
            # and after that load new jobs.
            if not self.jobs:
                if self.master_save:
                    self.save_jobs()
                self.jobs = self.storage.load_jobs(
                    self.jobs_count, exclude_ids=self.active_jobs_ids)
                new_ids = [job.id for job in self.jobs]
                print('loaded new jobs: %s' % (new_ids,))
                self.active_jobs_ids.update(new_ids)
            if not self.jobs:
                if self.daemon:
                    print('daemon')
                    time.sleep(5)
                else:
                    print('no jobs!!!')
                    self.wait_slaves()
                    print('saving jobs')
                    if self.master_save:
                        self.save_jobs()
                    break
            # Every iteration control slaves
            self.update_slaves()
            time.sleep(1)
        print('Master: No jobs. Bye-bye!')

    def save_jobs(self):
        """Get all jobs from done_jobs queue and save them"""
        jobs_to_save = []
        while True:
            try:
                jobs_to_save.append(self.done_jobs.get_nowait())
            except Empty:
                break
        print('saving jobs %s' % (jobs_to_save,))
        self.storage.save_jobs(jobs_to_save)

    def update_slaves(self):
        """
        Drop slaves that have finished and give work to new slaves.

        Bans the proxies reported through ``banned_proxies``, releases the
        proxies and job ids of dead slaves, then starts new slaves while
        there are jobs and proxies, never exceeding ``slave_count``.
        """
        self._ban_reported_proxies()
        print(self.active_jobs_ids)
        self._reap_dead_slaves()
        self._spawn_slaves()

    def _ban_reported_proxies(self):
        """Drain ``banned_proxies`` and ban each entry in the proxy pool."""
        while True:
            try:
                proxy = self.banned_proxies.get_nowait()
            except Empty:
                break
            print('proxy %s banned!!!' % proxy)
            if isinstance(self.proxy, Proxy):
                self.proxy.ban(proxy)

    def _reap_dead_slaves(self):
        """Keep only alive slaves; release proxy and job id of dead ones."""
        alive_slaves = []
        print('checking slaves')
        for slave in self.slaves:
            if slave.isAlive():
                print('live slave %s, job=[%d]'
                      % (slave.getName(), slave.job.id))
                alive_slaves.append(slave)
                continue
            print('death slave %s, job=[%d]'
                  % (slave.getName(), slave.job.id))
            if isinstance(self.proxy, Proxy):
                self.proxy.release(slave.proxy)
            print('removing job id=[%d]' % slave.job.id)
            self.active_jobs_ids.remove(slave.job.id)
        # TODO: check that excluded slaves do not exist anymore!
        # We are interested only in alive slaves
        self.slaves = alive_slaves
        print('slaves after cleaning')
        print(['slave %s, job=[%d]' % (x.getName(), x.job.id)
               for x in self.slaves])

    def _spawn_slaves(self):
        """Start slaves for pending jobs until ``slave_count`` is reached."""
        # Each pass either appends exactly one slave or breaks, so this is
        # the same number of iterations as counting the free slots up front.
        while len(self.slaves) < self.slave_count:
            if not self.jobs:
                print('no job')
                break
            print('self.jobs: %s' % (self.jobs,))
            if isinstance(self.proxy, Proxy):
                proxy = self.proxy.acquire(blocking=False)
            else:
                proxy = self.proxy
            # None means no proxy could be acquired; '' (the default) is
            # still passed on to the slave.
            if proxy is None:
                print('no proxy')
                break
            job = self.jobs.pop(0)
            if self.master_save:
                slave = self.Slave(job, done_jobs=self.done_jobs,
                                   banned_proxies=self.banned_proxies,
                                   proxy=proxy)
            else:
                slave = self.Slave(job, Storage=self.Storage,
                                   banned_proxies=self.banned_proxies,
                                   proxy=proxy)
            # Set before start() so the running slave never sees the
            # attribute missing.
            slave.job_id = job.id
            slave.start()
            print('slave %s got job=[%d]' % (slave.getName(), slave.job.id))
            print('self.jobs: %s' % (self.jobs,))
            self.slaves.append(slave)

    def wait_slaves(self):
        """Wait until all slaves finish their work"""
        for slave in self.slaves:
            print('joining for %s' % slave.getName())
            slave.join()
            print('joined to %s' % slave.getName())

    def check_lock_file(self):
        """Exit with status 1 when ``test_lock`` is true for ``lock_file``."""
        # NOTE(review): test_lock is defined in libpy.test_lock; presumably a
        # true result means another instance holds the lock - confirm.
        if test_lock(self.lock_file):
            print('Script already running')
            sys.exit(1)
class SimpleMaster(Master):
    """Master whose ``master_save`` option is always forced to False."""

    def __init__(self, *args, **kwargs):
        """Accept the same arguments as ``Master``; ``master_save`` is overridden."""
        # Any caller-supplied master_save keyword is replaced with False.
        options = dict(kwargs, master_save=False)
        super(SimpleMaster, self).__init__(*args, **options)