use of Queue

  • Alexandru  Mosoi

    use of Queue

    how is Queue intended to be used? I found the following code in python
    manual, but I don't understand how to stop consumers after all items
    have been produced. I tried different approaches but all of them
    seemed incorrect (race, deadlock or duplicating queue functionality)


    def worker():
        while True:
            item = q.get()
            do_work(item)
            q.task_done()

    q = Queue()
    for i in range(num_worker_threads):
        t = Thread(target=worker)
        t.setDaemon(True)
        t.start()

    for item in source():
        q.put(item)

    q.join() # block until all tasks are done
  • Diez B. Roggisch

    #2
    Re: use of Queue

    Alexandru Mosoi wrote:
    > how is Queue intended to be used? I found the following code in python
    > manual, but I don't understand how to stop consumers after all items
    > have been produced. I tried different approaches but all of them
    > seemed incorrect (race, deadlock or duplicating queue functionality)
    >
    > def worker():
    >     while True:
    >         item = q.get()
    >         do_work(item)
    >         q.task_done()
    >
    > q = Queue()
    > for i in range(num_worker_threads):
    >     t = Thread(target=worker)
    >     t.setDaemon(True)
    >     t.start()
    >
    > for item in source():
    >     q.put(item)
    >
    > q.join() # block until all tasks are done

    Put a sentinel into the queue that gets interpreted as "terminate" for the
    workers. You need of course to put it in there once for each worker.

    Diez


    • Gerhard Häring

      #3
      Re: use of Queue

      Alexandru Mosoi wrote:
      > how is Queue intended to be used? I found the following code in python
      > manual, but I don't understand how to stop consumers after all items
      > have been produced. I tried different approaches but all of them
      > seemed incorrect (race, deadlock or duplicating queue functionality)
      >
      > def worker():
      >     while True:
      >         item = q.get()
                if item is None:
                    break
      >         do_work(item)
      >         q.task_done()
      >
      > q = Queue()
      > for i in range(num_worker_threads):
      >     t = Thread(target=worker)
      >     t.setDaemon(True)
      >     t.start()
      >
      > for item in source():
      >     q.put(item)
        # stop all consumers
        for i in range(num_worker_threads):
            q.put(None)
      >
      > q.join() # block until all tasks are done

      This is how I do it.

      -- Gerhard
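
A runnable variant of the one-sentinel-per-worker idea, as a minimal sketch for Python 3 (where the module is named `queue`). One deliberate difference from the snippet above: the worker calls `task_done()` for the sentinel as well, because `q.join()` counts every `put()`, including the `None` items, and would otherwise keep waiting. `do_work`, `NUM_WORKERS` and `run` are placeholder names chosen for illustration.

```python
import queue
import threading

NUM_WORKERS = 4  # assumed worker count for this sketch


def do_work(item):
    # Placeholder for real work.
    return item * item


def worker(q, results, lock):
    while True:
        item = q.get()
        try:
            if item is None:  # sentinel: this worker should stop
                break
            value = do_work(item)
            with lock:
                results.append(value)
        finally:
            # Runs for the sentinel too, so q.join() can return.
            q.task_done()


def run(items):
    q = queue.Queue()
    results, lock = [], threading.Lock()
    threads = [
        threading.Thread(target=worker, args=(q, results, lock))
        for _ in range(NUM_WORKERS)
    ]
    for t in threads:
        t.start()
    for item in items:
        q.put(item)
    for _ in threads:  # one sentinel per worker
        q.put(None)
    q.join()  # every item and every sentinel has been marked done
    for t in threads:
        t.join()
    return sorted(results)
```

Since the workers exit on their own here, joining the threads alone would also be enough; `q.join()` is kept only to mirror the original snippet.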


      • skip@pobox.com

        #4
        Re: use of Queue


        Diez> Put a sentinel into the queue that gets interpreted as "terminate"
        Diez> for the workers. You need of course to put it in there once for
        Diez> each worker.

        Or make the consumers daemon threads so that when the producers are finished
        and all non-daemon threads exit, the consumers do as well.

        Skip


        • skip@pobox.com

          #5
          Re: use of Queue


          skip> Or make the consumers daemon threads so that when the producers
          skip> are finished and all non-daemon threads exit, the consumers do as
          skip> well.

          Forget that I wrote this. If they happen to be working on the token they've
          consumed at the time the other threads exit, they will exit as well,
          leaving that work unfinished. Use the sentinel token idea instead.

          Skip


          • Alexandru  Mosoi

            #6
            Re: use of Queue

            On Aug 27, 1:06 pm, Gerhard Häring <g...@ghaering.de> wrote:
            > Alexandru Mosoi wrote:
            > > how is Queue intended to be used? I found the following code in python
            > > manual, but I don't understand how to stop consumers after all items
            > > have been produced. I tried different approaches but all of them
            > > seemed incorrect (race, deadlock or duplicating queue functionality)
            > >
            > > def worker():
            > >     while True:
            > >         item = q.get()
            >
            >           if item is None:
            >               break
            >
            > >         do_work(item)
            > >         q.task_done()
            > >
            > > q = Queue()
            > > for i in range(num_worker_threads):
            > >     t = Thread(target=worker)
            > >     t.setDaemon(True)
            > >     t.start()
            > >
            > > for item in source():
            > >     q.put(item)
            >
            >   # stop all consumers
            >   for i in range(num_worker_threads):
            >       q.put(None)
            >
            > > q.join() # block until all tasks are done
            >
            > This is how I do it.
            >
            > -- Gerhard

            Your solution works assuming that you know how many consumer threads
            you have :). I don't :). More than that, it's not correct if you have
            more than one producer :). Having a sentinel was my very first idea,
            but as you see... it's a race condition (there are cases in which not
            all items are processed).


            • Jeff

              #7
              Re: use of Queue

              > Your solution works assuming that you know how many consumer threads
              > you have :). I don't :). More than that, it's not correct if you have
              > more than one producer :). Having a sentinel was my very first idea,
              > but as you see... it's a race condition (there are cases in which not
              > all items are processed).

              Queue raises an Empty exception when there are no items left in the
              queue. Put the q.get() call in a try block and exit in the except
              block.

              You can also use a condition variable to signal threads to terminate.
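
A minimal sketch of signalling termination separately from the queue, written for Python 3 and using a `threading.Event` in place of a raw condition variable (that substitution is an assumption of this sketch, not something the post specifies). Note that a plain `q.get()` blocks; `queue.Empty` is raised only by a non-blocking `get` or one with a timeout, so the consumer polls with a short timeout. The names `consumer`, `run` and the 0.05 second timeout are illustrative.

```python
import queue
import threading


def consumer(q, done, out, lock):
    while True:
        # Read the flag BEFORE looking at the queue: if it was already set,
        # no further items can arrive, so an empty queue really means "finished".
        finished = done.is_set()
        try:
            item = q.get(timeout=0.05)
        except queue.Empty:
            if finished:
                break
            continue  # producers may just be slow; keep waiting
        with lock:
            out.append(item * 2)  # stand-in for real work


def run(items, num_consumers=3):
    q = queue.Queue()
    done = threading.Event()
    out, lock = [], threading.Lock()
    consumers = [
        threading.Thread(target=consumer, args=(q, done, out, lock))
        for _ in range(num_consumers)
    ]
    for t in consumers:
        t.start()
    for item in items:
        q.put(item)
    done.set()  # set only after every item has been put
    for t in consumers:
        t.join()
    return sorted(out)
```

Exiting on `Empty` alone, without the flag, would stop consumers whenever the producer is merely slow; checking the flag after the timeout instead of before it could leave a late item unprocessed.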


              • Alexandru  Mosoi

                #8
                Re: use of Queue

                On Aug 27, 2:54 pm, Jeff <jeffo...@gmail.com> wrote:
                > Queue raises an Empty exception when there are no items left in the
                > queue.  Put the q.get() call in a try block and exit in the except
                > block.

                Wrong. What if the producer takes a long time to produce an item?
                Consumers will find the queue empty and exit instead of waiting.

                > You can also use a condition variable to signal threads to terminate.

                This is the solution I want to avoid because it duplicates Queue's
                functionality. I prefer having a clean solution with nice design to
                hacking the Queue class.


                • Alexandru  Mosoi

                  #9
                  Re: use of Queue

                  On Aug 27, 12:45 pm, Alexandru Mosoi <brtz...@gmail.com> wrote:
                  > how is Queue intended to be used? I found the following code in python
                  > manual, but I don't understand how to stop consumers after all items
                  > have been produced. I tried different approaches but all of them
                  > seemed incorrect (race, deadlock or duplicating queue functionality)
                  >
                  > def worker():
                  >     while True:
                  >         item = q.get()
                  >         do_work(item)
                  >         q.task_done()
                  >
                  > q = Queue()
                  > for i in range(num_worker_threads):
                  >     t = Thread(target=worker)
                  >     t.setDaemon(True)
                  >     t.start()
                  >
                  > for item in source():
                  >     q.put(item)
                  >
                  > q.join()       # block until all tasks are done

                  ok. I think I figured it out :). let me know what you think

                  global num_tasks, num_done, queue
                  num_tasks = 0
                  num_done = 0
                  queue = Queue()

                  # producer
                  num_tasks += 1
                  for i in items:
                      num_tasks += 1
                      queue.put(i)

                  num_tasks -= 1
                  if num_tasks == num_done:
                      queue.put(None)

                  # consumer
                  while True:
                      i = queue.get()
                      if i is None:
                          queue.put(None)
                          break

                      # do stuff

                      num_done += 1
                      if num_done == num_tasks:
                          queue.put(None)
                          break






                  • Diez B. Roggisch

                    #10
                    Re: use of Queue

                    >
                    > Your solution works assuming that you know how many consumer threads
                    > you have :). I don't :). More than that, it's not correct if you have
                    > more than one producer :). Having a sentinel was my very first idea,
                    > but as you see... it's a race condition (there are cases in which not
                    > all items are processed).

                    If you have several producers, how do you coordinate when to shut down?

                    Apart from that, you can easily solve the problem of not knowing how many
                    consumers you have by making a consumer stuff back the sentinel into the
                    queue. Then it will ripple down until no consumer is left.

                    Diez
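
The rippling sentinel can be sketched as follows (Python 3). This is a minimal illustration under two assumptions that are not in the post: a coordinating thread joins all producers before putting the single sentinel, and a unique `STOP` object is used instead of `None`. `producer`, `consumer` and `run` are hypothetical names.

```python
import queue
import threading

STOP = object()  # unique sentinel, so None stays usable as a normal item


def producer(q, items):
    for item in items:
        q.put(item)


def consumer(q, out, lock):
    while True:
        item = q.get()
        if item is STOP:
            q.put(STOP)  # hand the sentinel on to the next consumer
            break
        with lock:
            out.append(item)  # stand-in for real work


def run(batches, num_consumers=3):
    q = queue.Queue()
    out, lock = [], threading.Lock()
    consumers = [
        threading.Thread(target=consumer, args=(q, out, lock))
        for _ in range(num_consumers)
    ]
    producers = [
        threading.Thread(target=producer, args=(q, batch)) for batch in batches
    ]
    for t in consumers + producers:
        t.start()
    for t in producers:
        t.join()  # wait until every producer has finished
    q.put(STOP)  # one sentinel, regardless of how many consumers exist
    for t in consumers:
        t.join()
    return sorted(out)
```

Because the queue is FIFO and the sentinel is put only after all producers have finished, every real item is dequeued before any consumer sees `STOP`. One sentinel is left in the queue at the end, which is harmless here.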


                    • Iain King

                      #11
                      Re: use of Queue

                      On Aug 27, 1:17 pm, Alexandru Mosoi <brtz...@gmail.com> wrote:
                      > On Aug 27, 12:45 pm, Alexandru Mosoi <brtz...@gmail.com> wrote:
                      > > how is Queue intended to be used? I found the following code in python
                      > > manual, but I don't understand how to stop consumers after all items
                      > > have been produced. I tried different approaches but all of them
                      > > seemed incorrect (race, deadlock or duplicating queue functionality)
                      > >
                      > > def worker():
                      > >     while True:
                      > >         item = q.get()
                      > >         do_work(item)
                      > >         q.task_done()
                      > >
                      > > q = Queue()
                      > > for i in range(num_worker_threads):
                      > >     t = Thread(target=worker)
                      > >     t.setDaemon(True)
                      > >     t.start()
                      > >
                      > > for item in source():
                      > >     q.put(item)
                      > >
                      > > q.join() # block until all tasks are done
                      >
                      > ok. I think I figured it out :). let me know what you think
                      >
                      > global num_tasks, num_done, queue
                      > num_tasks = 0
                      > num_done = 0
                      > queue = Queue()
                      >
                      > # producer
                      > num_tasks += 1
                      > for i in items:
                      >     num_tasks += 1
                      >     queue.put(i)
                      >
                      > num_tasks -= 1
                      > if num_tasks == num_done:
                      >     queue.put(None)
                      >
                      > # consumer
                      > while True:
                      >     i = queue.get()
                      >     if i is None:
                      >         queue.put(None)
                      >         break
                      >
                      >     # do stuff
                      >
                      >     num_done += 1
                      >     if num_done == num_tasks:
                      >         queue.put(None)
                      >         break

                      Are you sure you want to put the final exit code in the consumer?
                      Shouldn't the producer place a None on the queue when it knows it's
                      finished? The way you have it, the producer could make 1 item, it
                      could get consumed, and the consumer exit before the producer makes
                      item 2.

                      Iain


                      • Fredrik Lundh

                        #12
                        Re: use of Queue

                        Alexandru Mosoi wrote:
                        > > how is Queue intended to be used? I found the following code in python
                        > > manual, but I don't understand how to stop consumers after all items
                        > > have been produced. I tried different approaches but all of them
                        > > seemed incorrect (race, deadlock or duplicating queue functionality)
                        > >
                        > > def worker():
                        > >     while True:
                        > >         item = q.get()
                        > >         do_work(item)
                        > >         q.task_done()
                        > >
                        > > q = Queue()
                        > > for i in range(num_worker_threads):
                        > >     t = Thread(target=worker)
                        > >     t.setDaemon(True)
                        > >     t.start()
                        > >
                        > > for item in source():
                        > >     q.put(item)
                        > >
                        > > q.join() # block until all tasks are done
                        >
                        > ok. I think I figured it out :). let me know what you think
                        >
                        > global num_tasks, num_done, queue
                        > num_tasks = 0
                        > num_done = 0
                        > queue = Queue()
                        >
                        > # producer
                        > num_tasks += 1
                        > for i in items:
                        >     num_tasks += 1
                        >     queue.put(i)

                        what's the point of using a thread-safe queue if you're going to use a
                        non-thread-safe counter? if you want to write broken code, you can do
                        that in a lot fewer lines ;-)

                        as others have mentioned, you can use sentinels:



                        or, in Python 2.5 and later, the task_done/join pattern shown here:



                        </F>


                        • Paul Rubin

                          #13
                          Re: use of Queue

                          skip@pobox.com writes:
                          > Or make the consumers daemon threads so that when the producers are finished
                          > and all non-daemon threads exit, the consumers do as well.

                          How are the consumers supposed to know when the producers are
                          finished? Yes, there are different approaches like sentinels, but the
                          absence of a unified approach built into the library really does seem
                          like a deficiency in the library.


                          • Raymond Hettinger

                            #14
                            Re: use of Queue

                            On Aug 27, 4:55 pm, Paul Rubin <http://phr...@NOSPAM.invalid> wrote:
                            > s...@pobox.com writes:
                            > > Or make the consumers daemon threads so that when the producers are finished
                            > > and all non-daemon threads exit, the consumers do as well.
                            >
                            > How are the consumers supposed to know when the producers are
                            > finished?  Yes, there are different approaches like sentinels, but the
                            > absence of a unified approach built into the library really does seem
                            > like a deficiency in the library.

                            See effbot's reply. The task_done and join methods were put there for
                            exactly this use case.


                            Raymond
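
For reference, a minimal runnable sketch of the `task_done`/`join` pattern in Python 3. It is not necessarily the example the earlier reply linked to; `run`, `num_workers` and the `item + 1` work step are placeholders. The consumers never need to be told to stop: the main thread waits on `q.join()` until every item has been marked done, and the daemon workers are discarded when the program exits.

```python
import queue
import threading


def run(items, num_workers=4):
    q = queue.Queue()
    out, lock = [], threading.Lock()

    def worker():
        while True:
            item = q.get()  # blocks until an item is available
            try:
                with lock:
                    out.append(item + 1)  # stand-in for real work
            finally:
                q.task_done()  # exactly one task_done() per get()

    for _ in range(num_workers):
        # Daemon threads do not keep the interpreter alive at exit.
        threading.Thread(target=worker, daemon=True).start()

    for item in items:
        q.put(item)

    q.join()  # returns once every put() has a matching task_done()
    return sorted(out)
```

The work must be finished before `task_done()` is called, otherwise `q.join()` could return while an item is still being processed.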
