Files
h2inc-old/Multiprocessing test/mp_test_4.py
2018-05-18 14:04:54 +02:00

301 lines
9.1 KiB
Python
Executable File

#!/usr/bin/env python2
import sys #exit
import gobject
import pygtk
pygtk.require('2.0')
import gtk
import multiprocessing
import threading
import time
import random
gtk.gdk.threads_init()
# Handlers connected to any of these signals may be invoked from a
# background thread rather than the GTK main thread, so any state they
# touch should be protected with a threading.Lock or similar.
class ObserverBase(gobject.GObject):
    """GObject base class that declares the signals shared by observers.

    Signals (all SIGNAL_RUN_LAST, returning nothing):
        message_event(str): carries one string argument.
        updated(float): carries one float argument.
        finished(): carries no arguments.
        errored(): carries no arguments.
    """

    __gsignals__ = {
        'message_event': (
            gobject.SIGNAL_RUN_LAST,
            gobject.TYPE_NONE,
            (gobject.TYPE_STRING,),
        ),
        'updated': (
            gobject.SIGNAL_RUN_LAST,
            gobject.TYPE_NONE,
            (gobject.TYPE_FLOAT,),
        ),
        'finished': (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, ()),
        'errored': (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, ()),
    }

    def __init__(self):
        """Initialise the underlying GObject."""
        super(ObserverBase, self).__init__()


# Register the class (and the signals declared above) with the GObject
# type system.
gobject.type_register(ObserverBase)
class Observer(ObserverBase):
def __init__(self, queue):
ObserverBase.__init__(self)
self.queue = queue
def start(self):
# Create new thread to do the listening
self.thread = threading.Thread(target=self.listen, args=())
self.thread.start()
def join(self):
self.thread.join()
def listen(self):
while True:
# Listen for results on the queue and process them accordingly
print "Observer is waiting for data"
data = self.queue.get()
print "Observer received: ",data
dataType = type(data).__name__
if dataType=="bool" and data==True:
self.emit('finished')
return
if dataType=="bool" and data==False:
self.emit('errored')
return
elif dataType=="str":
self.emit('message_event', data)
elif dataType=="float":
self.emit('updated', data)
else:
raise RuntimeError("Queue had an unknown data type ('"+dataType+"') with value '"+str(data)+"'")
gobject.type_register(Observer)
class Process(gobject.GObject):
    """Bundle a child process, a queue and an Observer on that queue.

    Signals:
        starting(): emitted at the very beginning of start().
        joined(): emitted by join() right after the child process has
            been joined.
    """

    __gsignals__ = {
        'starting': (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, ()),
        'joined': (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, ()),
    }

    def __init__(self, function, arguments):
        """Create the queue, the (not yet started) process and the observer.

        Args:
            function: callable used as the target of the child process.
            arguments: positional arguments for ``function``.  When this
                equals ``()`` the freshly created queue is passed as the
                only argument instead.
        """
        super(Process, self).__init__()
        self.queue = multiprocessing.Queue()
        child_args = (self.queue,) if arguments == () else arguments
        self.process = multiprocessing.Process(target=function, args=child_args)
        self.observer = Observer(self.queue)

    def getObserver(self):
        """Return the Observer attached to this process's queue."""
        return self.observer

    def start(self):
        """Emit 'starting', then start the observer and the child process."""
        self.emit("starting")
        print("starting observer")
        self.observer.start()
        print("starting process")
        self.process.start()

    def join(self):
        """Join the child process, emit 'joined', then stop the observer.

        A ``False`` is pushed onto the queue before joining the observer;
        if the observer actually reads it, it reports an error.
        """
        print("cleaning up after process")
        self.process.join()
        self.emit("joined")
        print("cleaning up after observer")
        self.queue.put(False)
        self.observer.join()
        # NOTE: the queue itself is not closed here.


gobject.type_register(Process)
class CombinedObserver(ObserverBase):
def __init__(self, observers, weights=None):
ObserverBase.__init__(self)
self.observers = observers
numObservers = len(observers)
if weights==None:
self.weights = [1 for i in range(numObservers)]
else:
self.weights = weights
sum_weights = sum(self.weights)
self.weights = [float(w)/sum_weights for w in self.weights]
print "weights = ",self.weights
index = 0
for observer in self.observers:
#observer.connect("message_event", onMessageReceived, index)
observer.connect("updated", self.onUpdate, index)
observer.connect("finished", self.onFinished, index)
observer.connect("errored", self.onErrored, index)
index = index+1
self.fractions = [0 for i in range(numObservers)]
self.finished = [False for i in range(numObservers)]
self.errored = [False for i in range(numObservers)]
self.lock = threading.RLock()
def onUpdate(self, widget, fraction, index):
self.lock.acquire()
self.fractions[index] = fraction * self.weights[index]
self.emit("updated", sum(self.fractions))
self.lock.release()
def onFinished(self, widget, index):
self.lock.acquire()
self.finished[index] = True
if all(self.finished):
self.emit("finished")
self.lock.release()
def onErrored(self, widget, index):
self.lock.acquire()
self.errored[index] = True
self.emit("errored")
self.lock.release()
gobject.type_register(CombinedObserver)
# A ProgressWorker is responsible for (a) getting the job done and (b)
# reporting progress. Progress is reported via the queue. Fractions
# (from 0 to 1) indicate the proportion of the job that is finished.
# Strings may indicate what has been done or what is about to be done.
# The boolean 'True' is placed on the queue when the job is finished.
class ProgressWorker(object):
    """Base class for workers that report their progress through a queue.

    Subclasses override go() to do the actual work and getResult() to
    expose whatever the work produced.
    """

    def getResult(self):
        """Return the worker's result; the base implementation has none."""
        return None

    def setQueue(self, queue):
        """Store the queue on which progress is to be reported."""
        self.queue = queue

    def go(self, queue=None):
        """Run the job.  Must be overridden by subclasses.

        Args:
            queue: the progress queue.  Accepted (and ignored) here so a
                worker that forgot to override go() fails with the error
                below, rather than a TypeError about the argument count,
                when it is called with the queue.

        Raises:
            NotImplementedError: always.  This is a RuntimeError subclass,
                so callers catching RuntimeError keep working.
        """
        raise NotImplementedError(
            type(self).__name__ + ".go() has not been overridden")
class WorkerExample(ProgressWorker):
    """Demo worker: counts ten slow steps and reports each on the queue."""

    def __init__(self):
        """Create a manager-backed list to hold the result.

        The list comes from a multiprocessing.Manager so it can be
        appended to from the process that runs go().
        """
        manager = multiprocessing.Manager()
        self.result = manager.list()

    def getResult(self):
        """Return the manager list that go() appends the result to."""
        return self.result

    def go(self, queue):
        """Do the work, reporting progress on ``queue``.

        For each of 10 steps a status string is queued, the worker sleeps
        a random 0.5-3.5 seconds, then the completed fraction is queued.
        Afterwards "done." and 1.0 are queued, the result is stored, and
        finally ``True`` is queued to signal completion.
        """
        self.setQueue(queue)
        print("The worker has started doing some work (counting from 0 to 9)")
        for i in range(10):
            self.queue.put("working on step "+str(i)+"...")
            proportion = (float(i)+1)/10
            slp = random.uniform(0.50, 3.50)
            time.sleep(slp)
            self.queue.put(proportion)
        self.queue.put("done.")
        self.queue.put(1.0)
        # Store the result BEFORE queueing the 'finished' sentinel, so that
        # whoever reacts to the sentinel never sees an empty result.
        self.result.append("The the work has been finished and the result isn't 42.")
        self.queue.put(True)
        print("The worker has finished.")
class ProgressBar(gobject.GObject):
    """Drive a gtk.ProgressBar from an observer's signals.

    The observer's 'updated' and 'message_event' signals set the bar's
    fraction and text; its 'finished' signal is re-emitted as this
    object's own 'finished'.

    Signals:
        updated(float): re-emitted for every fraction received.
        finished(): re-emitted when the observer finishes.
    """

    __gsignals__ = {
        'updated': (gobject.SIGNAL_RUN_LAST,
                    gobject.TYPE_NONE,
                    (gobject.TYPE_FLOAT,)),
        'finished': (gobject.SIGNAL_RUN_LAST,
                     gobject.TYPE_NONE,
                     ()),
    }

    def __init__(self, observer):
        """Create the widget and subscribe to ``observer``.

        Args:
            observer: object emitting 'updated', 'message_event' and
                'finished' signals.
        """
        gobject.GObject.__init__(self)
        self.observer = observer
        # Everything the handlers use is created before any handler is
        # connected, so an early signal cannot hit a missing attribute.
        self.progress = gtk.ProgressBar()
        self.lock = threading.RLock()
        # Optional resources cleaned up by destroy(); nothing in this class
        # sets them, so they default to None.
        self.process = None
        self.queue = None
        self.thread = None
        self.progress.connect("destroy", self.destroy)
        self.observer.connect("updated", self.displayUpdate)
        self.observer.connect("message_event", self.displayMessage)
        self.observer.connect("finished", self.finish)

    def displayUpdate(self, obj, fraction):
        """Show ``fraction`` on the bar and re-emit 'updated'."""
        with self.lock:
            # This handler may run on a non-GTK thread.  Widgets must only
            # be touched from the main loop, so the call is queued there;
            # set_fraction returns None, which makes the idle callback
            # one-shot.
            gobject.idle_add(self.progress.set_fraction, fraction)
            self.emit("updated", fraction)

    def displayMessage(self, obj, text):
        """Show ``text`` on the bar."""
        with self.lock:
            # Queued onto the main loop for the same reason as above.
            gobject.idle_add(self.progress.set_text, text)

    def finish(self, obj, data=None):
        """Re-emit the observer's 'finished' signal."""
        self.emit("finished")

    def destroy(self, widget, data=None):
        """Handle the widget's 'destroy' signal.

        Does nothing beyond printing unless ``self.process`` has been set;
        in that case the process is joined, and the queue / thread are
        notified / joined when they have been set too.
        """
        print("Destroying")
        if self.process is None:
            return
        if self.queue is not None:
            self.queue.put("finished")
        self.process.join()
        if self.thread is not None:
            self.thread.join()


gobject.type_register(ProgressBar)
class ProgressBarWindow:
    """Top-level window showing a single ProgressBar for an observer."""

    def __init__(self, observer, title):
        """Build the window, show it and wire up its signals.

        Args:
            observer: passed on to the ProgressBar that fills the window.
            title: the window title.
        """
        bar = ProgressBar(observer)
        bar.connect("finished", self.done)
        self.progressBar = bar

        toplevel = gtk.Window(gtk.WINDOW_TOPLEVEL)
        toplevel.set_border_width(10)
        toplevel.set_title(title)
        toplevel.add(bar.progress)
        toplevel.show_all()
        # Closing the window ends the GTK main loop.
        toplevel.connect("destroy", self.destroy)

    def main(self):
        """Enter the GTK main loop."""
        gtk.main()

    def done(self, widget, data=None):
        """Report that the progress bar has finished; the window stays open."""
        print("FINISHED!")

    def destroy(self, widget=None, data=None):
        """Leave the GTK main loop."""
        gtk.main_quit()
# Test
if __name__ == '__main__':
    # Demo: run several workers in parallel, with one progress window per
    # worker plus one window showing their combined progress.
    NUM_WORKERS = 10
    worker = []
    process = []
    observer = []
    window = []
    for i in range(NUM_WORKERS):
        worker.append(WorkerExample())
        process.append(Process(worker[i].go, ()))
        # Index the list of instances (lower-case 'process'); the class
        # 'Process' itself cannot be indexed.
        observer.append(process[i].getObserver())
    combo = CombinedObserver(observer)
    window0 = ProgressBarWindow(combo, "Workers' progress")
    for i in range(NUM_WORKERS):
        window.append(ProgressBarWindow(observer[i], "Workers {}'s progress".format(str(i))))
    # Start the processes only once every window is connected, so no
    # progress signal is missed.
    for i in range(NUM_WORKERS):
        process[i].start()
    gtk.main()
    print("finished")
    sys.exit()