OKVIS ROS
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ThreadPool.hpp
Go to the documentation of this file.
1 // Adapted from https://github.com/progschj/ThreadPool on September 3, 2014
2 // Copyright (c) 2015, Autonomous Systems Lab / ETH Zurich
3 
4 // Original copyright:
5 // Copyright (c) 2012 Jakob Progsch
6 //
7 // This software is provided 'as-is', without any express or implied
8 // warranty. In no event will the authors be held liable for any damages
9 // arising from the use of this software.
10 //
11 // Permission is granted to anyone to use this software for any purpose,
12 // including commercial applications, and to alter it and redistribute it
13 // freely, subject to the following restrictions:
14 //
15 // 1. The origin of this software must not be misrepresented; you must not
16 // claim that you wrote the original software. If you use this software
17 // in a product, an acknowledgment in the product documentation would be
18 // appreciated but is not required.
19 //
20 // 2. Altered source versions must be plainly marked as such, and must not be
21 // misrepresented as being the original software.
22 //
23 // 3. This notice may not be removed or altered from any source
24 // distribution.
25 
26 /*
27  * Modified: Stefan Leutenegger (s.leutenegger@imperial.ac.uk)
28  */
29 
38 #ifndef INCLUDE_OKVIS_THREAD_POOL_HPP_
39 #define INCLUDE_OKVIS_THREAD_POOL_HPP_
40 
41 #include <condition_variable>
42 #include <future>
43 #include <functional>
44 #include <memory>
45 #include <mutex>
46 #include <queue>
47 #include <stdexcept>
48 #include <thread>
49 #include <vector>
50 #include <glog/logging.h>
51 
53 namespace okvis {
54 
59 {
60  public:
63  ThreadPool(size_t numThreads);
64 
66  ~ThreadPool();
67 
77  template<class Function, class ... Args>
78  std::future<typename std::result_of<Function(Args...)>::type>
79  enqueue(Function&& function, Args&&... args);
80 
82  void stop()
83  {
84  stop_ = true;
85  }
86 
88  void waitForEmptyQueue() const;
89  private:
91  void run();
93  std::vector<std::thread> workers_;
95  std::queue<std::function<void()>> tasks_;
97  mutable std::mutex tasks_mutex_;
99  mutable std::condition_variable tasks_condition_;
101  mutable std::condition_variable wait_condition_;
103  unsigned active_threads_;
105  volatile bool stop_;
106 };
107 
108 // Enqueue work for the thread pool.
109 template<class Function, class ... Args>
110 std::future<typename std::result_of<Function(Args...)>::type> ThreadPool::enqueue(
111  Function&& function, Args&&... args)
112 {
113  typedef typename std::result_of<Function(Args...)>::type return_type;
114  // Don't allow enqueueing after stopping the pool.
115  if (stop_) {
116  LOG(ERROR)<< "enqueue() called on stopped ThreadPool";
117  // An empty future will return valid() == false.
118  return std::future<typename std::result_of<Function(Args...)>::type>();
119  }
120 
121  auto task = std::make_shared<std::packaged_task<return_type()>>(
122  std::bind(std::forward<Function>(function), std::forward<Args>(args)...));
123 
124  std::future<return_type> res = task->get_future();
125  {
126  std::unique_lock<std::mutex> lock(tasks_mutex_);
127  tasks_.push([task]() {(*task)();});
128  }
129  tasks_condition_.notify_one();
130  return res;
131 }
132 
133 } // namespace okvis
134 
135 #endif // INCLUDE_OKVIS_THREAD_POOL_HPP_
std::condition_variable tasks_condition_
A condition variable for worker threads.
Definition: ThreadPool.hpp:99
volatile bool stop_
A signal to stop the threads.
Definition: ThreadPool.hpp:105
This class manages multiple threads and fills them with work.
Definition: ThreadPool.hpp:58
void stop()
Stop the thread pool. This method is non-blocking.
Definition: ThreadPool.hpp:82
~ThreadPool()
Destructor. This joins all threads.
Definition: ThreadPool.cpp:53
std::condition_variable wait_condition_
A condition variable to support waitForEmptyQueue().
Definition: ThreadPool.hpp:101
ThreadPool(size_t numThreads)
Constructor. Launches some amount of workers.
Definition: ThreadPool.cpp:44
void waitForEmptyQueue() const
This method blocks until the queue is empty.
Definition: ThreadPool.cpp:91
std::queue< std::function< void()> > tasks_
The task queue.
Definition: ThreadPool.hpp:95
std::vector< std::thread > workers_
Need to keep track of threads so we can join them.
Definition: ThreadPool.hpp:93
void run()
Run a single thread.
Definition: ThreadPool.cpp:66
unsigned active_threads_
A counter of active threads.
Definition: ThreadPool.hpp:103
std::mutex tasks_mutex_
A mutex to protect the list of tasks.
Definition: ThreadPool.hpp:97
std::future< typename std::result_of< Function(Args...)>::type > enqueue(Function &&function, Args &&...args)
Enqueue work for the thread pool.
Definition: ThreadPool.hpp:110