#!/usr/bin/env python2 import sys #exit import gobject import pygtk pygtk.require('2.0') import gtk import multiprocessing import threading import time import random gtk.gdk.threads_init() # Any time you connect to any of these signals, they should be # protected with a threading.Lock or similar. class ObserverBase(gobject.GObject): __gsignals__ = { 'message_event' : (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_STRING,)), 'updated' : (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_FLOAT,)), 'finished': (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, ()), 'errored': (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, ()) } def __init__(self): gobject.GObject.__init__(self) gobject.type_register(ObserverBase) class Observer(ObserverBase): def __init__(self, queue): ObserverBase.__init__(self) self.queue = queue def start(self): # Create new thread to do the listening self.thread = threading.Thread(target=self.listen, args=()) self.thread.start() def join(self): self.thread.join() def listen(self): while True: # Listen for results on the queue and process them accordingly print "Observer is waiting for data" data = self.queue.get() print "Observer received: ",data dataType = type(data).__name__ if dataType=="bool" and data==True: self.emit('finished') return if dataType=="bool" and data==False: self.emit('errored') return elif dataType=="str": self.emit('message_event', data) elif dataType=="float": self.emit('updated', data) else: raise RuntimeError("Queue had an unknown data type ('"+dataType+"') with value '"+str(data)+"'") gobject.type_register(Observer) class Process(gobject.GObject): __gsignals__ = { 'starting': (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, ()), 'joined': (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, ()) } def __init__(self, function, arguments): gobject.GObject.__init__(self) # Create queue self.queue = multiprocessing.Queue() # Create process if arguments==(): arguments=(self.queue,) self.process = multiprocessing.Process(target=function, args=arguments) # Create observer self.observer = Observer(self.queue) def getObserver(self): return self.observer def start(self): # Starting observer self.emit("starting") print "starting observer" self.observer.start() # Start process print "starting process" self.process.start() def join(self): # Clean up after process print "cleaning up after process" self.process.join() self.emit("joined") # Clean up after observer print "cleaning up after observer" self.queue.put(False) # If the false is ever read, it's an error self.observer.join() # Clean up queue #self.queue.close() gobject.type_register(Process) class CombinedObserver(ObserverBase): def __init__(self, observers, weights=None): ObserverBase.__init__(self) self.observers = observers numObservers = len(observers) if weights==None: self.weights = [1 for i in range(numObservers)] else: self.weights = weights sum_weights = sum(self.weights) self.weights = [float(w)/sum_weights for w in self.weights] print "weights = ",self.weights index = 0 for observer in self.observers: #observer.connect("message_event", onMessageReceived, index) observer.connect("updated", self.onUpdate, index) observer.connect("finished", self.onFinished, index) observer.connect("errored", self.onErrored, index) index = index+1 self.fractions = [0 for i in range(numObservers)] self.finished = [False for i in range(numObservers)] self.errored = [False for i in range(numObservers)] self.lock = threading.RLock() def onUpdate(self, widget, fraction, index): self.lock.acquire() self.fractions[index] = fraction * self.weights[index] self.emit("updated", sum(self.fractions)) self.lock.release() def onFinished(self, widget, index): self.lock.acquire() self.finished[index] = True if all(self.finished): self.emit("finished") self.lock.release() def onErrored(self, widget, index): self.lock.acquire() self.errored[index] = True self.emit("errored") self.lock.release() gobject.type_register(CombinedObserver) # A ProgressWorker is responsible for (a) getting the job done and (b) # reporting progress. Progress is reported via the queue. Fractions # (from 0 to 1) indicate the proportion of the job that is finished. # Strings may indicate what has been done or what is about to be done. # The boolean 'True' is placed on the queue when the job is finished. class ProgressWorker(): def getResult(self): return None def setQueue(self, queue): self.queue = queue def go(self): raise RuntimeError("Worker.go() has not been overridden") class WorkerExample(ProgressWorker): def __init__(self): #Worker.__init__(self) manager = multiprocessing.Manager() self.result = manager.list() def getResult(self): return self.result def go(self, queue): self.setQueue(queue) print "The worker has started doing some work (counting from 0 to 9)" for i in range(10): self.queue.put("working on step "+str(i)+"...") proportion = (float(i)+1)/10 slp = random.uniform(0.50, 3.50) time.sleep(slp) self.queue.put(proportion) self.queue.put("done.") self.queue.put(1.0) self.queue.put(True) print "The worker has finished." self.result.append("The the work has been finished and the result isn't 42.") class ProgressBar(gobject.GObject): __gsignals__ = { 'updated' : (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_FLOAT,)), 'finished': (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, ()) } def __init__(self, observer): gobject.GObject.__init__(self) self.observer = observer self.observer.connect("updated",self.displayUpdate) self.observer.connect("message_event",self.displayMessage) self.observer.connect("finished",self.finish) self.progress = gtk.ProgressBar() self.progress.connect("destroy", self.destroy) self.process=None self.lock = threading.RLock() def displayUpdate(self, obj, fraction): self.lock.acquire() self.progress.set_fraction(fraction) self.emit("updated", fraction) self.lock.release() def displayMessage(self, obj, text): self.lock.acquire() self.progress.set_text(text) self.lock.release() def finish(self, obj, data=None): self.emit("finished") def destroy(self, widget, data=None): print "Destroying" if self.process==None: return self.queue.put("finished") self.process.join() self.thread.join() gobject.type_register(ProgressBar) class ProgressBarWindow: def __init__(self, observer, title): self.progressBar = ProgressBar(observer) self.progressBar.connect("finished", self.done) window = gtk.Window(gtk.WINDOW_TOPLEVEL) window.set_border_width(10) window.set_title(title) window.add(self.progressBar.progress) window.show_all() window.connect("destroy", self.destroy) def main(self): gtk.main() def done(self, widget, data=None): #self.destroy() print "FINISHED!" def destroy(self, widget=None, data=None): gtk.main_quit() # Test if __name__ == '__main__': # n workers worker = [] process = [] observer =[] window = [] for i in range(10): worker.append(WorkerExample()) process.append(Process(worker[i].go, ())) observer.append(Process[i].getObserver()) combo = CombinedObserver(observer) window0 = ProgressBarWindow(combo, "Workers' progress") for i in range(10): window.append(ProgressBarWindow(observer[i], "Workers {}'s progress".format(str(i)))) for i in range(10): process[i].start() gtk.main() print "finished" sys.exit()