Files
h2inc-old/Multiprocessing test/mp_test_3.py
2018-03-04 09:45:35 +01:00

353 lines
11 KiB
Python

#!/usr/bin/env python
import sys #exit
import gobject
import pygtk
pygtk.require('2.0')
import gtk
import multiprocessing
import threading
import time
import random
gtk.gdk.threads_init()
# Handlers connected to any of these signals may be invoked from an
# observer's listener thread, so they should protect any shared state
# with a threading.Lock or similar.
class ObserverBase(gobject.GObject):
    """Base GObject that declares the progress-reporting signals.

    Signals:
        message_event(str) -- carries one string argument.
        updated(float)     -- carries one float argument.
        finished()         -- carries no arguments.
        errored()          -- carries no arguments.
    """

    __gsignals__ = {
        'message_event': (
            gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_STRING,)),
        'updated': (
            gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_FLOAT,)),
        'finished': (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, ()),
        'errored': (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, ()),
    }

    def __init__(self):
        gobject.GObject.__init__(self)


gobject.type_register(ObserverBase)
class Observer(ObserverBase):
def __init__(self, queue):
ObserverBase.__init__(self)
self.queue = queue
def start(self):
# Create new thread to do the listening
self.thread = threading.Thread(target=self.listen, args=())
self.thread.start()
def join(self):
self.thread.join()
def listen(self):
while True:
# Listen for results on the queue and process them accordingly
print "Observer is waiting for data"
data = self.queue.get()
print "Observer received: ",data
dataType = type(data).__name__
if dataType=="bool" and data==True:
self.emit('finished')
return
if dataType=="bool" and data==False:
self.emit('errored')
return
elif dataType=="str":
self.emit('message_event', data)
elif dataType=="float":
self.emit('updated', data)
else:
raise RuntimeError("Queue had an unknown data type ('"+dataType+"') with value '"+str(data)+"'")
gobject.type_register(Observer)
class Process(gobject.GObject):
__gsignals__ = {
'starting': (gobject.SIGNAL_RUN_LAST,
gobject.TYPE_NONE,
()),
'joined': (gobject.SIGNAL_RUN_LAST,
gobject.TYPE_NONE,
())
}
def __init__(self, function, arguments):
gobject.GObject.__init__(self)
# Create queue
self.queue = multiprocessing.Queue()
# Create process
if arguments==(): arguments=(self.queue,)
self.process = multiprocessing.Process(target=function, args=arguments)
# Create observer
self.observer = Observer(self.queue)
def getObserver(self):
return self.observer
def start(self):
# Starting observer
self.emit("starting")
print "starting observer"
self.observer.start()
# Start process
print "starting process"
self.process.start()
def join(self):
# Clean up after process
print "cleaning up after process"
self.process.join()
self.emit("joined")
# Clean up after observer
print "cleaning up after observer"
self.queue.put(False) # If the false is ever read, it's an error
self.observer.join()
# Clean up queue
#self.queue.close()
gobject.type_register(Process)
class CombinedObserver(ObserverBase):
def __init__(self, observers, weights=None):
ObserverBase.__init__(self)
self.observers = observers
numObservers = len(observers)
if weights==None:
self.weights = [1 for i in range(numObservers)]
else:
self.weights = weights
sum_weights = sum(self.weights)
self.weights = [float(w)/sum_weights for w in self.weights]
print "weights = ",self.weights
index = 0
for observer in self.observers:
#observer.connect("message_event", onMessageReceived, index)
observer.connect("updated", self.onUpdate, index)
observer.connect("finished", self.onFinished, index)
observer.connect("errored", self.onErrored, index)
index = index+1
self.fractions = [0 for i in range(numObservers)]
self.finished = [False for i in range(numObservers)]
self.errored = [False for i in range(numObservers)]
self.lock = threading.RLock()
def onUpdate(self, widget, fraction, index):
self.lock.acquire()
self.fractions[index] = fraction * self.weights[index]
self.emit("updated", sum(self.fractions))
self.lock.release()
def onFinished(self, widget, index):
self.lock.acquire()
self.finished[index] = True
if all(self.finished):
self.emit("finished")
self.lock.release()
def onErrored(self, widget, index):
self.lock.acquire()
self.errored[index] = True
self.emit("errored")
self.lock.release()
gobject.type_register(CombinedObserver)
class ProgressWorker(object):
    """Base class for a job that does work and reports its progress.

    A ProgressWorker is responsible for (a) getting the job done and
    (b) reporting progress.  Progress is reported via the queue:

    * floats (from 0 to 1) give the proportion of the job that is finished;
    * strings describe what has been done or is about to be done;
    * the boolean True is placed on the queue when the job is finished.
    """

    def getResult(self):
        """Return the job's result; the base class has none (None)."""
        return None

    def setQueue(self, queue):
        """Remember the queue on which progress is to be reported."""
        self.queue = queue

    def go(self, queue=None):
        """Run the job; subclasses must override this.

        `queue` is accepted (and optional) so the signature matches the
        overriding implementations, which are called with the queue.

        Raises NotImplementedError, a RuntimeError subclass, so callers
        catching RuntimeError keep working.
        """
        raise NotImplementedError("Worker.go() has not been overridden")
class WorkerExample(ProgressWorker):
def __init__(self):
#Worker.__init__(self)
manager = multiprocessing.Manager()
self.result = manager.list()
def getResult(self):
return self.result
def go(self, queue):
self.setQueue(queue)
print "The worker has started doing some work (counting from 0 to 9)"
for i in range(10):
self.queue.put("working on step "+str(i)+"...")
proportion = (float(i)+1)/10
slp = random.uniform(0.50, 3.50)
time.sleep(slp)
self.queue.put(proportion)
self.queue.put("done.")
self.queue.put(1.0)
self.queue.put(True)
print "The worker has finished."
self.result.append("The the work has been finished and the result isn't 42.")
class ProgressBar(gobject.GObject):
__gsignals__ = {
'updated' : (gobject.SIGNAL_RUN_LAST,
gobject.TYPE_NONE,
(gobject.TYPE_FLOAT,)),
'finished': (gobject.SIGNAL_RUN_LAST,
gobject.TYPE_NONE,
())
}
def __init__(self, observer):
gobject.GObject.__init__(self)
self.observer = observer
self.observer.connect("updated",self.displayUpdate)
self.observer.connect("message_event",self.displayMessage)
self.observer.connect("finished",self.finish)
self.progress = gtk.ProgressBar()
self.progress.connect("destroy", self.destroy)
self.process=None
self.lock = threading.RLock()
def displayUpdate(self, obj, fraction):
self.lock.acquire()
self.progress.set_fraction(fraction)
self.emit("updated", fraction)
self.lock.release()
def displayMessage(self, obj, text):
self.lock.acquire()
self.progress.set_text(text)
self.lock.release()
def finish(self, obj, data=None):
self.emit("finished")
def destroy(self, widget, data=None):
print "Destroying"
if self.process==None:
return
self.queue.put("finished")
self.process.join()
self.thread.join()
gobject.type_register(ProgressBar)
class ProgressBarWindow:
def __init__(self, observer, title):
self.progressBar = ProgressBar(observer)
self.progressBar.connect("finished", self.done)
window = gtk.Window(gtk.WINDOW_TOPLEVEL)
window.set_border_width(10)
window.set_title(title)
window.add(self.progressBar.progress)
window.show_all()
window.connect("destroy", self.destroy)
def main(self):
gtk.main()
def done(self, widget, data=None):
#self.destroy()
print "FINISHED!"
def destroy(self, widget=None, data=None):
gtk.main_quit()
# Test
if __name__ == '__main__':
# Three workers
worker1 = WorkerExample()
process1 = Process(worker1.go, ())
observer1 = process1.getObserver()
worker2 = WorkerExample()
process2 = Process(worker2.go, ())
observer2 = process2.getObserver()
worker3 = WorkerExample()
process3 = Process(worker3.go, ())
observer3 = process3.getObserver()
combo = CombinedObserver([observer1, observer2, observer3])
window0 = ProgressBarWindow(combo, "Workers' progress")
window1 = ProgressBarWindow(observer1, "Workers One's progress")
window2 = ProgressBarWindow(observer2, "Workers Two's progress")
window3 = ProgressBarWindow(observer3, "Workers Three's progress")
process1.start()
#time.sleep(0.1)
process2.start()
#time.sleep(0.1)
process3.start()
gtk.main()
#process1.join()
#process2.join()
#process3.join()
print "finished"
sys.exit()
# Two workers
worker1 = WorkerExample()
process1 = Process(worker1.go, ())
observer1 = process1.getObserver()
worker2 = WorkerExample()
process2 = Process(worker2.go, ())
observer2 = process2.getObserver()
combo = CombinedObserver([observer1, observer2])
window0 = ProgressBarWindow(combo, "Workers' progress")
window1 = ProgressBarWindow(observer1, "Workers One's progress")
window2 = ProgressBarWindow(observer2, "Workers Two's progress")
process1.start()
time.sleep(0.1)
process2.start()
gtk.main()
process1.join()
process2.join()
print "finished"
sys.exit()
# One worker
print "creating worker1"
worker1 = WorkerExample() # The task to do, carefully constructed to give feedback via a queue
print "creating process1"
process1 = Process(worker1.go, ()) # Responsible for looking after the worker: supplying the queue, setting up the observer, starting process
observer1 = process1.getObserver() # An observer of the worker's progress that listens to the queue
window1 = ProgressBarWindow(observer1, "Workers One's progress") # A display of the progress, connected to the observer
process1.start()
gtk.main()
process1.join()
print "result = ", worker1.result
print "finished"
sys.exit()