from Queue import Empty Queue import time import sys from libpy test_l

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
from Queue import Empty, Queue
import time
import sys
from libpy.test_lock import test_lock
from libpy.threads.proxy import Proxy
from libpy.threads.watcher import Watcher
class Master(object):
    """
    Master controls a number of slaves.

    It loads jobs from the storage, hands one job to every free slave and,
    when ``master_save`` is on, saves the finished jobs that slaves put into
    the ``done_jobs`` queue.

    Progress is reported with plain prints; they are written in the
    single-argument parenthesised form, which prints the same text as a
    Python 2 print statement.
    """

    def __init__(self, Storage, Slave, slave_count, jobs_count=100, proxy='',
                 proxy_file=None, daemon=False, lock_file=None,
                 master_save=True):
        """
        Args:
        * Storage - class of the object that knows how to load and save jobs;
          it is instantiated here without arguments
        * Slave - slave class (subclass of processing.Process); instances must
          provide start(), join(), isAlive(), getName() and the ``job`` and
          ``proxy`` attributes
        * slave_count - maximum number of slaves running at the same time
        * jobs_count - number of jobs requested from the storage in one load
        * proxy - value passed to every slave as ``proxy`` when no
          proxy_file is given; if it is a Proxy instance, a proxy is acquired
          from it for every slave instead
        * proxy_file - if set, a Proxy built from this file is used and the
          ``proxy`` argument is ignored
        * daemon - if True the master keeps polling the storage when there
          are no jobs instead of finishing
        * lock_file - required; if test_lock() reports it as locked the
          script stops (see check_lock_file)
        * master_save - if True the master saves jobs from done_jobs,
          otherwise slaves get the Storage class and save done jobs themselves

        Raises:
        * ValueError - if lock_file is not provided
        """
        # Validate first so that a bad call does not instantiate Storage.
        if not lock_file:
            raise ValueError('You should provide lock_file argument')
        self.lock_file = lock_file

        self.Storage = Storage
        self.Slave = Slave
        self.slave_count = slave_count
        self.jobs_count = jobs_count
        self.daemon = daemon
        self.master_save = master_save
        self.storage = Storage()

        if proxy_file:
            self.proxy = Proxy(file=proxy_file)
        else:
            self.proxy = proxy

        # Jobs loaded from the storage but not yet given to a slave.
        self.jobs = []
        # Ids of jobs that are loaded or being processed; work() resets it,
        # it is created here so update_slaves() is usable on its own.
        self.active_jobs_ids = set()
        # done_jobs exists only when the master is responsible for saving.
        if self.master_save:
            self.done_jobs = Queue()
        self.banned_proxies = Queue()
        self.slaves = []

    def run(self):
        """Run the main loop and report the slaves on KeyboardInterrupt."""
        # NOTE(review): Watcher is defined outside this file; presumably it
        # makes Ctrl-C reach the master while slaves run - confirm.
        Watcher()
        try:
            self.work()
        except KeyboardInterrupt:
            # Slaves are only reported here, they are not terminated.
            for slave in self.slaves:
                print('%s must die!' % slave.getName())
            print('KeyboardInterrupt killed me!')

    def work(self):
        """
        Main loop: load jobs, keep slaves busy, save results.

        Runs forever in daemon mode. Otherwise it returns once the storage
        has no more jobs and all slaves have finished.
        """
        self.check_lock_file()
        self.active_jobs_ids = set()
        while True:
            # If there are no jobs then save done_jobs (if the master saves)
            # and after that load new jobs.
            if not self.jobs:
                if self.master_save:
                    self.save_jobs()
                self.jobs = self.storage.load_jobs(
                    self.jobs_count, exclude_ids=self.active_jobs_ids)
                loaded_ids = [job.id for job in self.jobs]
                print('loaded new jobs: %s' % (loaded_ids,))
                self.active_jobs_ids.update(loaded_ids)
                if not self.jobs:
                    if self.daemon:
                        print('daemon')
                        time.sleep(5)
                    else:
                        print('no jobs!!!')
                        self.wait_slaves()
                        print('saving jobs')
                        if self.master_save:
                            self.save_jobs()
                        break
            # Every iteration control slaves
            self.update_slaves()
            time.sleep(1)
        print('Master: No jobs. Bye-bye!')

    def save_jobs(self):
        """Get all jobs from the done_jobs queue and save them in one call."""
        jobs_to_save = []
        while True:
            try:
                jobs_to_save.append(self.done_jobs.get_nowait())
            except Empty:
                break
        print('saving jobs %s' % (jobs_to_save,))
        self.storage.save_jobs(jobs_to_save)

    def update_slaves(self):
        """
        Drop slaves that have done their work and give work to new slaves.

        Bans the proxies reported by slaves, releases proxies and job ids of
        finished slaves, and keeps the number of slaves not more than
        slave_count.
        """
        self._ban_reported_proxies()
        print(self.active_jobs_ids)
        self._drop_finished_slaves()
        print('slaves after cleaning')
        print(['slave %s, job=[%d]' % (x.getName(), x.job.id)
               for x in self.slaves])
        self._start_new_slaves()

    def _ban_reported_proxies(self):
        """Ban every proxy waiting in the banned_proxies queue."""
        while True:
            try:
                banned = self.banned_proxies.get_nowait()
            except Empty:
                break
            print('proxy %s banned!!!' % (banned,))
            if isinstance(self.proxy, Proxy):
                self.proxy.ban(banned)

    def _drop_finished_slaves(self):
        """Keep only alive slaves; release proxy and job id of the others."""
        alive_slaves = []
        print('checking slaves')
        for slave in self.slaves:
            if slave.isAlive():
                print('live slave %s, job=[%d]'
                      % (slave.getName(), slave.job.id))
                alive_slaves.append(slave)
                continue
            print('death slave %s, job=[%d]'
                  % (slave.getName(), slave.job.id))
            if isinstance(self.proxy, Proxy):
                self.proxy.release(slave.proxy)
            print('removing job id=[%d]' % slave.job.id)
            # discard: an id that is already gone must not stop the clean-up.
            self.active_jobs_ids.discard(slave.job.id)
        # TODO: check that excluded slaves do not exist anymore!
        # We are interested only in alive slaves.
        self.slaves = alive_slaves

    def _start_new_slaves(self):
        """Start slaves while there are free slots, jobs and proxies."""
        # Every pass either appends one slave or breaks, so the loop runs at
        # most once per free slot.
        while len(self.slaves) < self.slave_count:
            if not self.jobs:
                print('no job')
                break
            print('self.jobs: %s' % (self.jobs,))
            if isinstance(self.proxy, Proxy):
                proxy = self.proxy.acquire(blocking=False)
            else:
                proxy = self.proxy
            if proxy is None:
                print('no proxy')
                break
            job = self.jobs.pop(0)
            if self.master_save:
                slave = self.Slave(job, done_jobs=self.done_jobs,
                                   banned_proxies=self.banned_proxies,
                                   proxy=proxy)
            else:
                slave = self.Slave(job, Storage=self.Storage,
                                   banned_proxies=self.banned_proxies,
                                   proxy=proxy)
            slave.start()
            # Register the started slave before any logging so it can not be
            # lost if a log line fails.
            slave.job_id = job.id
            self.slaves.append(slave)
            print('slave %s got job=[%d]' % (slave.getName(), slave.job.id))
            print('self.jobs: %s' % (self.jobs,))

    def wait_slaves(self):
        """Wait until all slaves finish their work."""
        for slave in self.slaves:
            print('joining for %s' % slave.getName())
            slave.join()
            print('joined to %s' % slave.getName())

    def check_lock_file(self):
        """Exit with status 1 if test_lock() returns a true value for lock_file."""
        # NOTE(review): test_lock is defined outside this file; a true result
        # is treated here as "another copy is running" - confirm.
        if test_lock(self.lock_file):
            print('Script already running')
            sys.exit(1)
class SimpleMaster(Master):
    """
    Master whose slaves save their done jobs themselves.

    Takes the same arguments as Master, but ``master_save`` is always
    forced to False, whatever the caller passed as a keyword.
    """

    def __init__(self, *args, **kwargs):
        # Override any caller-supplied keyword value of master_save.
        kwargs.update(master_save=False)
        super(SimpleMaster, self).__init__(*args, **kwargs)