"""Fan tickers out from network threads to a pool of worker processes.

Pipeline: ``ticker_queue`` -> network threads -> ``work_queue`` -> worker
processes.  Each network thread posts one exit signal on ``exit_queue`` when
it is done; worker processes are stopped with one ``None`` sentinel each.
"""
import multiprocessing
import queue
import sys
import threading
import time

NUM_TICKERS = 1000
NUM_NETWORK_THREADS = 20
NUM_WORKER_PROCESSES = 8

# How long a network thread waits for a ticker before deciding the queue is
# drained.  A short wait (rather than get_nowait) tolerates the small delay
# with which multiprocessing queues make freshly-put items visible.
TICKER_POLL_SECONDS = 0.1


def network_worker(tq, wq, eq):
    """Move tickers from *tq* to *wq* until *tq* is drained.

    Args:
        tq: Joinable queue of integer tickers to consume.
        wq: Queue that receives ``ticker - 1000`` for every ticker consumed.
        eq: Queue that receives a single ``1`` when this worker finishes,
            whether it finished normally or because of an exception.
    """
    try:
        while True:
            # Ask for the item directly instead of checking empty() first:
            # another thread may take the last item between check and get.
            try:
                ticker = tq.get(timeout=TICKER_POLL_SECONDS)
            except queue.Empty:
                break
            try:
                print(threading.current_thread().name, "received ", ticker)
                wq.put(ticker - 1000)
                time.sleep(0.01)  # simulate network latency
            finally:
                # Always acknowledge, otherwise tq.join() would never return.
                tq.task_done()
    finally:
        eq.put(1)


def worker(wq, i):
    """Process tasks from *wq* until a ``None`` sentinel is received.

    Args:
        wq: Joinable queue of tasks; a ``None`` item tells the worker to stop.
        i: Shared counter (e.g. ``multiprocessing.Value('i', 0)``) that is
            incremented under its lock once per processed task.  The sentinel
            is acknowledged but not counted.
    """
    while True:
        task = wq.get()
        try:
            if task is None:
                break
            with i.get_lock():
                i.value += 1
            print(multiprocessing.current_process(), "received", task)
            time.sleep(0.001)  # simulate work
        finally:
            # Runs for the sentinel too, so wq.join() accounts for every item.
            wq.task_done()


def main():
    """Run the full pipeline and print how many tasks were processed."""
    ticker_queue = multiprocessing.JoinableQueue()
    work_queue = multiprocessing.JoinableQueue()
    exit_queue = multiprocessing.JoinableQueue()
    iterations = multiprocessing.Value('i', 0)

    for ticker in range(NUM_TICKERS):
        ticker_queue.put(ticker)

    threads = []
    for _ in range(NUM_NETWORK_THREADS):
        t = threading.Thread(
            target=network_worker,
            args=(ticker_queue, work_queue, exit_queue),
            daemon=True,
        )
        print("Starting: ", t.name)
        t.start()
        threads.append(t)

    processes = []
    for _ in range(NUM_WORKER_PROCESSES):
        p = multiprocessing.Process(
            target=worker,
            args=(work_queue, iterations),
            daemon=True,
        )
        print("Starting: ", p.name)
        p.start()
        processes.append(p)

    # Every ticker has been consumed and forwarded to work_queue.
    ticker_queue.join()

    # Wait for each network thread's exit signal so that no producer can
    # still be writing to work_queue when the sentinels are queued.
    for _ in threads:
        exit_queue.get()
    for t in threads:
        t.join()

    # One sentinel per worker process, queued behind all real tasks.
    for _ in processes:
        work_queue.put(None)
    work_queue.join()

    for p in processes:
        p.join()

    print("Total number of iterations:", iterations.value)


# Required for the "spawn" start method: child processes re-import this
# module and must not re-run the pipeline.
if __name__ == "__main__":
    main()