Suggested generator to add to threading module.

This topic is closed.
  • Andrae Muys

    Suggested generator to add to threading module.

    Found myself needing serialised access to a shared generator from
    multiple threads. Came up with the following

    def serialise(gen):
        lock = threading.Lock()
        while 1:
            lock.acquire()
            try:
                next = gen.next()
            finally:
                lock.release()
            yield next

    I considered suggesting it for itertools, but really it's thread
    specific so I am suggesting it for the threading module.

    Andrae Muys
  • Martin v. Löwis

    #2
    Re: Suggested generator to add to threading module.

    Andrae Muys wrote:
    > I considered suggesting it for itertools, but really it's thread
    > specific so I am suggesting it for the threading module.

    If you are willing to contribute this function, please submit
    a patch to sf.net/projects/python, including documentation changes,
    and test cases.

    Regards,
    Martin


    • Aahz

      #3
      Re: Suggested generator to add to threading module.

      In article <7934d084.0401152058.164a240c@posting.google.com>,
      Andrae Muys <amuys@shortech.com.au> wrote:
      >
      >Found myself needing serialised access to a shared generator from
      >multiple threads. Came up with the following
      >
      >def serialise(gen):
      >    lock = threading.Lock()
      >    while 1:
      >        lock.acquire()
      >        try:
      >            next = gen.next()
      >        finally:
      >            lock.release()
      >        yield next

      I'm not sure this is generic enough to go in the standard library.
      Usually, I'd recommend that someone wanting this functionality consider
      other options in addition to this (such as using Queue.Queue()).
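A minimal sketch of the Queue-based alternative Aahz mentions, written in modern Python 3 (where `Queue.Queue` became `queue.Queue`); the `feeder`/`worker` names and the sentinel scheme are illustrative assumptions, not from the thread. A single feeder drains the generator into a queue, so no worker ever calls next() on the generator itself:

```python
# Hypothetical sketch: distribute a generator's output to worker threads
# via a queue instead of serialising direct access to the generator.
import queue
import threading

def squares(n):
    i = 1
    while i < n:
        yield i * i
        i += 1

def feeder(gen, q, num_workers):
    for item in gen:
        q.put(item)
    for _ in range(num_workers):
        q.put(None)            # one sentinel per worker signals exhaustion

results = []
res_lock = threading.Lock()

def worker(q):
    while True:
        item = q.get()
        if item is None:       # sentinel: this worker is done
            break
        with res_lock:
            results.append(item)

num_workers = 4
q = queue.Queue()
threads = [threading.Thread(target=worker, args=(q,))
           for _ in range(num_workers)]
for t in threads:
    t.start()
feeder(squares(100), q, num_workers)
for t in threads:
    t.join()
```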
      --
      Aahz (aahz@pythoncraft.com) <*> http://www.pythoncraft.com/

      A: No.
      Q: Is top-posting okay?


      • Ype Kingma

        #4
        Re: Suggested generator to add to threading module.

        Andrae Muys wrote:
        > Found myself needing serialised access to a shared generator from
        > multiple threads. Came up with the following
        >
        > def serialise(gen):
        >     lock = threading.Lock()
        >     while 1:
        >         lock.acquire()
        >         try:
        >             next = gen.next()
        >         finally:
        >             lock.release()
        >         yield next

        Is there any reason why the lock is not shared among threads?
        From the looks of this, it doesn't synchronize anything
        between different threads. Am I missing something?

        Kind regards,
        Ype

        email at xs4all.nl


        • Jeff Epler

          #5
          Re: Suggested generator to add to threading module.

          On Fri, Jan 16, 2004 at 08:42:36PM +0100, Ype Kingma wrote:
          > > Found myself needing serialised access to a shared generator from
          > > multiple threads. Came up with the following
          > >
          > > def serialise(gen):
          > >     lock = threading.Lock()
          > >     while 1:
          > >         lock.acquire()
          > >         try:
          > >             next = gen.next()
          > >         finally:
          > >             lock.release()
          > >         yield next
          >
          > Is there any reason why the lock is not shared among threads?
          > From the looks of this, it doesn't synchronize anything
          > between different threads. Am I missing something?

          Yes, I think so. You'd use the same "serialise" generator object in
          multiple threads, like this:

          p = serialise(producer_generator())
          threads = [thread.start_new(worker_thread, (p,))
                     for t in range(num_workers)]

          Jeff


          • Alan Kennedy

            #6
            Re: Suggested generator to add to threading module.

            [Andrae Muys]
            >>> Found myself needing serialised access to a shared generator from
            >>> multiple threads. Came up with the following
            >>>
            >>> def serialise(gen):
            >>>     lock = threading.Lock()
            >>>     while 1:
            >>>         lock.acquire()
            >>>         try:
            >>>             next = gen.next()
            >>>         finally:
            >>>             lock.release()
            >>>         yield next

            [Ype Kingma]
            >> Is there any reason why the lock is not shared among threads?
            >> From the looks of this, it doesn't synchronize anything
            >> between different threads. Am I missing something?

            [Jeff Epler]
            > Yes, I think so. You'd use the same "serialise" generator object in
            > multiple threads, like this:
            >
            > p = serialise(producer_generator())
            > threads = [thread.start_new(worker_thread, (p,))
            >            for t in range(num_workers)]

            Hmm. I think Ype is right: the above code does not correctly serialise
            access to a generator.

            The above serialise function is a generator which wraps a generator.
            This presumably is in order to prevent the wrapped generator's .next()
            method being called simultaneously from multiple threads (which is
            barred: PEP 255: "Restriction: A generator cannot be resumed while it
            is actively running")

            http://www.python.org/peps/pep-0255.html


            However, the above implementation re-creates the problem by using an
            outer generator to wrap the inner one. The outer's .next() method will
            then potentially be called simultaneously by multiple threads. The
            following code illustrates the problem

            #-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
            import time
            import thread
            import threading

            def serialise(gen):
                lock = threading.Lock()
                while 1:
                    lock.acquire()
                    try:
                        next = gen.next()
                    finally:
                        lock.release()
                    yield next

            def squares(n):
                i = 1
                while i < n:
                    yield i*i
                    i = i+1

            def worker_thread(iter, markers):
                markers[thread.get_ident()] = 1
                results = [] ; clashes = 0
                while 1:
                    try:
                        results.append(iter.next())
                    except StopIteration:
                        break
                    except ValueError, ve:
                        if str(ve) == "generator already executing":
                            clashes = clashes + 1
                del markers[thread.get_ident()]
                print "Thread %5s: %d results: %d clashes." % (thread.get_ident(),
                    len(results), clashes)

            numthreads = 10 ; threadmarkers = {}
            serp = serialise(squares(100))
            threads = [thread.start_new_thread(worker_thread,
                (serp, threadmarkers)) for t in xrange(numthreads)]
            while len(threadmarkers.keys()) > 0:
                time.sleep(0.1)
            #-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=

            I believe that the following definition of serialise will correct the
            problem (IFF I've understood the problem correctly :-)

            #-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
            import time
            import thread
            import threading

            class serialise:
                "Wrap a generator in an iterator for thread-safe access"

                def __init__(self, gen):
                    self.lock = threading.Lock()
                    self.gen = gen

                def __iter__(self):
                    return self

                def next(self):
                    self.lock.acquire()
                    try:
                        return self.gen.next()
                    finally:
                        self.lock.release()

            def squares(n):
                i = 1
                while i < n:
                    yield i*i
                    i = i+1

            def worker_thread(iter, markers):
                markers[thread.get_ident()] = 1
                results = [] ; clashes = 0
                while 1:
                    try:
                        results.append(iter.next())
                    except StopIteration:
                        break
                    except ValueError, ve:
                        if str(ve) == "generator already executing":
                            clashes = clashes + 1
                del markers[thread.get_ident()]
                print "Thread %5s: %d results: %d clashes." % (thread.get_ident(),
                    len(results), clashes)

            numthreads = 10 ; threadmarkers = {}
            serp = serialise(squares(100))
            threads = [thread.start_new_thread(worker_thread,
                (serp, threadmarkers)) for t in xrange(numthreads)]
            while len(threadmarkers.keys()) > 0:
                time.sleep(0.1)
            #-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=

            Also, I don't know if I'm happy with relying on the fact that the
            generator raises StopIteration for *every* .next() call after the
            actual generated sequence has ended. The above code depends on the
            exhausted generator raising StopIteration in every thread. This seems
            to me the kind of thing that might be python-implementation specific.
            For example, the original "Simple Generators" specification, PEP 255,
            makes no mention of expected behaviour of generators when multiple
            calls are made to its .next() method after the iteration is
            exhausted. None that I can see, anyway. Am I wrong?
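For what it's worth, the behaviour in question is easy to check empirically. A quick Python 3 check (where `gen.next()` is spelled `next(gen)`), not part of the original post:

```python
# Check that an exhausted generator keeps raising StopIteration on
# every subsequent next() call, not just the first one.
def squares(n):
    i = 1
    while i < n:
        yield i * i
        i += 1

g = squares(3)
first = list(g)          # exhausts the generator: [1, 4]
raised = 0
for _ in range(5):
    try:
        next(g)
    except StopIteration:
        raised += 1      # StopIteration is raised every time
```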



            regards,

            --
            alan kennedy
            ------------------------------------------------------
            check http headers here: http://xhaus.com/headers
            email alan: http://xhaus.com/contact/alan


            • Andrae Muys

              #7
              Re: Suggested generator to add to threading module.

              Aahz wrote:
              > In article <7934d084.0401152058.164a240c@posting.google.com>,
              > Andrae Muys <amuys@shortech.com.au> wrote:
              >
              >>Found myself needing serialised access to a shared generator from
              >>multiple threads. Came up with the following
              >>
              >>def serialise(gen):
              >>    lock = threading.Lock()
              >>    while 1:
              >>        lock.acquire()
              >>        try:
              >>            next = gen.next()
              >>        finally:
              >>            lock.release()
              >>        yield next
              >
              >
              > I'm not sure this is generic enough to go in the standard library.
              > Usually, I'd recommend that someone wanting this functionality consider
              > other options in addition to this (such as using Queue.Queue()).

              While I fully appreciate the importance of a Queue.Queue in implementing
              a producer/consumer task relationship, this particular function provides
              serialised access to a *passive* data-stream. With the increasing
              sophistication of itertools, I feel there may be an argument for
              supporting shared access to a generator.

              Anyway I thought it was worth offering as a possible bridge between the
              itertools and threading modules. If I'm mistaken, then it's no major loss.
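For readers coming to this thread today, a hedged Python 3 translation of the corrected wrapper discussed earlier in the thread (the iterator protocol method is now `__next__`, and the lock works as a context manager); the `Serialise` spelling is just to avoid shadowing the original name:

```python
# Hypothetical modern translation of the thread's corrected serialise
# class: a lock makes a shared generator safe to consume from threads.
import threading

class Serialise:
    """Wrap a generator (or any iterator) for thread-safe access."""

    def __init__(self, gen):
        self.lock = threading.Lock()
        self.gen = gen

    def __iter__(self):
        return self

    def __next__(self):
        with self.lock:          # one thread resumes the generator at a time
            return next(self.gen)

def squares(n):
    i = 1
    while i < n:
        yield i * i
        i += 1

shared = Serialise(squares(10))
results = []
res_lock = threading.Lock()

def worker():
    for value in shared:         # StopIteration cleanly ends each loop
        with res_lock:
            results.append(value)

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```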

              Andrae


              • Ype Kingma

                #8
                Re: Suggested generator to add to threading module.

                Alan,

                you wrote:
                > [Andrae Muys]
                >>>> Found myself needing serialised access to a shared generator from
                >>>> multiple threads. Came up with the following
                >>>>
                >>>> def serialise(gen):
                >>>>     lock = threading.Lock()
                >>>>     while 1:
                >>>>         lock.acquire()
                >>>>         try:
                >>>>             next = gen.next()
                >>>>         finally:
                >>>>             lock.release()
                >>>>         yield next
                >
                > [Ype Kingma]
                >>> Is there any reason why the lock is not shared among threads?
                >>> From the looks of this, it doesn't synchronize anything
                >>> between different threads. Am I missing something?
                >
                > [Jeff Epler]
                >> Yes, I think so. You'd use the same "serialise" generator object in
                >> multiple threads, like this:
                >>
                >> p = serialise(producer_generator())
                >> threads = [thread.start_new(worker_thread, (p,))
                >>            for t in range(num_workers)]
                >
                > Hmm. I think Ype is right: the above code does not correctly serialise
                > access to a generator.

                Well, I just reread PEP 255, and I can assure you I was missing something...
                > The above serialise function is a generator which wraps a generator.
                > This presumably is in order to prevent the wrapped generator's .next()
                > method being called simultaneously from multiple threads (which is
                > barred: PEP 255: "Restriction: A generator cannot be resumed while it
                > is actively running")
                >
                > http://www.python.org/peps/pep-0255.html
                >
                > However, the above implementation re-creates the problem by using an
                > outer generator to wrap the inner one. The outer's .next() method will
                > then potentially be called simultaneously by multiple threads. The

                I agree (after rereading the PEP.)
                > following code illustrates the problem
                >
                > #-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
                > import time
                > import thread
                > import threading
                >
                > def serialise(gen):
                >     lock = threading.Lock()
                >     while 1:
                >         lock.acquire()
                >         try:
                >             next = gen.next()
                >         finally:
                >             lock.release()
                >         yield next
                >
                > def squares(n):
                >     i = 1
                >     while i < n:
                >         yield i*i
                >         i = i+1
                >
                > def worker_thread(iter, markers):
                >     markers[thread.get_ident()] = 1
                >     results = [] ; clashes = 0
                >     while 1:
                >         try:
                >             results.append(iter.next())
                >         except StopIteration:
                >             break
                >         except ValueError, ve:
                >             if str(ve) == "generator already executing":
                >                 clashes = clashes + 1
                >     del markers[thread.get_ident()]
                >     print "Thread %5s: %d results: %d clashes." % (thread.get_ident(),
                >         len(results), clashes)
                >
                > numthreads = 10 ; threadmarkers = {}
                > serp = serialise(squares(100))
                > threads = [thread.start_new_thread(worker_thread,
                >     (serp, threadmarkers)) for t in xrange(numthreads)]
                > while len(threadmarkers.keys()) > 0:
                >     time.sleep(0.1)
                > #-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
                >
                > I believe that the following definition of serialise will correct the
                > problem (IFF I've understood the problem correctly :-)
                >
                > #-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
                > import time
                > import thread
                > import threading
                >
                > class serialise:
                >     "Wrap a generator in an iterator for thread-safe access"
                >
                >     def __init__(self, gen):
                >         self.lock = threading.Lock()
                >         self.gen = gen
                >
                >     def __iter__(self):
                >         return self
                >
                >     def next(self):
                >         self.lock.acquire()
                >         try:
                >             return self.gen.next()
                >         finally:
                >             self.lock.release()

                Looks like a candidate for inclusion in a standard library to me.
                > def squares(n):
                >     i = 1
                >     while i < n:
                >         yield i*i
                >         i = i+1
                >
                > def worker_thread(iter, markers):
                >     markers[thread.get_ident()] = 1
                >     results = [] ; clashes = 0
                >     while 1:
                >         try:
                >             results.append(iter.next())
                >         except StopIteration:
                >             break
                >         except ValueError, ve:
                >             if str(ve) == "generator already executing":
                >                 clashes = clashes + 1
                >     del markers[thread.get_ident()]
                >     print "Thread %5s: %d results: %d clashes." % (thread.get_ident(),
                >         len(results), clashes)
                >
                > numthreads = 10 ; threadmarkers = {}
                > serp = serialise(squares(100))
                > threads = [thread.start_new_thread(worker_thread,
                >     (serp, threadmarkers)) for t in xrange(numthreads)]
                > while len(threadmarkers.keys()) > 0:
                >     time.sleep(0.1)
                > #-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
                >
                > Also, I don't know if I'm happy with relying on the fact that the
                > generator raises StopIteration for *every* .next() call after the
                > actual generated sequence has ended. The above code depends on the
                > exhausted generator raising StopIteration in every thread. This seems
                > to me the kind of thing that might be python-implementation specific.
                > For example, the original "Simple Generators" specification, PEP 255,
                > makes no mention of expected behaviour of generators when multiple
                > calls are made to its .next() method after the iteration is
                > exhausted. None that I can see, anyway. Am I wrong?

                Quoting from PEP 234:
                http://www.python.org/peps/pep-0234.html


                "Once a particular iterator object has raised StopIteration, will
                it also raise StopIteration on all subsequent next() calls?
                ....
                Resolution: once StopIteration is raised, calling it.next()
                continues to raise StopIteration."

                Thanks to all for the help,

                Ype


                • Alan Kennedy

                  #9
                  Re: Suggested generator to add to threading module.

                  [Andrae Muys]
                  >>>>> Found myself needing serialised access to a shared generator from
                  >>>>> multiple threads. Came up with the following
                  >>>>>
                  >>>>> def serialise(gen):
                  >>>>>     lock = threading.Lock()
                  >>>>>     while 1:
                  >>>>>         lock.acquire()
                  >>>>>         try:
                  >>>>>             next = gen.next()
                  >>>>>         finally:
                  >>>>>             lock.release()
                  >>>>>         yield next

                  [Ype Kingma]
                  >>>> Is there any reason why the lock is not shared among threads?
                  >>>> From the looks of this, it doesn't synchronize anything
                  >>>> between different threads. Am I missing something?

                  [Jeff Epler]
                  >>> Yes, I think so. You'd use the same "serialise" generator object in
                  >>> multiple threads, like this:
                  >>>
                  >>> p = serialise(producer_generator())
                  >>> threads = [thread.start_new(worker_thread, (p,))
                  >>>            for t in range(num_workers)]

                  [Alan Kennedy]
                  >> Hmm. I think Ype is right: the above code does not correctly serialise
                  >> access to a generator.

                  [Ype Kingma]
                  > Well, I just reread PEP 255, and I can assure you I was missing
                  > something...

                  Ype,

                  Ah: I see now. You thought it didn't work, but for a different reason
                  than the one I pointed out. You thought that the lock was not shared
                  between threads, though as Jeff pointed out, it is if you use it the
                  right way.

                  But it still doesn't work.

                  [Alan Kennedy]
                  >> I believe that the following definition of serialise will correct the
                  >> problem (IFF I've understood the problem correctly :-)
                  >>
                  >> #-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
                  >> import time
                  >> import thread
                  >> import threading
                  >>
                  >> class serialise:
                  >>     "Wrap a generator in an iterator for thread-safe access"
                  >>
                  >>     def __init__(self, gen):
                  >>         self.lock = threading.Lock()
                  >>         self.gen = gen
                  >>
                  >>     def __iter__(self):
                  >>         return self
                  >>
                  >>     def next(self):
                  >>         self.lock.acquire()
                  >>         try:
                  >>             return self.gen.next()
                  >>         finally:
                  >>             self.lock.release()

                  [Ype Kingma]
                  > Looks like a candidate for inclusion in a standard library to me.

                  Well, maybe :-)

                  To be honest, I don't have the time to write test cases, docs and
                  patches. So I think I'll just leave it for people to find in the
                  Google Groups archives ...

                  [Alan Kennedy]
                  >> Also, I don't know if I'm happy with relying on the fact that the
                  >> generator raises StopIteration for *every* .next() call after the
                  >> actual generated sequence has ended. The above code depends on the
                  >> exhausted generator raising StopIteration in every thread. This seems
                  >> to me the kind of thing that might be python-implementation specific.
                  >> For example, the original "Simple Generators" specification, PEP 255,
                  >> makes no mention of expected behaviour of generators when multiple
                  >> calls are made to its .next() method after the iteration is
                  >> exhausted. None that I can see, anyway. Am I wrong?

                  [Ype Kingma]
                  > Quoting from PEP 234:
                  > http://www.python.org/peps/pep-0234.html
                  >
                  > "Once a particular iterator object has raised StopIteration, will
                  > it also raise StopIteration on all subsequent next() calls?
                  > ...
                  > Resolution: once StopIteration is raised, calling it.next()
                  > continues to raise StopIteration."

                  Yes, that clears the issue up nicely. Thanks for pointing that out.

                  So the same code will run correctly in Jython 2.3 and IronPython
                  (awaited with anticipation).

                  regards,

                  --
                  alan kennedy
                  ------------------------------------------------------
                  check http headers here: http://xhaus.com/headers
                  email alan: http://xhaus.com/contact/alan


                  • Andrae Muys

                    #10
                    Accessing a shared generator from multiple threads.

                    [Subject line changed to allow thread to be found more easily in
                    google-groups]

                    Alan Kennedy <alanmk@hotmail.com> wrote in message news:<400AB936.BA1D9D73@hotmail.com>...
                    > [Alan Kennedy]
                    > >> I believe that the following definition of serialise will correct the
                    > >> problem (IFF I've understood the problem correctly :-)
                    > >>

                    It does look like the following version will work, I was too focused
                    on synchronising the underlying generator, and forgot that my code
                    also needed to be re-entrant. Thanks for catching my mistake.
                    > >> #-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
                    > >> import time
                    > >> import thread
                    > >> import threading
                    > >>
                    > >> class serialise:
                    > >>     "Wrap a generator in an iterator for thread-safe access"
                    > >>
                    > >>     def __init__(self, gen):
                    > >>         self.lock = threading.Lock()
                    > >>         self.gen = gen
                    > >>
                    > >>     def __iter__(self):
                    > >>         return self
                    > >>
                    > >>     def next(self):
                    > >>         self.lock.acquire()
                    > >>         try:
                    > >>             return self.gen.next()
                    > >>         finally:
                    > >>             self.lock.release()
                    >
                    > [Ype Kingma]
                    > > Looks like a candidate for inclusion in a standard library to me.
                    >
                    > Well, maybe :-)
                    >
                    > To be honest, I don't have the time to write test cases, docs and
                    > patches. So I think I'll just leave it for people to find in the
                    > Google Groups archives ...
                    >

                    Andrae Muys


                    • Andrae Muys

                      #11
                      Re: Suggested generator to add to threading module.

                      aahz@pythoncraft.com (Aahz) wrote in message news:<bu998t$o62$1@panix1.panix.com>...
                      > In article <7934d084.0401152058.164a240c@posting.google.com>,
                      > Andrae Muys <amuys@shortech.com.au> wrote:
                      > >
                      > >Found myself needing serialised access to a shared generator from
                      > >multiple threads. Came up with the following
                      > >
                      > >def serialise(gen):
                      > >    lock = threading.Lock()
                      > >    while 1:
                      > >        lock.acquire()
                      > >        try:
                      > >            next = gen.next()
                      > >        finally:
                      > >            lock.release()
                      > >        yield next
                      >
                      > I'm not sure this is generic enough to go in the standard library.
                      > Usually, I'd recommend that someone wanting this functionality consider
                      > other options in addition to this (such as using Queue.Queue()).

                      I'm curious to know how a Queue.Queue() provides the same
                      functionality? I have always considered a Queue.Queue() to be an
                      inter-thread communication primitive. serialise() (at least the
                      corrected version discussed later in this thread) is strictly a
                      synchronisation primitive.

                      Andrae


                      • Aahz

                        #12
                        Re: Suggested generator to add to threading module.

                        In article <7934d084.0401181808.6e698042@posting.google.com>,
                        Andrae Muys <amuys@shortech.com.au> wrote:
                        >aahz@pythoncraft.com (Aahz) wrote in message news:<bu998t$o62$1@panix1.panix.com>...
                        >> In article <7934d084.0401152058.164a240c@posting.google.com>,
                        >> Andrae Muys <amuys@shortech.com.au> wrote:
                        >>>
                        >>>Found myself needing serialised access to a shared generator from
                        >>>multiple threads. Came up with the following
                        >>>
                        >>>def serialise(gen):
                        >>>    lock = threading.Lock()
                        >>>    while 1:
                        >>>        lock.acquire()
                        >>>        try:
                        >>>            next = gen.next()
                        >>>        finally:
                        >>>            lock.release()
                        >>>        yield next
                        >>
                        >> I'm not sure this is generic enough to go in the standard library.
                        >> Usually, I'd recommend that someone wanting this functionality consider
                        >> other options in addition to this (such as using Queue.Queue()).
                        >
                        >I'm curious to know how a Queue.Queue() provides the same
                        >functionality? I have always considered a Queue.Queue() to be an
                        >inter-thread communication primitive. serialise() (at least the
                        >corrected version discussed later in this thread) is strictly a
                        >synchronisation primitive.

                        Well, yes; Queue.Queue() provides both synchronization *and* data
                        protection. In some ways, it's overkill for this specific problem, but
                        my experience is that there are so many different ways to approach this
                        class of problems and so many ways to screw up threaded applications,
                        it's best to learn one swiss-army knife that can handle almost everything
                        you want to throw at it.
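To make the "swiss-army knife" point concrete, here is a minimal sketch of the synchronisation-plus-data-hand-off that Queue.Queue gives you in one call. It is written for modern Python (module named `queue` rather than `Queue`), and the use of None as a stop sentinel is an illustrative convention, not anything the module mandates:

```python
import threading
import queue  # named "Queue" in Python 2

def worker(q, results):
    # q.get() blocks until a value is available: synchronisation
    # and data protection happen in the one call.
    while True:
        item = q.get()
        if item is None:        # None as a stop sentinel (an assumption)
            break
        results.append(item * item)

q = queue.Queue()
results = []
t = threading.Thread(target=worker, args=(q, results))
t.start()
for i in range(5):
    q.put(i)
q.put(None)                     # tell the worker to stop
t.join()
```

After `t.join()` returns, `results` holds the squares in FIFO order.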
                        --
Aahz (aahz@pythoncraft.com) <*> http://www.pythoncraft.com/

                        A: No.
                        Q: Is top-posting okay?

                        Comment

                        • Alan Kennedy

                          #13
                          Re: Suggested generator to add to threading module.

[Andrae Muys]
> I'm curious to know how a Queue.Queue() provides the same
> functionality? I have always considered a Queue.Queue() to be an
> inter-thread communication primitive.

                          Not exactly.

Queue.Queue is a *thread-safe* communication primitive: you're not
required to have separate threads at both ends of a Queue.Queue, but
it is guaranteed to work correctly if you do have multiple threads.

From the module documentation:

                          """
                          The Queue module implements a multi-producer, multi-consumer FIFO
                          queue. It is especially useful in threads programming when information
                          must be exchanged safely between multiple threads. The Queue class in
                          this module implements all the required locking semantics. It depends
                          on the availability of thread support in Python.
                          """


[Andrae Muys]
> serialise() (at least the
> corrected version discussed later in this thread) is strictly a
> synchronisation primitive.

                          Just as Queue.Queue is a synchronisation primitive: a very flexible
                          and useful primitive that happens to be usable in a host of different
                          scenarios.

                          I think I'm with Aahz on this one: when faced with this kind of
                          problem, I think it is best to use a tried and tested inter-thread
                          communication paradigm, such as Queue.Queue. In this case, Queue.Queue
                          fits the problem (which is just a variation of the producer/consumer
                          problem) naturally. Also, I doubt very much if there is much excessive
                          resource overhead when using Queue.Queues.

                          As you've already seen from your first cut of the code, writing
                          thread-safe code is an error-prone process, and it's sometimes
difficult to figure out all the possible calling combinations when
                          multiple threads are involved.

                          But if you'd used Queue.Queue, well this whole conversation would
                          never have come up, would it ;-)

                          regards,

                          --
                          alan kennedy
                          ------------------------------------------------------
                          check http headers here: http://xhaus.com/headers
                          email alan: http://xhaus.com/contact/alan

                          Comment

                          • Alan Kennedy

                            #14
                            Re: Accessing a shared generator from multiple threads.

[Andrae Muys]
> Moved to email for higher bandwidth. Feel free to quote to usenet if
> you desire.

[Alan Kennedy]
>> I think I'm with Aahz on this one: when faced with this kind of
>> problem, I think it is best to use a tried and tested inter-thread
>> communication paradigm, such as Queue.Queue. In this case, Queue.Queue
>> fits the problem (which is just a variation of the producer/consumer
>> problem) naturally. Also, I doubt very much if there is much excessive
>> resource overhead when using Queue.Queues.

[Andrae Muys]
>Well I'm puzzled, because I couldn't see an easy way to use Queue.Queue
>to achieve this because this isn't a strict producer/consumer problem.
>I am trying to synchronise multiple consumers, but I don't have a
>producer. So the only way I can see to use Queue.Queue to achieve
>this is to spawn a thread specifically to convert the iterator into
>a producer.

                            Andrae,

                            I thought it best to continue this discussion on UseNet, to perhaps
                            get more opinions.

                            Yes, you're right. Using a Queue in this situation does require the
                            use of a dedicated thread for the producer. There is no way to "pull"
                            values from a generator to multiple consumers through a Queue.Queue.
                            The values have to be "pushed" onto the Queue.Queue by some producing
                            thread of execution.

                            The way I see it, the options are

                            Option 1. Spawn a separate thread to execute the producing generator.
                            However, this has problems:-

                            A: How do the threads recognise the end of the generated sequence?
                            This is not a simple problem: the Queue simply being empty does not
                            necessarily signify the end of the sequence (e.g., the producer thread
                            might not be getting its fair share of CPU time).

                            B: The Queue acts as a (potentially infinite) buffer for the generated
                            values, thus eliminating one of the primary benefits of generators:
                            their efficient "generate when required" nature. This can be helped
                            somewhat by limiting the number of entries in the Queue, but it is
                            still slightly unsatisfactory.

                            C: A thread of execution has to be dedicated to the producer, thus
                            consuming resources.

                            Option 2. Fill the Queue with values from a main thread which executes
                            the generator to exhaustion. The consuming threads simply peel values
                            from the Queue. Although this saves on thread overhead, it is the
                            least desirable in terms of memory overhead: the number of values
                            generated by the generator and buffered in the Queue could be very
                            large.

Option 3. Use the same approach as your original code, i.e. there
is no producer thread and the consuming threads are themselves
responsible for calling the generator.next() method: access to this
                            method is synchronised on a threading.Lock. I really like this
                            solution, because values are only generated on demand, with no
                            temporary storage of values required.
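For what it's worth, problems A and B of Option 1 can both be mitigated: a unique sentinel object marks the end of the sequence (each consumer puts it back so the others also see it), and a bounded Queue caps how far the producer can run ahead. A sketch, written for modern Python (`queue` module); the `_END` sentinel and all names are illustrative:

```python
import threading
import queue  # named "Queue" in Python 2

_END = object()  # unique sentinel marking exhaustion (an assumption)

def produce(gen, q):
    # Drain the generator into the queue, then signal the end once.
    for value in gen:
        q.put(value)
    q.put(_END)

def consume(q, out):
    while True:
        value = q.get()
        if value is _END:
            q.put(_END)      # put it back so the other consumers stop too
            break
        out.append(value)

def demo(n_consumers=3):
    q = queue.Queue(maxsize=4)   # bounded buffer: tames problem B
    out = []
    workers = [threading.Thread(target=consume, args=(q, out))
               for _ in range(n_consumers)]
    for w in workers:
        w.start()
    produce((i * i for i in range(1, 11)), q)
    for w in workers:
        w.join()
    return sorted(out)
```

Problem C (a dedicated producer thread, here the main thread running `produce`) remains, which is why Option 3 still looks attractive.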

                            I think that an ideal solution would be to create a dedicated class
                            for synchronising a generator, as my example did, BUT to implement the
                            same interface as Queue.Queue, so that client code would remain
                            ignorant that it was dealing with a generator.

                            Here is my version of such a beast

# -=-=-=-=-= file GenQueue.py =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
import threading

class Empty(Exception): pass
class Exhausted(StopIteration): pass
class IllegalOperation(Exception): pass

class GenQueue:
    "Simulate a Queue.Queue, with values produced from a generator"

    def __init__(self, gen):
        self.lock = threading.Lock()
        self.gen = gen

    def __iter__(self):
        return self

    def _get(self, block=1):
        if self.lock.acquire(block):
            try:
                try:
                    return self.gen.next()
                except StopIteration:
                    raise Exhausted
            finally:
                self.lock.release()
        else:
            raise Empty

    def next(self):
        return self._get(1)

    def get(self, block=1):
        return self._get(block)

    def get_nowait(self):
        return self._get(0)

    def put(self, item, block=1):
        raise IllegalOperation

    def put_nowait(self, item):
        self.put(item, 0)

    def full(self):
        return False

    def empty(self):
        return False

    def qsize(self):
        return 1j

#-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=

                            And here is some code that tests it

                            #-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=

import sys
import time
import thread
import GenQueue

class TestFailure(Exception): pass

def squares(n):
    i = 1
    while i <= n:
        yield i*i
        i = i+1

def test_blockget(results, queue):
    while 1:
        try:
            results.append(queue.get())
        except GenQueue.Empty:
            raise TestFailure
        except GenQueue.Exhausted:
            break

def test_iter(results, queue):
    for v in queue:
        results.append(v)

def test_noblockget(results, queue):
    while 1:
        try:
            results.append(queue.get_nowait())
        except GenQueue.Empty:
            pass
        except GenQueue.Exhausted:
            break

def threadwrap(func, queue, markers):
    markers[thread.get_ident()] = 1
    results = []
    func(results, queue)
    print "Test %s: Thread %5s: %d results." % (func.__name__, \
        thread.get_ident(), len(results))
    del markers[thread.get_ident()]

def test():
    numthreads = 10
    for tfunc in (test_blockget, test_iter, test_noblockget):
        print "Test: %s ------------------------------->" % tfunc.__name__
        threadmarkers = {}
        q = GenQueue.GenQueue(squares(100))
        threads = [thread.start_new_thread(threadwrap,
                       (tfunc, q, threadmarkers)) for t in xrange(numthreads)]
        while len(threadmarkers.keys()) > 0:
            time.sleep(0.1)

if __name__ == "__main__":
    test()

                            #-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=

                            I find the combination of the iteration protocol and a Queue
                            intriguing: in this case, Queue.get() and iter.next() mean the same
                            thing. Or maybe I'm just being sucked in by the syntactic niceties of
                            something like

def worker(inq, outq):
    for thing in inq: outq.put(thing.work())

                            I'm interested to hear other opinions about the commonalities and
                            differences between Queues and iterators.

                            One problem that is always in the back of my mind these days is how
                            one could write a dispatch-based coroutine scheduler that would work
                            efficiently when in communication (through Queue.Queues?) with
                            independently executing coroutine schedulers running on other
                            processors in the box. (And before you jump in shouting "Global
                            Interpreter Lock!", remember jython + generators will be able to do
                            this).

                            Not that I need such a thing: it's just a fun thing to think about,
                            like crosswords :-)

                            cheers,

                            --
                            alan kennedy
                            ------------------------------------------------------
                            check http headers here: http://xhaus.com/headers
                            email alan: http://xhaus.com/contact/alan

                            Comment

                            • Josiah Carlson

                              #15
                              Re: Accessing a shared generator from multiple threads.


                              Even easier:

Q = Queue.Queue()
Q.put(gen)
def thread():
    a = Q.get()
    use = a.next()
    Q.put(a)
    # do whatever you need

                              Of course you could just as easily use a single lock and a class:

class lockedgen:
    def __init__(self, gen):
        self.g = gen
        self.l = threading.Lock()
    def get(self):
        self.l.acquire()
        try:
            # release the lock even if the generator raises
            # (e.g. StopIteration on exhaustion)
            a = self.g.next()
        finally:
            self.l.release()
        return a

generator = lockedgen(gen)
def thread():
    use = generator.get()
    # do what you need
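The same single-lock idea can also speak the iterator protocol, so each consuming thread just runs a for-loop over the shared wrapper and stops naturally when the underlying iterator is exhausted. A sketch in modern Python (the class name and all other names are illustrative):

```python
import threading

class SerialisedIter:
    """Wrap any iterator so multiple threads can safely pull from it."""
    def __init__(self, it):
        self._it = iter(it)
        self._lock = threading.Lock()

    def __iter__(self):
        return self

    def __next__(self):                 # spelled "next" in Python 2
        with self._lock:                # released even if next() raises
            return next(self._it)
    next = __next__

def drain(shared, out):
    # StopIteration from the shared iterator ends this loop normally.
    for value in shared:
        out.append(value)

shared = SerialisedIter(i for i in range(10))
out = []
threads = [threading.Thread(target=drain, args=(shared, out))
           for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Each of the ten values is handed to exactly one thread; the arrival order in `out` depends on scheduling.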


                              - Josiah

                              Comment
