Request for comments about synchronized queue using boost

This topic is closed.
  • Nordlöw

    Request for comments about synchronized queue using boost

    I am currently designing a synchronized queue used to communicate
    between threads. Is the code given below a good solution? Am I
    using mutex lock/unlock more than needed?

    Are there any resources out there on the Internet on how to design
    *thread-safe*, *efficient* data structures?

    /Nordlöw

    The file synched_queue.hpp follows:

    #ifndef PNW__SYNCHED_QUEUE_HPP
    #define PNW__SYNCHED_QUEUE_HPP

    /*!
     * @file synched_queue.hpp
     * @brief Synchronized (Thread-Safe) Container Wrapper on std::queue
     *        using Boost.Thread.
     */

    #include <queue>
    #include <iostream>

    #include <boost/bind.hpp>
    #include <boost/thread/thread.hpp>
    #include <boost/thread/mutex.hpp>
    #include <boost/thread/condition.hpp>

    // ========================================================================

    template <typename T>
    class synched_queue
    {
        std::queue<T> q;   ///< Queue.
        boost::mutex m;    ///< Mutex.
    public:
        /*!
         * Push @p value.
         */
        void push(const T & value) {
            boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
            q.push(value);
        }

        /*!
         * Try to pop into @p value, returning immediately in either case.
         * @return true if the pop succeeded, false otherwise.
         */
        bool try_pop(T & value) {
            boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
            if (q.size()) {
                value = q.front();
                q.pop();
                return true;
            }
            return false;
        }

        /// Pop and return value, possibly waiting forever.
        T wait_pop() {
            boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
            // wait until the queue has at least one element
            c.wait(sl, boost::bind(&std::queue<T>::size, q));
            T value = q.front();
            q.pop();
            return value;
        }

        size_type size() const {
            boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
            return q.size();
        }

        bool empty() const {
            boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
            return q.empty();
        }
    };

    // ========================================================================

    #endif
  • Hendrik Schober

    #2
    Re: Request for comments about synchronized queue using boost

    Nordlöw wrote:
    > I am currently designing a synchronized queue used to communicate
    > between threads. Is the code given below a good solution? Am I
    > using mutex lock/unlock more than needed?
    >
    > Are there any resources out there on the Internet on how to design
    > *thread-safe*, *efficient* data structures?

    comp.programming.threads?

    > /Nordlöw
    >
    > [...]
    >
    >     /// Pop and return value, possibly waiting forever.
    >     T wait_pop() {
    >         boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
    >         // wait until the queue has at least one element
    >         c.wait(sl, boost::bind(&std::queue<T>::size, q));
    >         T value = q.front();
    >         q.pop();
    >         return value;
    >     }

    I haven't done any threading in a decade or so, but I wonder how
    in the above code anything could be put into the locked queue.
    What am I missing?

    Oh, and I wonder what 'c' is.

    Schobi


    • Maxim Yegorushkin

      #3
      Re: Request for comments about synchronized queue using boost

      On Oct 15, 2:36 pm, Nordlöw <per.nord...@gmail.com> wrote:
      > I am currently designing a synchronized queue used to communicate
      > between threads. Is the code given below a good solution? Am I
      > using mutex lock/unlock more than needed?
      >
      > /Nordlöw
      >
      > The file synched_queue.hpp follows:
      >
      > #ifndef PNW__SYNCHED_QUEUE_HPP
      > #define PNW__SYNCHED_QUEUE_HPP
      >
      > /*!
      >  * @file synched_queue.hpp
      >  * @brief Synchronized (Thread-Safe) Container Wrapper on std::queue
      >  *        using Boost.Thread.
      >  */
      >
      > #include <queue>
      > #include <iostream>
      >
      > #include <boost/bind.hpp>
      > #include <boost/thread/thread.hpp>
      > #include <boost/thread/mutex.hpp>
      > #include <boost/thread/condition.hpp>
      >
      > template <typename T>
      > class synched_queue
      > {
      >     std::queue<T> q;   ///< Queue.
      >     boost::mutex m;    ///< Mutex.

      A member variable is missing here:

          boost::condition c;

      > public:
      >     /*!
      >      * Push @p value.
      >      */
      >     void push(const T & value) {
      >         boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
      >         q.push(value);

      You need to notify other threads waiting on the queue:

          c.notify_one();

      >     }
      >
      >     /*!
      >      * Try to pop into @p value, returning immediately in either case.
      >      * @return true if the pop succeeded, false otherwise.
      >      */
      >     bool try_pop(T & value) {
      >         boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
      >         if (q.size()) {
      >             value = q.front();
      >             q.pop();
      >             return true;
      >         }
      >         return false;
      >     }
      >
      >     /// Pop and return value, possibly waiting forever.
      >     T wait_pop() {
      >         boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
      >         // wait until the queue has at least one element

      The following line:

      >         c.wait(sl, boost::bind(&std::queue<T>::size, q));

      boost::bind(&std::queue<T>::size, q) stores a copy of the queue in the
      object created by boost::bind, so that the wait never finishes if the
      queue is empty (and if the condition variable is not notified (see
      above)).

      It should be as simple as:

          while(q.empty())
              c.wait(sl);

      >         T value = q.front();
      >         q.pop();
      >         return value;
      >     }
      >
      >     size_type size() const {
      >         boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
      >         return q.size();
      >     }
      >
      >     bool empty() const {
      >         boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
      >         return q.empty();
      >     }
      > };
      >
      > #endif

      The other thing is that the queue does not support destruction: the
      destructor does not unblock any threads blocked in wait.

      Apart from that, the mutex is held for too long. You don't really need
      to hold the lock when allocating memory for elements and when invoking
      the copy constructor of the elements.

      Here is an improved version (although a bit simplified):

      #include <boost/thread/mutex.hpp>
      #include <boost/thread/condition.hpp>
      #include <boost/function.hpp>
      #include <list>
      #include <stdexcept>

      template<class T>
      class atomic_queue : private boost::noncopyable
      {
      private:
          boost::mutex mtx_;
          boost::condition cnd_;
          bool cancel_;
          unsigned waiting_;

          // use list as a queue because it allows for splicing: moving
          // elements between lists without any memory allocation or copying
          typedef std::list<T> queue_type;
          queue_type q_;

      public:
          struct cancelled : std::logic_error
          {
              cancelled() : std::logic_error("cancelled") {}
          };

          atomic_queue()
              : cancel_()
              , waiting_()
          {}

          ~atomic_queue()
          {
              // cancel all waiting threads
              this->cancel();
          }

          void cancel()
          {
              // cancel all waiting threads
              boost::mutex::scoped_lock l(mtx_);
              cancel_ = true;
              cnd_.notify_all();
              // and wait till they are done
              while(waiting_)
                  cnd_.wait(l);
          }

          void push(T const& t)
          {
              // this avoids an allocation inside the critical section below
              queue_type tmp(&t, &t + 1);
              {
                  boost::mutex::scoped_lock l(mtx_);
                  q_.splice(q_.end(), tmp);
              }
              cnd_.notify_one();
          }

          // this function provides only basic exception safety if T's copy
          // ctor can throw, or strong exception safety if T's copy ctor is
          // nothrow
          T pop()
          {
              // this avoids copying T inside the critical section below
              queue_type tmp;
              {
                  boost::mutex::scoped_lock l(mtx_);
                  ++waiting_;
                  while(!cancel_ && q_.empty())
                      cnd_.wait(l);
                  --waiting_;
                  if(cancel_)
                  {
                      cnd_.notify_all();
                      throw cancelled();
                  }
                  tmp.splice(tmp.end(), q_, q_.begin());
              }
              return tmp.front();
          }
      };

      typedef boost::function<void()> unit_of_work;
      typedef atomic_queue<unit_of_work> work_queue;

      void typical_thread_pool_working_thread(work_queue* q)
      try
      {
          for(;;)
              q->pop()();
      }
      catch(work_queue::cancelled&)
      {
          // time to terminate the thread
      }
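      For readers without Boost at hand, the same design can be sketched
      with the standard library. The following is my own modern-C++
      translation of the atomic_queue idea (the name cancellable_queue is
      mine); it is simplified in two ways: cancel() does not wait until all
      blocked pop() calls have returned, and pending elements are drained
      before the cancellation exception is thrown, whereas the pop() above
      throws as soon as cancel_ is set:

      ```cpp
      #include <cassert>
      #include <condition_variable>
      #include <list>
      #include <mutex>
      #include <stdexcept>
      #include <thread>

      // Sketch: a standard-library translation of the atomic_queue idea above.
      template<class T>
      class cancellable_queue {
          std::mutex mtx_;
          std::condition_variable cnd_;
          bool cancel_ = false;
          std::list<T> q_;   // std::list so nodes can be spliced without copying
      public:
          struct cancelled : std::logic_error {
              cancelled() : std::logic_error("cancelled") {}
          };

          void push(T const& t) {
              std::list<T> tmp(1, t);            // copy happens outside the lock
              {
                  std::lock_guard<std::mutex> l(mtx_);
                  q_.splice(q_.end(), tmp);      // O(1) relink, no allocation here
              }
              cnd_.notify_one();
          }

          T pop() {
              std::list<T> tmp;
              {
                  std::unique_lock<std::mutex> l(mtx_);
                  cnd_.wait(l, [this] { return cancel_ || !q_.empty(); });
                  if (q_.empty())                // cancelled and fully drained
                      throw cancelled();
                  tmp.splice(tmp.end(), q_, q_.begin());
              }
              return tmp.front();                // copy happens outside the lock
          }

          void cancel() {
              { std::lock_guard<std::mutex> l(mtx_); cancel_ = true; }
              cnd_.notify_all();
          }
      };

      int main() {
          cancellable_queue<int> q;
          int sum = 0;
          std::thread consumer([&] {
              try { for (;;) sum += q.pop(); }
              catch (cancellable_queue<int>::cancelled&) { /* thread done */ }
          });
          for (int i = 1; i <= 4; ++i) q.push(i);
          q.cancel();        // consumer drains the queue, then the throw ends it
          consumer.join();
          assert(sum == 10); // 1 + 2 + 3 + 4
          return 0;
      }
      ```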
      > Are there any resources out there on the Internet on how to design
      > *thread-safe* *efficient* data-structures?

      I would recommend the book "Programming with POSIX Threads" by David R.
      Butenhof.

      --
      Max


      • Thomas J. Gritzan

        #4
        Re: Request for comments about synchronized queue using boost

        Hendrik Schober wrote:
        > Nordlöw wrote:
        >> I am currently designing a synchronized queue used to communicate
        >> between threads. Is the code given below a good solution? Am I
        >> using mutex lock/unlock more than needed?
        >>
        >> Are there any resources out there on the Internet on how to design
        >> *thread-safe*, *efficient* data structures?
        >
        > comp.programming.threads?
        >
        >> /Nordlöw
        >>
        >> [...]
        >>
        >>     /// Pop and return value, possibly waiting forever.
        >>     T wait_pop() {
        >>         boost::mutex::scoped_lock sl(m); // NOTE: lock mutex
        >>         // wait until the queue has at least one element
        >>         c.wait(sl, boost::bind(&std::queue<T>::size, q));
        >>         T value = q.front();
        >>         q.pop();
        >>         return value;
        >>     }
        >
        > I haven't done any threading in a decade or so, but I wonder how
        > in the above code anything could be put into the locked queue.
        > What am I missing?
        >
        > Oh, and I wonder what 'c' is.

        c is a condition variable:

        You lock the mutex, then wait on the condition variable, which
        (atomically) releases the mutex and locks it again when the
        condition occurs.
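        That lock handover can be demonstrated in a few self-contained
        lines; std::condition_variable (shown here as a stand-in for
        boost::condition) behaves the same way:

        ```cpp
        #include <cassert>
        #include <condition_variable>
        #include <mutex>
        #include <thread>

        int main() {
            std::mutex m;
            std::condition_variable c;
            bool ready = false;

            std::thread producer([&] {
                // this lock could never be acquired if wait() did not
                // release the mutex while the waiter is blocked
                std::lock_guard<std::mutex> l(m);
                ready = true;
                c.notify_one();
            });

            std::unique_lock<std::mutex> l(m);  // 1. lock the mutex
            c.wait(l, [&] { return ready; });   // 2. wait: releases m, re-locks on wakeup
            assert(ready && l.owns_lock());     // 3. mutex is held again here
            producer.join();
            return 0;
        }
        ```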

        --
        Thomas


        • Hendrik Schober

          #5
          Re: Request for comments about synchronized queue using boost

          Thomas J. Gritzan wrote:
          > [...]
          >> I haven't done any threading in a decade or so, but I wonder how
          >> in the above code anything could be put into the locked queue.
          >> What am I missing?
          >>
          >> Oh, and I wonder what 'c' is.
          >
          > c is a condition variable:
          >
          > You lock the mutex, then wait on the condition variable, which
          > (atomically) releases the mutex and locks it again when the
          > condition occurs.

          Ah, thanks. I haven't looked at Boost's threads yet.

          Schobi


          • Nordlöw

            #6
            Re: Request for comments about synchronized queue using boost

            On 15 Oct, 18:02, Maxim Yegorushkin <maxim.yegorush...@gmail.com>
            wrote:
            [...]

            Doesn't the push argument "T const & t", instead of my version
            "const T & t", mean that we don't copy at all here? I believe &t
            evaluates to the address of t:

                void push(T const& t)
                {
                    // this avoids an allocation inside the critical section below
                    queue_type tmp(&t, &t + 1);
                    {
                        boost::mutex::scoped_lock l(mtx_);
                        q_.splice(q_.end(), tmp);
                    }
                    cnd_.notify_one();
                }

            /Nordlöw


            • Nordlöw

              #7
              Re: Request for comments about synchronized queue using boost

              On 15 Oct, 20:16, Hendrik Schober <spamt...@gmx.de> wrote:
              > Thomas J. Gritzan wrote:
              > [...]
              >>> I haven't done any threading in a decade or so, but I wonder how
              >>> in the above code anything could be put into the locked queue.
              >>> What am I missing?
              >>> Oh, and I wonder what 'c' is.
              >>
              >> You lock the mutex, then wait for a condition, which
              >> (automatically) unlocks the mutex, and locks it again if the
              >> condition occurs.
              >
              > Ah, thanks. I haven't looked at boost's threads yet.
              >
              > Schobi
              How can I use your queue structure in the following code example:


              #include "../synched_queue.hpp"
              #include "../threadpool/include/threadpool.hpp"
              #include <iostream>

              using namespace boost::threadpool;

              template <typename T>
              void produce(synched_queue<T> & q, size_t n)
              {
                  for (size_t i = 0; i < n; i++) {
                      T x = i;
                      q.push(x);
                      std::cout << "i:" << i << " produced: " << x << std::endl;
                  }
              }

              template <typename T>
              void consume(synched_queue<T> & q, size_t n)
              {
                  for (size_t i = 0; i < n; i++) {
                      T x = q.wait_pop();
                      std::cout << "i:" << i << " consumed: " << x << std::endl;
                  }
              }

              int main()
              {
                  typedef float Elm;
                  synched_queue<float> q;
                  // boost::thread pt(boost::bind(produce<Elm>, q, 10));
                  // boost::thread ct(boost::bind(consume<Elm>, q, 10));
                  // pt.join();
                  // ct.join();
                  return 0;
              }
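              [Editor's note] The commented-out lines cannot work as written:
              boost::bind(produce<Elm>, q, 10) would try to copy the
              non-copyable queue. One way to wire it up is to pass the queue
              by reference (boost::ref with boost::bind, or std::ref with
              std::thread). A self-contained sketch, with the queue reduced
              to the corrected push/wait_pop pair discussed in this thread
              and the printing replaced by a checked sum:

              ```cpp
              #include <cassert>
              #include <condition_variable>
              #include <cstddef>
              #include <mutex>
              #include <queue>
              #include <thread>

              // minimal synched_queue with the fixes discussed in this thread (sketch)
              template <typename T>
              class synched_queue {
                  std::queue<T> q;
                  std::mutex m;
                  std::condition_variable c;
              public:
                  void push(const T& value) {
                      { std::lock_guard<std::mutex> sl(m); q.push(value); }
                      c.notify_one();   // wake a waiting consumer
                  }
                  T wait_pop() {
                      std::unique_lock<std::mutex> sl(m);
                      c.wait(sl, [this] { return !q.empty(); });
                      T value = q.front();
                      q.pop();
                      return value;
                  }
              };

              template <typename T>
              void produce(synched_queue<T>& q, std::size_t n) {
                  for (std::size_t i = 0; i < n; i++)
                      q.push(static_cast<T>(i));
              }

              template <typename T>
              void consume(synched_queue<T>& q, std::size_t n, T& sum) {
                  for (std::size_t i = 0; i < n; i++)
                      sum += q.wait_pop();
              }

              int main() {
                  typedef float Elm;
                  synched_queue<Elm> q;
                  Elm sum = 0;
                  // std::ref passes the (non-copyable) queue by reference
                  std::thread pt(produce<Elm>, std::ref(q), 10);
                  std::thread ct(consume<Elm>, std::ref(q), 10, std::ref(sum));
                  pt.join();
                  ct.join();
                  assert(sum == 45.0f);  // 0 + 1 + ... + 9
                  return 0;
              }
              ```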


              Thanks in advance,
              /Nordlöw


              • Maxim Yegorushkin

                #8
                Re: Request for comments about synchronized queue using boost

                On Oct 16, 3:44 pm, Nordlöw <per.nord...@gmail.com> wrote:
                > On 15 Oct, 18:02, Maxim Yegorushkin <maxim.yegorush...@gmail.com>
                > wrote:
                >> On Oct 15, 2:36 pm, Nordlöw <per.nord...@gmail.com> wrote:
                [...]
                > Doesn't the push argument "T const & t", instead of my version
                > "const T & t", mean that we don't copy at all here?
                No, T const& and const T& is the same thing: a reference to a constant
                T.
                I believe &t evaluates to the address of t:

                    void push(T const& t)
                    {
                        // this avoids an allocation inside the critical section
                        // below
                        queue_type tmp(&t, &t + 1);
                        {
                            boost::mutex::scoped_lock l(mtx_);
                            q_.splice(q_.end(), tmp);
                        }
                        cnd_.notify_one();
                    }
                The trick here is that element t is first inserted in a temporary list
                tmp on the stack.

                queue_type tmp(&t, &t + 1); // create a list with a copy of t

                This involves allocating memory and copying t, and here it is done
                without holding the lock, because allocating memory may be expensive
                (it might cause the system to swap), and while you hold the lock none
                of the worker threads can pop elements from the queue. Next, the lock
                is acquired and the element is moved from list tmp into q_:

                q_.splice(q_.end(), tmp);

                This operation involves no memory allocation and no copying of
                elements (because that is easy to do with the nodes of a doubly-linked
                list), which makes the critical section execute really fast, without
                stalling the worker threads for long.
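                The no-copy claim for splice is easy to check: splicing relinks list
                nodes between containers without invoking the element's copy
                constructor. A minimal standalone sketch (using std::list directly;
                the Counted type is illustrative, not from the queue above):

                ```cpp
                #include <cassert>
                #include <list>

                // A type that records how many times it has been copied.
                struct Counted {
                    static int copies;
                    int value;
                    explicit Counted(int v) : value(v) {}
                    Counted(const Counted& other) : value(other.value) { ++copies; }
                };
                int Counted::copies = 0;

                int main() {
                    std::list<Counted> tmp;
                    tmp.push_back(Counted(42));   // copying into the list happens here
                    int copies_before = Counted::copies;

                    std::list<Counted> q;
                    q.splice(q.end(), tmp);       // relinks the node; no element copy

                    assert(Counted::copies == copies_before); // splice copied nothing
                    assert(q.size() == 1 && tmp.empty());
                    assert(q.front().value == 42);
                    return 0;
                }
                ```

                The same property is what keeps the critical section in push() and
                pop() free of allocation and element copies.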

                --
                Max


                • Szabolcs Ferenczi

                  #9
                  Re: Request for comments about synchronized queue using boost

                  On Oct 15, 3:36 pm, Nordlöw <per.nord...@gmail.com> wrote:
                  I am currently designing a synchronized queue used to communicate
                  between threads. Is the code given below a good solution?
                  Not really.
                  [...]
                  Are there any resources out there on the Internet on how to design
                  *thread-safe* *efficient* data-
                  structures?
                  Sure.


                  Best Regards,
                  Szabolcs


                  • Maxim Yegorushkin

                    #10
                    Re: Request for comments about synchronized queue using boost

                    On Oct 15, 2:36 pm, Nordlöw <per.nord...@gmail.com> wrote:
                    I am currently designing a synchronized queue used to communicate
                    between threads. Is the code given below a good solution? Am I
                    using mutex lock/unlock more than needed?
                    >
                    Are there any resources out there on the Internet on how to design
                    *thread-safe* *efficient* data-
                    structures?
                    You can also try concurrent_queue from


                    Scout around that link for more documentation.

                    --
                    Max


                    • James Kanze

                      #11
                      Re: Request for comments about synchronized queue using boost

                      On Oct 16, 5:42 pm, Szabolcs Ferenczi <szabolcs.feren...@gmail.com>
                      wrote:
                      On Oct 15, 3:36 pm, Nordlöw <per.nord...@gmail.com> wrote:
                      [...]
                      Are there any resources out there on the Internet on how to
                      design *thread-safe* *efficient* data- structures?
                      You have to be very careful with googling in cases like this.
                      There's an awful lot of junk on the net. Just looking at the
                      first hit, for example, it's quite clear that the author doesn't
                      know what he's talking about, and I suspect that that's true in
                      a large number of cases.

                      --
                      James Kanze (GABI Software)    email: james.kanze@gmail.com
                      Conseils en informatique orientée objet/
                      Beratung in objektorientierter Datenverarbeitung
                      9 place Sémard, 78210 St.-Cyr-l'École, France, +33 (0)1 30 23 00 34


                      • James Kanze

                        #12
                        Re: Request for comments about synchronized queue using boost

                        On Oct 15, 6:02 pm, Maxim Yegorushkin
                        <maxim.yegorush...@gmail.com> wrote:
                        On Oct 15, 2:36 pm, Nordlöw <per.nord...@gmail.com> wrote:
                        [...]
                        Apart from that, the mutex is held for too long. You don't
                        really need to hold the lock when allocating memory for
                        elements and when invoking the copy constructor of the
                        elements.
                        Which sounds a lot like pre-mature optimization to me. First
                        get the queue working, then see if there is a performance
                        problem, and only then, do something about it. (Given that
                        clients will normally only call functions on the queue when they
                        have nothing to do, waiting a couple of microseconds longer on
                        the lock won't impact anything.)
                        Here is an improved version (although a bit simplified):
                        void push(T const& t)
                        Actually, I'd name this "send", and not "push". Just because
                        the standard library uses very poor names doesn't mean we have
                        to.
                        {
                        // this avoids an allocation inside the critical section
                        // below
                        queue_type tmp(&t, &t + 1);
                        {
                        boost::mutex::scoped_lock l(mtx_);
                        q_.splice(q_.end(), tmp);
                        }
                        cnd_.notify_one();
                        }
                        This is unnecessary complexity. And probably loses runtime
                        efficiency (not that it's important): his initial version uses
                        std::deque, which doesn't have to allocate at each
                        insertion---in fact, in all of the uses I've measured, the queue
                        tends to hit its maximum size pretty quickly, and there are no
                        more allocations after that.

                        Yet another case where premature optimization turns out to be
                        pessimization.
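                        The "simple version first" queue being argued for here can be
                        sketched in a few lines — one mutex, one condition variable,
                        std::deque storage, no splice tricks. This is a hedged
                        illustration using C++11 std::mutex/std::condition_variable
                        (the Boost equivalents are near-identical), not code from the
                        thread:

                        ```cpp
                        #include <cassert>
                        #include <condition_variable>
                        #include <deque>
                        #include <mutex>

                        // Minimal blocking queue over std::deque. A deque reuses its
                        // blocks, so allocations taper off once the queue has hit its
                        // typical maximum size.
                        template <typename T>
                        class message_queue {
                            std::deque<T> q_;
                            std::mutex mtx_;
                            std::condition_variable cnd_;
                        public:
                            void send(T const& t) {
                                {
                                    std::lock_guard<std::mutex> l(mtx_);
                                    q_.push_back(t);
                                }
                                cnd_.notify_one();
                            }
                            T receive() {
                                std::unique_lock<std::mutex> l(mtx_);
                                cnd_.wait(l, [this] { return !q_.empty(); });
                                T t = q_.front();
                                q_.pop_front();
                                return t;
                            }
                        };

                        int main() {
                            message_queue<int> q;
                            q.send(1);
                            q.send(2);
                            assert(q.receive() == 1);  // FIFO order preserved
                            assert(q.receive() == 2);
                            return 0;
                        }
                        ```

                        Once this works, profiling can decide whether the lock is
                        actually held too long for the workload at hand.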

                        [...]
                        // this function provides only basic exception safety if T's
                        // copy ctor can throw or strong exception safety if T's copy
                        // ctor is nothrow
                        :-).

                        In practice, I find that almost all of my inter-thread queues
                        need to contain polymorphic objects. Which means that the queue
                        contains pointers, and that all of the objects will in fact be
                        dynamically allocated. The result is that I use std::auto_ptr
                        in the interface (so the producer can't access the object once it
                        has been passed off, and the consumer knows to delete it).

                        Of course, std::auto_ptr has a no throw copy constructor, so the
                        queue itself has a strong exception safe guarantee.
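                        The ownership-transferring interface described above can be
                        sketched as follows. This uses std::unique_ptr, the modern
                        replacement for std::auto_ptr; the Message/Ping types and the
                        queue shape are illustrative, not code from the thread:

                        ```cpp
                        #include <cassert>
                        #include <condition_variable>
                        #include <deque>
                        #include <memory>
                        #include <mutex>

                        // Polymorphic message base; the queue owns messages via unique_ptr.
                        struct Message {
                            virtual ~Message() {}
                            virtual int id() const = 0;
                        };

                        struct Ping : Message {
                            int id() const override { return 1; }
                        };

                        class message_queue {
                            std::deque<std::unique_ptr<Message>> q_;
                            std::mutex mtx_;
                            std::condition_variable cnd_;
                        public:
                            // Takes ownership: the producer cannot touch the message afterwards.
                            void send(std::unique_ptr<Message> m) {
                                {
                                    std::lock_guard<std::mutex> l(mtx_);
                                    q_.push_back(std::move(m)); // moving a unique_ptr cannot throw
                                }
                                cnd_.notify_one();
                            }
                            // Transfers ownership to the consumer, which deletes the message.
                            std::unique_ptr<Message> receive() {
                                std::unique_lock<std::mutex> l(mtx_);
                                cnd_.wait(l, [this] { return !q_.empty(); });
                                std::unique_ptr<Message> m = std::move(q_.front());
                                q_.pop_front();
                                return m;
                            }
                        };

                        int main() {
                            message_queue q;
                            q.send(std::unique_ptr<Message>(new Ping));
                            std::unique_ptr<Message> m = q.receive();
                            assert(m->id() == 1);
                            return 0;
                        }
                        ```

                        Because moving the smart pointer cannot throw, the queue keeps
                        the strong exception-safety guarantee mentioned above.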
                        Are there any resources out there on the Internet on how to
                        design *thread-safe* *efficient* data-structures?
                        I would recommend the "Programming with POSIX Threads" book by
                        David R. Butenhof.
                        Very much so, for the basics. (Formally, it's only Unix, but
                        practically, Boost threads are modeled after pthreads.) For the
                        data structures, it's less obvious, and of course, Butenhof
                        doesn't go into the issues which are particular to C++ (local
                        statics with dynamic initialization, the fact that pointers to
                        functions have to be ``extern "C"'', etc.).

                        --
                        James Kanze (GABI Software)    email: james.kanze@gmail.com
                        Conseils en informatique orientée objet/
                        Beratung in objektorientierter Datenverarbeitung
                        9 place Sémard, 78210 St.-Cyr-l'École, France, +33 (0)1 30 23 00 34


                        • Szabolcs Ferenczi

                          #13
                          Re: Request for comments about synchronized queue using boost

                          On Oct 17, 9:43 am, James Kanze <james.ka...@gmail.com> wrote:
                          On Oct 16, 5:42 pm, Szabolcs Ferenczi <szabolcs.feren...@gmail.com>
                          wrote:
                          >
                          On Oct 15, 3:36 pm, Nordlöw <per.nord...@gmail.com> wrote:
                          [...]
                          Are there any resources out there on the Internet on how to
                          design *thread-safe* *efficient* data- structures?
                          Sure. http://www.google.nl/search?q=boost+thread+safe+queue
                          >
                          You have to be very careful with googling in cases like this.
                          There's an awful lot of junk on the net.  Just looking at the
                          first hit, for example, it's quite clear that the author doesn't
                          know what he's talking about, and I suspect that that's true in
                          a large number of cases.
                          Hmmmm... For me the first hit is a didactic piece by Anthony Williams:

                          Implementing a Thread-Safe Queue using Condition Variables ...
                          In those cases, it might be worth using something like boost::optional
                          to avoid this requirement ... Tags: threading, thread safe, queue,
                          condition variable ...


                          Saying that "it's quite clear that the author doesn't know what he's
                          talking about" says, hmmm..., at least something about you.

                          I do not want to defend him but if you just read it to the end, you
                          must have learnt something, I guess. You should not stop by the first
                          fragment which is just a starting point illustrating the problem.

                          I agree that he should not have suggested such a bad habit of
                          handling a shared resource in the front part of his article, or at
                          least he should have warned the casual reader that it is not the
                          correct way.

                          Happy reading.

                          Best Regards,
                          Szabolcs


                          • Paavo Helde

                            #14
                            Re: Request for comments about synchronized queue using boost

                            James Kanze <james.kanze@gmail.com> wrote:
                            On Oct 15, 6:02 pm, Maxim Yegorushkin
                            <maxim.yegorush...@gmail.com> wrote:
                            >On Oct 15, 2:36 pm, Nordlöw <per.nord...@gmail.com> wrote:
                            >
                            [...]
                            >Apart from that, the mutex is held for too long. You don't
                            >really need to hold the lock when allocating memory for
                            >elements and when invoking the copy constructor of the
                            >elements.
                            >
                            Which sounds a lot like pre-mature optimization to me. First
                            get the queue working, then see if there is a performance
                            problem, and only then, do something about it. (Given that
                            clients will normally only call functions on the queue when they
                            have nothing to do, waiting a couple of microseconds longer on
                            the lock won't impact anything.)
                            While holding a lock a multithreaded application may be effectively
                            turned into a single-threaded one. This may be not so important for
                            single-core machines as only one program is running at any given moment
                            anyway. However, in case of multicore machines it may seriously impact the
                            scaling properties of the application. Of course, this only applies if
                            the performance is important, if multiple threads are used just for a
                            more convenient way of organizing the program flow then one can forget
                            about this.

                            OTOH, a general inter-thread queue is a candidate for a low-level library
                            component, which might be used in very different situations, including
                            ones where the performance is critical. In this regard turning attention
                            to potential performance bottlenecks is important, and in some situations
                            holding a lock unnecessarily long can introduce serious bottlenecks.

                            I agree with you that using a temporary list and splice might be a
                            pessimization. In my code I have resorted to having a std::deque of
                            message objects. The message objects themselves are very cheap to copy or
                            swap, and contain only pointers to larger data structures. If needed, a
                            deep copy of the pointed larger data structure is also done, but only
                            before locking the queue.
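                            A cheap-to-copy message object of the kind described might
                            look like this. The Message type and its shared_ptr payload
                            are an illustrative sketch of the idea, not Paavo's actual
                            code:

                            ```cpp
                            #include <cassert>
                            #include <memory>
                            #include <vector>

                            // The message itself is tiny: copying it copies an int and a
                            // shared_ptr (a pointer plus a refcount bump). The possibly
                            // large payload lives behind the pointer and is not copied.
                            struct Message {
                                int type;
                                std::shared_ptr<const std::vector<char>> payload;
                            };

                            int main() {
                                Message a;
                                a.type = 7;
                                a.payload = std::make_shared<const std::vector<char>>(
                                    1024 * 1024, 'x');

                                Message b = a;  // cheap: payload is shared, not copied
                                assert(b.payload.get() == a.payload.get());
                                assert(b.payload->size() == 1024 * 1024);

                                // A deep copy is made explicitly, and only before the
                                // queue's lock is taken:
                                Message c = a;
                                c.payload = std::make_shared<const std::vector<char>>(
                                    *a.payload);
                                assert(c.payload.get() != a.payload.get());
                                return 0;
                            }
                            ```

                            With messages this small, the time spent copying inside the
                            queue's critical section is negligible.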

                            Regards
                            Paavo


                            • Maxim Yegorushkin

                              #15
                              Re: Request for comments about synchronized queue using boost

                              On Oct 17, 9:12 am, James Kanze <james.ka...@gmail.com> wrote:
                              On Oct 15, 6:02 pm, Maxim Yegorushkin
                              >
                              <maxim.yegorush...@gmail.com> wrote:
                              On Oct 15, 2:36 pm, Nordlöw <per.nord...@gmail.com> wrote:
                              >
                                  [...]
                              >
                              Apart from that, the mutex is held for too long. You don't
                              really need to hold the lock when allocating memory for
                              elements and when invoking the copy constructor of the
                              elements.
                              >
                              Which sounds a lot like pre-mature optimization to me.  First
                              get the queue working, then see if there is a performance
                              problem, and only then, do something about it.  (Given that
                              clients will normally only call functions on the queue when they
                              have nothing to do, waiting a couple of microseconds longer on
                              the lock won't impact anything.)
                              >
                              Here is an improved version (although a bit simplified):
                                  void push(T const& t)
                              >
                              Actually, I'd name this "send", and not "push".  Just because
                              the standard library uses very poor names doesn't mean we have
                              to.
                              >
                                  {
                                      // this avoids an allocation inside the critical section
                                      // below
                                      queue_type tmp(&t, &t + 1);
                                      {
                                          boost::mutex::scoped_lock l(mtx_);
                                          q_.splice(q_.end(), tmp);
                                      }
                                      cnd_.notify_one();
                                  }
                              >
                              This is unnecessary complexity.  And probably loses runtime
                              efficiency (not that it's important): his initial version uses
                              std::deque, which doesn't have to allocate at each
                              insertion---in fact, in all of the uses I've measured, the queue
                              tends to hit its maximum size pretty quickly, and there are no
                              more allocations after that.
                              >
                              Yet another case where premature optimization turns out to be
                              pessimization.
                              >
                                  [...]
                              >
                                 // this function provides only basic exception safety if T's
                                 // copy ctor can throw or strong exception safety if T's copy
                                 // ctor is nothrow
                              >
                              :-).
                              >
                              In practice, I find that almost all of my inter-thread queues
                              need to contain polymorphic objects.  Which means that the queue
                              contains pointers, and that all of the objects will in fact be
                              dynamically allocated.  The result is that I use std::auto_ptr
                              in the interface (so the producer can't access the object once it
                              has been passed off, and the consumer knows to delete it).
                              I agree with you that holding work elements by value is not the most
                              practical. boost::function<> and std::list<> were used only for
                              simplicity here.

                              As you said, in practice, the work units are dynamically allocated
                              polymorphic objects. Naturally, the work unit base class is also a
                              (singly-linked) list node and the queue is implemented as an intrusive
                              list. This way, once a work unit has been allocated, the operations on
                              the inter-thread queue do not involve any memory allocations.

                              Something like this:

                              struct WorkUnit
                              {
                                  WorkUnit* next;

                                  WorkUnit()
                                      : next()
                                  {}

                                  virtual ~WorkUnit() = 0;
                                  virtual void execute() = 0;
                                  virtual void release() { delete this; }
                              };

                              // a pure virtual destructor still needs a definition
                              inline WorkUnit::~WorkUnit() {}

                              template<class T>
                              struct IntrusiveQueue
                              {
                                  T *head, **tail;

                                  IntrusiveQueue()
                                      : head()
                                      , tail(&head)
                                  {}

                                  void push_back(T* n)
                                  {
                                      *tail = n;
                                      tail = &n->next;
                                  }

                                  T* pop_front()
                                  {
                                      T* n = head;
                                      if(head && !(head = head->next))
                                          tail = &head;
                                      return n;
                                  }
                              };
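                              A quick single-threaded check of the intrusive-queue idea
                              (a self-contained sketch mirroring the names above; the
                              AddOne work unit is invented for the test):

                              ```cpp
                              #include <cassert>

                              // Intrusive singly-linked FIFO: the work unit itself is the node,
                              // so push/pop never allocate.
                              struct WorkUnit {
                                  WorkUnit* next;
                                  WorkUnit() : next(0) {}
                                  virtual ~WorkUnit() {}
                                  virtual void execute() = 0;
                              };

                              template <class T>
                              struct IntrusiveQueue {
                                  T* head;
                                  T** tail;
                                  IntrusiveQueue() : head(0), tail(&head) {}

                                  void push_back(T* n) {
                                      *tail = n;           // link onto the tail
                                      tail = &n->next;
                                  }
                                  T* pop_front() {
                                      T* n = head;
                                      if (head && !(head = head->next))
                                          tail = &head;    // queue became empty
                                      return n;
                                  }
                              };

                              // Hypothetical work unit for the test: bumps a counter.
                              struct AddOne : WorkUnit {
                                  int* target;
                                  explicit AddOne(int* t) : target(t) {}
                                  void execute() { ++*target; }
                              };

                              int main() {
                                  int counter = 0;
                                  IntrusiveQueue<WorkUnit> q;
                                  AddOne a(&counter), b(&counter); // stack-allocated for the test
                                  q.push_back(&a);
                                  q.push_back(&b);
                                  while (WorkUnit* w = q.pop_front())
                                      w->execute();
                                  assert(counter == 2);        // both units ran, FIFO order
                                  assert(q.pop_front() == 0);  // queue is now empty
                                  return 0;
                              }
                              ```

                              In real use the units would be heap-allocated and released
                              via a virtual release(), as in the post above; the point is
                              that the queue operations themselves touch no allocator.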

                              --
                              Max
