| Register | FAQ | Calendar | Search | Today's Posts | Mark Forums Read |
|
#1
| |||
| |||
| how is Queue intended to be used? I found the following code in python manual, but I don't understand how to stop consumers after all items have been produced. I tried different approaches but all of them seemed incorrect (race, deadlock or duplicating queue functionality) def worker(): while True: item = q.get() do_work(item) q.task_done() q = Queue() for i in range(num_worker_threads): t = Thread(target=worker) t.setDaemon(True) t.start() for item in source(): q.put(item) q.join() # block until all tasks are done |
|
#2
| |||
| |||
| Alexandru Mosoi wrote: > how is Queue intended to be used? I found the following code in python > manual, but I don't understand how to stop consumers after all items > have been produced. I tried different approaches but all of them > seemed incorrect (race, deadlock or duplicating queue functionality) > > > def worker(): > while True: > item = q.get() > do_work(item) > q.task_done() > > q = Queue() > for i in range(num_worker_threads): > t = Thread(target=worker) > t.setDaemon(True) > t.start() > > for item in source(): > q.put(item) > > q.join() # block until all tasks are done Put a sentinel into the queue that gets interpreted as "terminate" for the workers. You need of course to put it in there once for each worker. Diez |
|
#3
| |||
| |||
| Alexandru Mosoi wrote: > how is Queue intended to be used? I found the following code in python > manual, but I don't understand how to stop consumers after all items > have been produced. I tried different approaches but all of them > seemed incorrect (race, deadlock or duplicating queue functionality) > > > def worker(): > while True: > item = q.get() if item is None: break > do_work(item) > q.task_done() > > q = Queue() > for i in range(num_worker_threads): > t = Thread(target=worker) > t.setDaemon(True) > t.start() > > for item in source(): > q.put(item) # stop all consumers for i in range(num_worker_threads): q.put(None) > > q.join() # block until all tasks are done This is how I do it. -- Gerhard |
|
#4
| |||
| |||
| Diez> Put a sentinel into the queue that gets interpreted as "terminate" Diez> for the workers. You need of course to put it in there once for Diez> each worker. Or make the consumers daemon threads so that when the producers are finished an all non-daemon threads exit, the consumers do as well. Skip |
|
#5
| |||
| |||
| skip> Or make the consumers daemon threads so that when the producers skip> are finished an all non-daemon threads exit, the consumers do as skip> well. Forget that I wrote this. If they happen to be working on the token they've consumed at the time the other threads exit, they will as well. Use the sentinel token idea instead. Skip |
|
#6
| |||
| |||
| On Aug 27, 1:06*pm, Gerhard Häring <g...@ghaering.de> wrote: > Alexandru Mosoi wrote: > > how is Queue intended to be used? I found the following code in python > > manual, but I don't understand how to stop consumers after all items > > have been produced. I tried different approaches but all of them > > seemed incorrect (race, deadlock or duplicating queue functionality) > > > * * def worker(): > > * * * * while True: > > * * * * * * item = q.get() > > * * * * * * * *if item is None: > * * * * * * * * * *break > > > * * * * * * do_work(item) > > * * * * * * q.task_done() > > > * * q = Queue() > > * * for i in range(num_worker_threads): > > * * * * *t = Thread(target=worker) > > * * * * *t.setDaemon(True) > > * * * * *t.start() > > > * * for item in source(): > > * * * * q.put(item) > > # stop all consumers > for i in range(num_worker_threads): > * * *q.put(None) > > > > > * * q.join() * * * # block until all tasks are done > > This is how I do it. > > -- Gerhard Your solution works assuming that you know how many consumer threads you have . I don't . More than that, it's not correct if you havemore than one producer . Having a sentinel was my very first idea,but as you see... it's a race condition (there are cases in which not all items are processed). |
|
#7
| |||
| |||
| > Your solution works assuming that you know how many consumer threads > you have . I don't . More than that, it's not correct if you have> more than one producer . Having a sentinel was my very first idea,> but as you see... it's a race condition (there are cases in which not > all items are processed). Queue raises an Empty exception when there are no items left in the queue. Put the q.get() call in a try block and exit in the except block. You can also use a condition variable to signal threads to terminate. |
|
#8
| |||
| |||
| On Aug 27, 2:54*pm, Jeff <jeffo...@gmail.com> wrote: > Queue raises an Empty exception when there are no items left in the > queue. *Put the q.get() call in a try block and exit in the except > block. Wrong. What if producer takes a long time to produce an item? Consumers will find the queue empty and exit instead of waiting. > You can also use a condition variable to signal threads to terminate. This is the solution I want to avoid because it duplicates Queue's functionality. I prefer having a clean solution with nice design to hacking Queue class. |
|
#9
| |||
| |||
| On Aug 27, 12:45*pm, Alexandru Mosoi <brtz...@gmail.com> wrote: > how is Queue intended to be used? I found the following code in python > manual, but I don't understand how to stop consumers after all items > have been produced. I tried different approaches but all of them > seemed incorrect (race, deadlock or duplicating queue functionality) > > * * def worker(): > * * * * while True: > * * * * * * item = q.get() > * * * * * * do_work(item) > * * * * * * q.task_done() > > * * q = Queue() > * * for i in range(num_worker_threads): > * * * * *t = Thread(target=worker) > * * * * *t.setDaemon(True) > * * * * *t.start() > > * * for item in source(): > * * * * q.put(item) > > * * q.join() * * * # block until all tasks are done ok. I think I figured it out . let me know what you thinkglobal num_tasks, num_done, queue num_tasks = 0 num_done = 0 queue = Queue() # producer num_tasks += 1 for i in items: num_tasks += 1 queue.put(i) num_tasks -= 1 if num_tasks == num_done: queue.put(None) # consumer while True: i = queue.get() if i is None: queue.put(None) break # do stuff num_done += 1 if num_done == num_tasks: queue.put(None) break |
|
#10
| |||
| |||
| > > Your solution works assuming that you know how many consumer threads > you have . I don't . More than that, it's not correct if you have> more than one producer . Having a sentinel was my very first idea,> but as you see... it's a race condition (there are cases in which not > all items are processed). If you have several producers, how do you coordinate when to shut down? Apart from that, you can easily solve the problem of not knowing how many consumers you have by making a consumer stuff back the sentinel into the queue. Then it will ripple down until no consumer is left. Diez |
![]() |
| Thread Tools | |
| Display Modes | |
In an effort to better serve ads to our visitors, cookies are used on objectmix.com. For more information, check out our Privacy Policy.