use of Queue

This is a discussion on use of Queue within the Python forums in Programming Languages category; how is Queue intended to be used? I found the following code in python manual, but I don't understand how to stop consumers after all items have been produced. I tried different approaches but all of them seemed incorrect (race, deadlock or duplicating queue functionality) def worker(): while True: item = q.get() do_work(item) q.task_done() q = Queue() for i in range(num_worker_threads): t = Thread(target=worker) t.setDaemon(True) t.start() for item in source(): q.put(item) q.join() # block until all tasks are done...

Go Back   Application Development Forum > Programming Languages > Python

Object Mix

Register FAQ Calendar Search Today's Posts Mark Forums Read
  #1  
Old 08-27-2008, 05:45 AM
Alexandru Mosoi
Guest
 
Default use of Queue

how is Queue intended to be used? I found the following code in python
manual, but I don't understand how to stop consumers after all items
have been produced. I tried different approaches but all of them
seemed incorrect (race, deadlock or duplicating queue functionality)


def worker():
while True:
item = q.get()
do_work(item)
q.task_done()

q = Queue()
for i in range(num_worker_threads):
t = Thread(target=worker)
t.setDaemon(True)
t.start()

for item in source():
q.put(item)

q.join() # block until all tasks are done
Reply With Quote
  #2  
Old 08-27-2008, 05:56 AM
Diez B. Roggisch
Guest
 
Default Re: use of Queue

Alexandru Mosoi wrote:

> how is Queue intended to be used? I found the following code in python
> manual, but I don't understand how to stop consumers after all items
> have been produced. I tried different approaches but all of them
> seemed incorrect (race, deadlock or duplicating queue functionality)
>
>
> def worker():
> while True:
> item = q.get()
> do_work(item)
> q.task_done()
>
> q = Queue()
> for i in range(num_worker_threads):
> t = Thread(target=worker)
> t.setDaemon(True)
> t.start()
>
> for item in source():
> q.put(item)
>
> q.join() # block until all tasks are done


Put a sentinel into the queue that gets interpreted as "terminate" for the
workers. You need of course to put it in there once for each worker.

Diez
Reply With Quote
  #3  
Old 08-27-2008, 06:06 AM
Gerhard Häring
Guest
 
Default Re: use of Queue

Alexandru Mosoi wrote:
> how is Queue intended to be used? I found the following code in python
> manual, but I don't understand how to stop consumers after all items
> have been produced. I tried different approaches but all of them
> seemed incorrect (race, deadlock or duplicating queue functionality)
>
>
> def worker():
> while True:
> item = q.get()


if item is None:
break

> do_work(item)
> q.task_done()
>
> q = Queue()
> for i in range(num_worker_threads):
> t = Thread(target=worker)
> t.setDaemon(True)
> t.start()
>
> for item in source():
> q.put(item)


# stop all consumers
for i in range(num_worker_threads):
q.put(None)

>
> q.join() # block until all tasks are done


This is how I do it.

-- Gerhard

Reply With Quote
  #4  
Old 08-27-2008, 06:11 AM
skip@pobox.com
Guest
 
Default Re: use of Queue


Diez> Put a sentinel into the queue that gets interpreted as "terminate"
Diez> for the workers. You need of course to put it in there once for
Diez> each worker.

Or make the consumers daemon threads so that when the producers are finished
an all non-daemon threads exit, the consumers do as well.

Skip
Reply With Quote
  #5  
Old 08-27-2008, 06:41 AM
skip@pobox.com
Guest
 
Default Re: use of Queue


skip> Or make the consumers daemon threads so that when the producers
skip> are finished an all non-daemon threads exit, the consumers do as
skip> well.

Forget that I wrote this. If they happen to be working on the token they've
consumed at the time the other threads exit, they will as well. Use the
sentinel token idea instead.

Skip
Reply With Quote
  #6  
Old 08-27-2008, 07:49 AM
Alexandru Mosoi
Guest
 
Default Re: use of Queue

On Aug 27, 1:06*pm, Gerhard Häring <g...@ghaering.de> wrote:
> Alexandru Mosoi wrote:
> > how is Queue intended to be used? I found the following code in python
> > manual, but I don't understand how to stop consumers after all items
> > have been produced. I tried different approaches but all of them
> > seemed incorrect (race, deadlock or duplicating queue functionality)

>
> > * * def worker():
> > * * * * while True:
> > * * * * * * item = q.get()

>
> * * * * * * * *if item is None:
> * * * * * * * * * *break
>
> > * * * * * * do_work(item)
> > * * * * * * q.task_done()

>
> > * * q = Queue()
> > * * for i in range(num_worker_threads):
> > * * * * *t = Thread(target=worker)
> > * * * * *t.setDaemon(True)
> > * * * * *t.start()

>
> > * * for item in source():
> > * * * * q.put(item)

>
> # stop all consumers
> for i in range(num_worker_threads):
> * * *q.put(None)
>
>
>
> > * * q.join() * * * # block until all tasks are done

>
> This is how I do it.
>
> -- Gerhard



Your solution works assuming that you know how many consumer threads
you have . I don't . More than that, it's not correct if you have
more than one producer . Having a sentinel was my very first idea,
but as you see... it's a race condition (there are cases in which not
all items are processed).
Reply With Quote
  #7  
Old 08-27-2008, 07:54 AM
Jeff
Guest
 
Default Re: use of Queue

> Your solution works assuming that you know how many consumer threads
> you have . I don't . More than that, it's not correct if you have
> more than one producer . Having a sentinel was my very first idea,
> but as you see... it's a race condition (there are cases in which not
> all items are processed).


Queue raises an Empty exception when there are no items left in the
queue. Put the q.get() call in a try block and exit in the except
block.

You can also use a condition variable to signal threads to terminate.
Reply With Quote
  #8  
Old 08-27-2008, 07:58 AM
Alexandru Mosoi
Guest
 
Default Re: use of Queue

On Aug 27, 2:54*pm, Jeff <jeffo...@gmail.com> wrote:

> Queue raises an Empty exception when there are no items left in the
> queue. *Put the q.get() call in a try block and exit in the except
> block.

Wrong. What if producer takes a long time to produce an item?
Consumers
will find the queue empty and exit instead of waiting.

> You can also use a condition variable to signal threads to terminate.

This is the solution I want to avoid because it duplicates Queue's
functionality.
I prefer having a clean solution with nice design to hacking Queue
class.
Reply With Quote
  #9  
Old 08-27-2008, 08:17 AM
Alexandru Mosoi
Guest
 
Default Re: use of Queue

On Aug 27, 12:45*pm, Alexandru Mosoi <brtz...@gmail.com> wrote:
> how is Queue intended to be used? I found the following code in python
> manual, but I don't understand how to stop consumers after all items
> have been produced. I tried different approaches but all of them
> seemed incorrect (race, deadlock or duplicating queue functionality)
>
> * * def worker():
> * * * * while True:
> * * * * * * item = q.get()
> * * * * * * do_work(item)
> * * * * * * q.task_done()
>
> * * q = Queue()
> * * for i in range(num_worker_threads):
> * * * * *t = Thread(target=worker)
> * * * * *t.setDaemon(True)
> * * * * *t.start()
>
> * * for item in source():
> * * * * q.put(item)
>
> * * q.join() * * * # block until all tasks are done



ok. I think I figured it out . let me know what you think

global num_tasks, num_done, queue
num_tasks = 0
num_done = 0
queue = Queue()

# producer
num_tasks += 1
for i in items:
num_tasks += 1
queue.put(i)

num_tasks -= 1
if num_tasks == num_done:
queue.put(None)

# consumer
while True:
i = queue.get()
if i is None:
queue.put(None)
break

# do stuff

num_done += 1
if num_done == num_tasks:
queue.put(None)
break





Reply With Quote
  #10  
Old 08-27-2008, 08:40 AM
Diez B. Roggisch
Guest
 
Default Re: use of Queue

>
> Your solution works assuming that you know how many consumer threads
> you have . I don't . More than that, it's not correct if you have
> more than one producer . Having a sentinel was my very first idea,
> but as you see... it's a race condition (there are cases in which not
> all items are processed).


If you have several producers, how do you coordinate when to shut down?

Apart from that, you can easily solve the problem of not knowing how many
consumers you have by making a consumer stuff back the sentinel into the
queue. Then it will ripple down until no consumer is left.

Diez
Reply With Quote
Reply


Thread Tools
Display Modes


All times are GMT -5. The time now is 04:54 PM.


Powered by vBulletin® Version 3.7.2
Copyright ©2000 - 2008, Jelsoft Enterprises Ltd.
Search Engine Optimization by vBSEO 3.2.0
vB Ad Management by =RedTyger=

In an effort to better serve ads to our visitors, cookies are used on objectmix.com. For more information, check out our Privacy Policy.