OKVIS ROS
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ThreadsafeQueue.hpp
Go to the documentation of this file.
1 /*********************************************************************************
2  * OKVIS - Open Keyframe-based Visual-Inertial SLAM
3  * Copyright (c) 2015, Autonomous Systems Lab / ETH Zurich
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  * * Redistributions of source code must retain the above copyright notice,
9  * this list of conditions and the following disclaimer.
10  * * Redistributions in binary form must reproduce the above copyright notice,
11  * this list of conditions and the following disclaimer in the documentation
12  * and/or other materials provided with the distribution.
13  * * Neither the name of Autonomous Systems Lab / ETH Zurich nor the names of
14  * its contributors may be used to endorse or promote products derived from
15  * this software without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * Created on: 2013
30  * Author: Simon Lynen
31  * Modified: Stefan Leutenegger (s.leutenegger@imperial.ac.uk)
32  *********************************************************************************/
33 
41 #ifndef INCLUDE_OKVIS_THREADSAFE_THREADSAFEQUEUE_HPP_
42 #define INCLUDE_OKVIS_THREADSAFE_THREADSAFEQUEUE_HPP_
43 
44 #include <atomic>
45 #include <pthread.h>
46 #include <queue>
47 #include <string>
48 #include <sys/time.h>
49 
50 #include <glog/logging.h>
51 
53 namespace okvis {
54 
56 namespace threadsafe {
57 
59  public:
60  ThreadSafeQueueBase() = default;
61  virtual ~ThreadSafeQueueBase() {}
62  virtual void NotifyAll() const = 0;
63  virtual void Shutdown() = 0;
64  virtual void Resume() = 0;
65  virtual size_t Size() const = 0;
66  virtual bool Empty() const = 0;
67 };
68 
73 template<typename QueueType>
75  friend bool test_funcs(void* (*)(void*), void* (*)(void*), // NOLINT
76  const std::string&, bool);
77 
78  public:
79 
81  virtual void NotifyAll() const final {
82  pthread_cond_broadcast(&condition_empty_);
83  pthread_cond_broadcast(&condition_full_);
84  }
85 
88  shutdown_ = false;
89  pthread_mutex_init(&mutex_, NULL);
90  pthread_cond_init(&condition_empty_, NULL);
91  pthread_cond_init(&condition_full_, NULL);
92  }
93 
95  virtual ~ThreadSafeQueue() {
96  shutdown_ = true;
97  NotifyAll();
98  pthread_mutex_destroy(&mutex_);
99  pthread_cond_destroy(&condition_empty_);
100  pthread_cond_destroy(&condition_full_);
101  }
102 
104  virtual void Shutdown() final {
105  shutdown_ = true;
106  NotifyAll();
107  }
108 
110  virtual void Resume() final {
111  shutdown_ = false;
112  NotifyAll();
113  }
114 
116  void Push(const QueueType& value) {
117  PushNonBlocking(value);
118  }
119 
121  void PushNonBlocking(const QueueType& value) {
122  pthread_mutex_lock(&mutex_);
123  queue_.push(value);
124  pthread_cond_signal(&condition_empty_); // Signal that data is available.
125  pthread_mutex_unlock(&mutex_);
126  }
127 
129  virtual size_t Size() const final {
130  pthread_mutex_lock(&mutex_);
131  size_t size = queue_.size();
132  pthread_mutex_unlock(&mutex_);
133  return size;
134  }
135 
137  virtual bool Empty() const final {
138  pthread_mutex_lock(&mutex_);
139  bool empty = queue_.empty();
140  pthread_mutex_unlock(&mutex_);
141  return empty;
142  }
143 
148  bool PushBlockingIfFull(const QueueType& value, size_t max_queue_size) {
149  while (!shutdown_) {
150  pthread_mutex_lock(&mutex_);
151  size_t size = queue_.size();
152  if (size >= max_queue_size) {
153  pthread_cond_wait(&condition_full_, &mutex_);
154  }
155  if (size >= max_queue_size) {
156  pthread_mutex_unlock(&mutex_);
157  continue;
158  }
159  queue_.push(value);
160  pthread_cond_signal(&condition_empty_); // Signal that data is available.
161  pthread_mutex_unlock(&mutex_);
162  return true;
163  }
164  return false;
165  }
166 
171  bool PushNonBlockingDroppingIfFull(const QueueType& value,
172  size_t max_queue_size) {
173  pthread_mutex_lock(&mutex_);
174  bool result = false;
175  if (queue_.size() >= max_queue_size) {
176  queue_.pop();
177  result = true;
178  }
179  queue_.push(value);
180  pthread_cond_signal(&condition_empty_); // Signal that data is available.
181  pthread_mutex_unlock(&mutex_);
182  return result;
183  }
184 
190  bool Pop(QueueType* value) {
191  return PopBlocking(value);
192  }
193 
199  bool PopBlocking(QueueType* value) {
200  CHECK_NOTNULL(value);
201  while (!shutdown_) {
202  pthread_mutex_lock(&mutex_);
203  if (queue_.empty()) {
204  pthread_cond_wait(&condition_empty_, &mutex_);
205  }
206  if (queue_.empty()) {
207  pthread_mutex_unlock(&mutex_);
208  continue;
209  }
210  QueueType _value = queue_.front();
211  queue_.pop();
212  pthread_cond_signal(&condition_full_); // Notify that space is available.
213  pthread_mutex_unlock(&mutex_);
214  *value = _value;
215  return true;
216  }
217  return false;
218  }
219 
225  bool PopNonBlocking(QueueType* value) {
226  CHECK_NOTNULL(value);
227  pthread_mutex_lock(&mutex_);
228  if (queue_.empty()) {
229  pthread_mutex_unlock(&mutex_);
230  return false;
231  }
232  *value = queue_.front();
233  queue_.pop();
234  pthread_mutex_unlock(&mutex_);
235  return true;
236  }
237 
247  bool PopTimeout(QueueType* value, int64_t timeout_nanoseconds) {
248  CHECK_NOTNULL(value);
249  pthread_mutex_lock(&mutex_);
250  if (queue_.empty()) {
251  struct timeval tv;
252  struct timespec ts;
253  gettimeofday(&tv, NULL);
254  ts.tv_sec = tv.tv_sec;
255  ts.tv_nsec = tv.tv_usec * 1e3 + timeout_nanoseconds;
256  pthread_cond_timedwait(&condition_empty_, &mutex_, &ts);
257  }
258  if (queue_.empty()) {
259  pthread_mutex_unlock(&mutex_);
260  return false;
261  }
262  QueueType _value = queue_.front();
263  queue_.pop();
264  pthread_cond_signal(&condition_full_); // Notify that space is available.
265  pthread_mutex_unlock(&mutex_);
266  *value = _value;
267  return true;
268  }
269 
277  bool getCopyOfFront(QueueType* value) {
278  CHECK_NOTNULL(value);
279  pthread_mutex_lock(&mutex_);
280  if (queue_.empty()) {
281  pthread_mutex_unlock(&mutex_);
282  return false;
283  }
284  // COPY the value.
285  *value = queue_.front();
286  pthread_mutex_unlock(&mutex_);
287  return true;
288  }
289 
296  bool getCopyOfFrontBlocking(QueueType* value) {
297  CHECK_NOTNULL(value);
298  while (!shutdown_) {
299  pthread_mutex_lock(&mutex_);
300  if (queue_.empty()) {
301  pthread_cond_wait(&condition_empty_, &mutex_);
302  }
303  if (queue_.empty()) {
304  pthread_mutex_unlock(&mutex_);
305  continue;
306  }
307  *value = queue_.front();
308  pthread_mutex_unlock(&mutex_);
309  return true;
310  }
311  return false;
312  }
313 
321  bool getCopyOfBack(QueueType* value) {
322  CHECK_NOTNULL(value);
323  pthread_mutex_lock(&mutex_);
324  if (queue_.empty()) {
325  pthread_mutex_unlock(&mutex_);
326  return false;
327  }
328  // COPY the value.
329  *value = queue_.back();
330  pthread_mutex_unlock(&mutex_);
331  return true;
332  }
333 
334 
335  mutable pthread_mutex_t mutex_;
336  mutable pthread_cond_t condition_empty_;
337  mutable pthread_cond_t condition_full_;
338  std::queue<QueueType> queue_;
339  std::atomic_bool shutdown_;
340 
341 };
342 
343 } // namespace threadsafe
344 
345 } // namespace okvis
346 
347 #endif // INCLUDE_OKVIS_THREADSAFE_THREADSAFEQUEUE_HPP_
ThreadSafeQueue()
Constructor.
Definition: ThreadsafeQueue.hpp:87
virtual ~ThreadSafeQueue()
Destructor.
Definition: ThreadsafeQueue.hpp:95
pthread_mutex_t mutex_
The queue mutex.
Definition: ThreadsafeQueue.hpp:335
bool PushBlockingIfFull(const QueueType &value, size_t max_queue_size)
Push to the queue if the size is less than max_queue_size, else block.
Definition: ThreadsafeQueue.hpp:148
bool PushNonBlockingDroppingIfFull(const QueueType &value, size_t max_queue_size)
Push to the queue. If full, drop the oldest entry.
Definition: ThreadsafeQueue.hpp:171
void PushNonBlocking(const QueueType &value)
Push to the queue.
Definition: ThreadsafeQueue.hpp:121
bool PopBlocking(QueueType *value)
Get the oldest entry still in the queue. Blocking if queue is empty.
Definition: ThreadsafeQueue.hpp:199
virtual void Shutdown() final
Tell the queue shut down. This will notify all threads to wake up.
Definition: ThreadsafeQueue.hpp:104
virtual ~ThreadSafeQueueBase()
Definition: ThreadsafeQueue.hpp:61
bool PopNonBlocking(QueueType *value)
Get the oldest entry still in the queue. If queue is empty value is not altered.
Definition: ThreadsafeQueue.hpp:225
virtual bool Empty() const final
Return true if the queue is empty.
Definition: ThreadsafeQueue.hpp:137
pthread_cond_t condition_empty_
Condition variable to wait and signal that queue is not empty.
Definition: ThreadsafeQueue.hpp:336
void Push(const QueueType &value)
Push non-blocking to the queue.
Definition: ThreadsafeQueue.hpp:116
virtual void NotifyAll() const final
Notify all waiting threads. Only used in destructor and when shutting down.
Definition: ThreadsafeQueue.hpp:81
bool getCopyOfFront(QueueType *value)
Get a copy of the front / oldest element in the queue. If queue is empty value is not altered...
Definition: ThreadsafeQueue.hpp:277
bool getCopyOfFrontBlocking(QueueType *value)
Get a copy of the front / oldest element in the queue. Blocking if queue is empty. The queue itself is not changed, i.e. the returned element is still in the queue.
Definition: ThreadsafeQueue.hpp:296
virtual void NotifyAll() const =0
virtual void Resume() final
Tell the queue to resume after a shutdown request.
Definition: ThreadsafeQueue.hpp:110
Definition: ThreadsafeQueue.hpp:58
bool PopTimeout(QueueType *value, int64_t timeout_nanoseconds)
Get the oldest entry still in the queue. If the queue is empty wait for a given amount of time...
Definition: ThreadsafeQueue.hpp:247
Class that implements a threadsafe FIFO queue.
Definition: ThreadsafeQueue.hpp:74
bool getCopyOfBack(QueueType *value)
Get a copy of the back / newest element in the queue. If queue is empty value is not altered...
Definition: ThreadsafeQueue.hpp:321
friend bool test_funcs(void *(*)(void *), void *(*)(void *), const std::string &, bool)
bool Pop(QueueType *value)
Get the oldest entry still in the queue. Blocking if queue is empty.
Definition: ThreadsafeQueue.hpp:190
std::atomic_bool shutdown_
Flag if shutdown is requested.
Definition: ThreadsafeQueue.hpp:339
virtual size_t Size() const final
Return the size of the queue.
Definition: ThreadsafeQueue.hpp:129
virtual size_t Size() const =0
pthread_cond_t condition_full_
Condition variable to wait and signal when an element is popped.
Definition: ThreadsafeQueue.hpp:337
std::queue< QueueType > queue_
Actual queue.
Definition: ThreadsafeQueue.hpp:338