301 lines
9.1 KiB
Python
Executable File
301 lines
9.1 KiB
Python
Executable File
#!/usr/bin/env python2
|
|
|
|
import sys #exit
|
|
import gobject
|
|
import pygtk
|
|
pygtk.require('2.0')
|
|
import gtk
|
|
import multiprocessing
|
|
import threading
|
|
import time
|
|
import random
|
|
|
|
gtk.gdk.threads_init()
|
|
|
|
# Any time you connect to any of these signals, they should be
|
|
# protected with a threading.Lock or similar.
|
|
class ObserverBase(gobject.GObject):
|
|
__gsignals__ = {
|
|
'message_event' : (gobject.SIGNAL_RUN_LAST,
|
|
gobject.TYPE_NONE,
|
|
(gobject.TYPE_STRING,)),
|
|
'updated' : (gobject.SIGNAL_RUN_LAST,
|
|
gobject.TYPE_NONE,
|
|
(gobject.TYPE_FLOAT,)),
|
|
'finished': (gobject.SIGNAL_RUN_LAST,
|
|
gobject.TYPE_NONE,
|
|
()),
|
|
'errored': (gobject.SIGNAL_RUN_LAST,
|
|
gobject.TYPE_NONE,
|
|
())
|
|
}
|
|
|
|
def __init__(self):
|
|
gobject.GObject.__init__(self)
|
|
|
|
gobject.type_register(ObserverBase)
|
|
|
|
class Observer(ObserverBase):
|
|
def __init__(self, queue):
|
|
ObserverBase.__init__(self)
|
|
self.queue = queue
|
|
|
|
def start(self):
|
|
# Create new thread to do the listening
|
|
self.thread = threading.Thread(target=self.listen, args=())
|
|
self.thread.start()
|
|
|
|
def join(self):
|
|
self.thread.join()
|
|
|
|
def listen(self):
|
|
while True:
|
|
# Listen for results on the queue and process them accordingly
|
|
print "Observer is waiting for data"
|
|
data = self.queue.get()
|
|
print "Observer received: ",data
|
|
dataType = type(data).__name__
|
|
if dataType=="bool" and data==True:
|
|
self.emit('finished')
|
|
return
|
|
if dataType=="bool" and data==False:
|
|
self.emit('errored')
|
|
return
|
|
elif dataType=="str":
|
|
self.emit('message_event', data)
|
|
elif dataType=="float":
|
|
self.emit('updated', data)
|
|
else:
|
|
raise RuntimeError("Queue had an unknown data type ('"+dataType+"') with value '"+str(data)+"'")
|
|
|
|
gobject.type_register(Observer)
|
|
|
|
class Process(gobject.GObject):
|
|
__gsignals__ = {
|
|
'starting': (gobject.SIGNAL_RUN_LAST,
|
|
gobject.TYPE_NONE,
|
|
()),
|
|
'joined': (gobject.SIGNAL_RUN_LAST,
|
|
gobject.TYPE_NONE,
|
|
())
|
|
}
|
|
|
|
def __init__(self, function, arguments):
|
|
gobject.GObject.__init__(self)
|
|
# Create queue
|
|
self.queue = multiprocessing.Queue()
|
|
|
|
# Create process
|
|
if arguments==(): arguments=(self.queue,)
|
|
self.process = multiprocessing.Process(target=function, args=arguments)
|
|
|
|
# Create observer
|
|
self.observer = Observer(self.queue)
|
|
|
|
def getObserver(self):
|
|
return self.observer
|
|
|
|
def start(self):
|
|
# Starting observer
|
|
self.emit("starting")
|
|
print "starting observer"
|
|
self.observer.start()
|
|
# Start process
|
|
print "starting process"
|
|
self.process.start()
|
|
|
|
def join(self):
|
|
# Clean up after process
|
|
print "cleaning up after process"
|
|
self.process.join()
|
|
self.emit("joined")
|
|
# Clean up after observer
|
|
print "cleaning up after observer"
|
|
self.queue.put(False) # If the false is ever read, it's an error
|
|
self.observer.join()
|
|
# Clean up queue
|
|
#self.queue.close()
|
|
|
|
gobject.type_register(Process)
|
|
|
|
class CombinedObserver(ObserverBase):
|
|
def __init__(self, observers, weights=None):
|
|
ObserverBase.__init__(self)
|
|
self.observers = observers
|
|
numObservers = len(observers)
|
|
if weights==None:
|
|
self.weights = [1 for i in range(numObservers)]
|
|
else:
|
|
self.weights = weights
|
|
|
|
sum_weights = sum(self.weights)
|
|
self.weights = [float(w)/sum_weights for w in self.weights]
|
|
print "weights = ",self.weights
|
|
|
|
index = 0
|
|
for observer in self.observers:
|
|
#observer.connect("message_event", onMessageReceived, index)
|
|
observer.connect("updated", self.onUpdate, index)
|
|
observer.connect("finished", self.onFinished, index)
|
|
observer.connect("errored", self.onErrored, index)
|
|
index = index+1
|
|
|
|
self.fractions = [0 for i in range(numObservers)]
|
|
self.finished = [False for i in range(numObservers)]
|
|
self.errored = [False for i in range(numObservers)]
|
|
self.lock = threading.RLock()
|
|
|
|
def onUpdate(self, widget, fraction, index):
|
|
self.lock.acquire()
|
|
self.fractions[index] = fraction * self.weights[index]
|
|
self.emit("updated", sum(self.fractions))
|
|
self.lock.release()
|
|
|
|
def onFinished(self, widget, index):
|
|
self.lock.acquire()
|
|
self.finished[index] = True
|
|
if all(self.finished):
|
|
self.emit("finished")
|
|
self.lock.release()
|
|
|
|
def onErrored(self, widget, index):
|
|
self.lock.acquire()
|
|
self.errored[index] = True
|
|
self.emit("errored")
|
|
self.lock.release()
|
|
|
|
gobject.type_register(CombinedObserver)
|
|
|
|
# A ProgressWorker is responsible for (a) getting the job done and (b)
|
|
# reporting progress. Progress is reported via the queue. Fractions
|
|
# (from 0 to 1) indicate the proportion of the job that is finished.
|
|
# Strings may indicate what has been done or what is about to be done.
|
|
# The boolean 'True' is placed on the queue when the job is finished.
|
|
class ProgressWorker():
|
|
def getResult(self):
|
|
return None
|
|
|
|
def setQueue(self, queue):
|
|
self.queue = queue
|
|
|
|
def go(self):
|
|
raise RuntimeError("Worker.go() has not been overridden")
|
|
|
|
class WorkerExample(ProgressWorker):
|
|
def __init__(self):
|
|
#Worker.__init__(self)
|
|
manager = multiprocessing.Manager()
|
|
self.result = manager.list()
|
|
|
|
def getResult(self):
|
|
return self.result
|
|
|
|
def go(self, queue):
|
|
self.setQueue(queue)
|
|
print "The worker has started doing some work (counting from 0 to 9)"
|
|
for i in range(10):
|
|
self.queue.put("working on step "+str(i)+"...")
|
|
proportion = (float(i)+1)/10
|
|
slp = random.uniform(0.50, 3.50)
|
|
time.sleep(slp)
|
|
self.queue.put(proportion)
|
|
self.queue.put("done.")
|
|
self.queue.put(1.0)
|
|
self.queue.put(True)
|
|
print "The worker has finished."
|
|
self.result.append("The the work has been finished and the result isn't 42.")
|
|
|
|
class ProgressBar(gobject.GObject):
|
|
__gsignals__ = {
|
|
'updated' : (gobject.SIGNAL_RUN_LAST,
|
|
gobject.TYPE_NONE,
|
|
(gobject.TYPE_FLOAT,)),
|
|
'finished': (gobject.SIGNAL_RUN_LAST,
|
|
gobject.TYPE_NONE,
|
|
())
|
|
}
|
|
|
|
def __init__(self, observer):
|
|
gobject.GObject.__init__(self)
|
|
self.observer = observer
|
|
self.observer.connect("updated",self.displayUpdate)
|
|
self.observer.connect("message_event",self.displayMessage)
|
|
self.observer.connect("finished",self.finish)
|
|
self.progress = gtk.ProgressBar()
|
|
self.progress.connect("destroy", self.destroy)
|
|
self.process=None
|
|
self.lock = threading.RLock()
|
|
|
|
def displayUpdate(self, obj, fraction):
|
|
self.lock.acquire()
|
|
self.progress.set_fraction(fraction)
|
|
self.emit("updated", fraction)
|
|
self.lock.release()
|
|
|
|
def displayMessage(self, obj, text):
|
|
self.lock.acquire()
|
|
self.progress.set_text(text)
|
|
self.lock.release()
|
|
|
|
def finish(self, obj, data=None):
|
|
self.emit("finished")
|
|
|
|
def destroy(self, widget, data=None):
|
|
print "Destroying"
|
|
if self.process==None:
|
|
return
|
|
self.queue.put("finished")
|
|
self.process.join()
|
|
self.thread.join()
|
|
|
|
gobject.type_register(ProgressBar)
|
|
|
|
class ProgressBarWindow:
|
|
def __init__(self, observer, title):
|
|
self.progressBar = ProgressBar(observer)
|
|
self.progressBar.connect("finished", self.done)
|
|
window = gtk.Window(gtk.WINDOW_TOPLEVEL)
|
|
window.set_border_width(10)
|
|
window.set_title(title)
|
|
window.add(self.progressBar.progress)
|
|
window.show_all()
|
|
window.connect("destroy", self.destroy)
|
|
|
|
def main(self):
|
|
gtk.main()
|
|
|
|
def done(self, widget, data=None):
|
|
#self.destroy()
|
|
print "FINISHED!"
|
|
|
|
def destroy(self, widget=None, data=None):
|
|
gtk.main_quit()
|
|
|
|
# Test
|
|
if __name__ == '__main__':
|
|
# n workers
|
|
worker = []
|
|
process = []
|
|
observer =[]
|
|
window = []
|
|
|
|
for i in range(10):
|
|
worker.append(WorkerExample())
|
|
process.append(Process(worker[i].go, ()))
|
|
observer.append(Process[i].getObserver())
|
|
|
|
combo = CombinedObserver(observer)
|
|
|
|
window0 = ProgressBarWindow(combo, "Workers' progress")
|
|
for i in range(10):
|
|
window.append(ProgressBarWindow(observer[i], "Workers {}'s progress".format(str(i))))
|
|
|
|
for i in range(10):
|
|
process[i].start()
|
|
|
|
gtk.main()
|
|
|
|
print "finished"
|
|
sys.exit()
|
|
|