diff --git a/Multiprocessing test/ticker_gtk.py b/Multiprocessing test/ticker_gtk.py index a54dcfa..76025b7 100755 --- a/Multiprocessing test/ticker_gtk.py +++ b/Multiprocessing test/ticker_gtk.py @@ -25,22 +25,18 @@ gobject.threads_init() class Listener(gobject.GObject): __gsignals__ = { - 'ticker_update' : (gobject.SIGNAL_RUN_LAST, - gobject.TYPE_NONE, - (gobject.TYPE_FLOAT, - gobject.TYPE_FLOAT, - gobject.TYPE_FLOAT)), - 'ticker_finished' : (gobject.SIGNAL_RUN_LAST, + 'handlers_update' : (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, - ()), + (gobject.TYPE_FLOAT, + gobject.TYPE_FLOAT, + gobject.TYPE_FLOAT)), 'handlers_finished' : (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, ()), 'workers_update' : (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, - (gobject.TYPE_FLOAT, - gobject.TYPE_FLOAT, - gobject.TYPE_FLOAT)), + (gobject.TYPE_STRING, + gobject.TYPE_PYOBJECT)), 'workers_finished' : (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, ()) @@ -61,13 +57,13 @@ class Listener(gobject.GObject): print("Ticker is finishing.") self.emit("ticker_finished", data[0]) return - elif data[0]=="ticker_update": - print("Ticker is working.") - self.emit("update_ticker") + elif data[0]=="handler_update": + print("Handlers are working.") + self.emit("handlers_update") return elif data[0]=="work_update": print("Workers are working.") - self.emit("update_workers", data[0], data[1], data[2]) + self.emit("workers_update", data[1]) return else: self.emit('update_ticker', data[0], data[1], data[2]) @@ -75,25 +71,36 @@ class Listener(gobject.GObject): gobject.type_register(Listener) class Worker(): - def __init__(self, sq, wq, i): + def __init__(self, l, sq, wq, i): + self.lock = l self.signal_queue = sq self.work_queue = wq self.iterations = i + self.fractions = [] def go(self): while True: + print("Working") task = self.work_queue.get(2) - lock = multiprocessing.Lock().acquire() - tasks = self.work_queue.qsize() - lock.release() + self.fractions.append(task) + print(task) + self.lock.acquire() + try: + tasks = self.work_queue.qsize() + self.fractions.append(tasks) + print(tasks) + finally: + self.lock.release() with self.iterations.get_lock(): if task is None: break self.iterations.value += 1 - tasks = tasks/1000 + self.fractions.append(self.iterations.value) + print(self.iterations.value) + #tasks = tasks/1000 time.sleep(0.001) self.work_queue.task_done() - self.signal_queue.put("work_update", tasks, self.iterations) + self.signal_queue.put("work_update", self.fractions) self.work_queue.task_done() self.signal_queue.put("work_finished") @@ -105,11 +112,15 @@ class Handler(): self.iterations = i def go(self): - while self.ticker_queue.empty() is not True: - ticker = self.ticker_queue.get(0.1) - self.work_queue.put(ticker-1000) - time.sleep(0.01) - self.ticker_queue.task_done() + while True: + if self.ticker_queue.empty() is not True: + ticker = self.ticker_queue.get(0.1) + self.work_queue.put(ticker-1000) + time.sleep(0.01) + self.ticker_queue.task_done() + self.ticker_queue.task_done() + self.signal_queue.put("") + #def looprun(n): #for i in tickers: @@ -156,7 +167,7 @@ class ticker: self.work_current_count = 0 self.work_processed_count = 0 self.ticker_num = globvar.ticker_num - self.fraction = 0 + self.fraction = 1/1000 self.process = None self.signal_queue = multiprocessing.JoinableQueue() self.ticker_queue = multiprocessing.JoinableQueue() @@ -166,6 +177,7 @@ class ticker: self.work_num = 0 self.processes = [] self.threads = [] + self.lock = multiprocessing.Lock() self.app = Gtk.Application.new("org.ticker", Gio.ApplicationFlags(0)) self.app.connect("activate", self.on_app_activate) self.app.connect("shutdown", self.on_app_shutdown) @@ -180,7 +192,7 @@ class ticker: self.obj = builder.get_object self.obj("window").set_application(app) self.obj("window").set_wmclass("ticker","ticker") - self.obj("window").set_title("ticker - v.0.0.1") + self.obj("window").set_title("ticker - v.0.1.15") self.obj("window").show_all() self.obj("ticker_put_count_label").set_text(str(self.ticker_put_count)) self.obj("ticker_items_count_label").set_text(str(self.ticker_current_count)) @@ -206,15 +218,15 @@ class ticker: self.obj("ticker_processed_count_label").set_text(index3) self.obj("ticker_processed_progressbar").set_fraction(self.val_3) - def update_work(self, obj, index1, index2, index3, data=None): - self.val_1 = index1*self.fraction - self.val_2 = index2*self.fraction - self.val_3 = index3*self.fraction - self.obj("work_put_count_label").set_text(index1) + def update_work(self, obj, index): + self.val_1 = index[0]*self.fraction + self.val_2 = index[1]*self.fraction + self.val_3 = index[2]*self.fraction + self.obj("work_put_count_label").set_text(index[0]) self.obj("work_put_progressbar").set_fraction(self.val_1) - self.obj("work_items_count_label").set_text(index2) + self.obj("work_items_count_label").set_text(index[1]) self.obj("work_items_progressbar").set_fraction(self.val_2) - self.obj("work_processed_count_label").set_text(index3) + self.obj("work_processed_count_label").set_text(index[2]) self.obj("work_processed_progressbar").set_fraction(self.val_3) def ticker_Finished(self, obj, data=None): @@ -251,8 +263,8 @@ class ticker: def on_execute_clicked(self,widget): print("Creating Listener") listener = Listener(self.signal_queue) - listener.connect("ticker_update",self.update_ticker) - listener.connect("ticker_finished",self.ticker_Finished) + listener.connect("handlers_update",self.update_ticker) + listener.connect("handlers_finished",self.ticker_Finished) listener.connect("workers_update",self.update_work) listener.connect("workers_finished",self.workers_Finished) @@ -260,11 +272,21 @@ class ticker: thread = threading.Thread(target=listener.go, args=()) thread.start() + print("Creating ticker queue") for i in self.tickers: self.ticker_queue.put(i) + print(self.ticker_queue.qsize()) + + for i in range(20): + h = Handler(self.signal_queue, self.ticker_queue, self.work_queue, self.iterations) + t = threading.Thread(target = h.go, args = ()) + t.deamon = True + t.start() + self.threads.append(t) for i in range(8): - p = multiprocessing.Process(target = Worker.go, args = (self.work_queue, self.iterations, )) + w = Worker(self.lock, self.signal_queue, self.work_queue, self.iterations) + p = multiprocessing.Process(target = w.go, args=()) p.deamon = True p.start() self.processes.append(p) diff --git a/Multiprocessing test/ticker_new.py b/Multiprocessing test/ticker_new.py index debf47e..c0851cf 100644 --- a/Multiprocessing test/ticker_new.py +++ b/Multiprocessing test/ticker_new.py @@ -59,7 +59,7 @@ def looprun(n): return if __name__=='__main__: - + for n in range(100): looprun(n+1)