producer/consumer system

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import threading
import itertools
import time
class Slave(threading.Thread):
    """Worker thread that pulls jobs from a master until none are left.

    Each loop iteration asks ``master.get_job()`` for a job, processes it
    with ``do_job`` and reports it back through ``master.finish_job``.
    Subclasses must override ``do_job``.
    """

    def __init__(self, master, *args, **kwargs):
        """Create the worker.

        Args:
            master: object providing ``get_job()`` and ``finish_job(job)``.
            *args, **kwargs: forwarded unchanged to ``threading.Thread``.
        """
        super(Slave, self).__init__(*args, **kwargs)
        self.master = master
        # Announced at construction time (not at start()), as before.
        print('%s: begin to work' % self.name)

    def run(self):
        """Process jobs until the master returns ``None``."""
        while True:
            job = self.master.get_job()
            # Only None means "no more work"; falsy jobs such as 0 or ''
            # are legitimate payloads and must still be processed.
            if job is None:
                break
            self.do_job(job)
            self.master.finish_job(job)
        print('%s: end to work' % self.name)

    def do_job(self, job):
        """Process a single job; must be overridden by subclasses.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError('You must implement do_job method')
class Master(object):
    """Hand out jobs to worker threads and collect the finished ones.

    Pending jobs are pulled from ``storage.load_jobs()`` whenever the local
    list runs empty. Finished jobs are buffered in ``done_jobs`` and passed
    to ``storage.save_jobs()`` once the buffer reaches ``done_jobs_limit``.
    Finished jobs below the limit stay in ``done_jobs`` until a later
    ``finish_job`` call reaches it.
    """

    def __init__(self, storage, done_jobs_limit=10):
        """Args:
            storage: object that loads and saves jobs; must provide
                ``load_jobs()`` and ``save_jobs(jobs)``.
            done_jobs_limit: number of buffered finished jobs that triggers
                a ``save_jobs`` call.
        """
        self.storage = storage
        self.get_job_lock = threading.Lock()
        self.finish_job_lock = threading.Lock()
        self.done_jobs_limit = done_jobs_limit
        self.jobs = []
        self.done_jobs = []

    def get_job(self):
        """Return the next pending job, or ``None`` when there is none.

        The lock is held through a ``with`` block so that it is released
        even if ``storage.load_jobs()`` raises; otherwise every other
        worker would block forever on the next call.
        """
        with self.get_job_lock:
            if not self.jobs:
                self.jobs = self.storage.load_jobs()
            if not self.jobs:
                return None
            return self.jobs.pop()

    def finish_job(self, job):
        """Record ``job`` as done and flush the buffer when it is full.

        Uses ``>=`` rather than ``==`` so that a buffer which has grown past
        the limit (for example after ``save_jobs`` raised on an earlier
        call) is still flushed on the next call.
        """
        with self.finish_job_lock:
            print('%s %s' % (job, 'finished'))
            self.done_jobs.append(job)
            if len(self.done_jobs) >= self.done_jobs_limit:
                self.storage.save_jobs(self.done_jobs)
                # Rebind instead of clearing: storage may keep the old list.
                self.done_jobs = []
class SimpleSlave(Slave):
    """Demo worker: announces each job, then sleeps for one second."""

    def do_job(self, job):
        """Print which thread handles ``job`` and pause for one second."""
        banner = '%s: working on' % self.getName()
        # Single-argument print yields "<banner> <job>", same as before.
        print('%s %s' % (banner, job))
        time.sleep(1)
class Storage(object):
    """Demo job store: serves one fixed batch and prints saved jobs."""

    def load_jobs(self):
        """Return ``[1, 2, 3]`` on the first call and ``[]`` afterwards.

        The first call is detected by the absence of the ``touched``
        attribute, which is set before the batch is returned.
        """
        print('Loading jobs')
        if hasattr(self, 'touched'):
            return []
        self.touched = True
        return [1, 2, 3]

    def save_jobs(self, jobs):
        """Print one "Saving job" line for every item in ``jobs``."""
        for finished in jobs:
            print('%s %s' % ('Saving job', finished))
# Demo wiring: one master backed by the stub storage, flushing finished
# jobs in groups of three, served by two worker threads.
master = Master(Storage(), done_jobs_limit=3)
for x in range(2):
    slave = SimpleSlave(master)
    slave.start()