MongoDB  2.7.0
queue.h
1 // @file queue.h
2 
3 /* Copyright 2009 10gen Inc.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #pragma once
19 
20 #include "mongo/pch.h"
21 
22 #include <limits>
23 #include <queue>
24 
25 #include <boost/thread/condition.hpp>
26 
27 #include "mongo/util/timer.h"
28 
29 namespace mongo {
30 
31  template <typename T>
32  size_t _getSizeDefault(const T& t) {
33  return 1;
34  }
35 
41  template<typename T>
42  class BlockingQueue : boost::noncopyable {
43  typedef size_t (*getSizeFunc)(const T& t);
44  public:
45  BlockingQueue() :
46  _lock("BlockingQueue"),
47  _maxSize(std::numeric_limits<std::size_t>::max()),
48  _currentSize(0),
49  _getSize(&_getSizeDefault) {}
50  BlockingQueue(size_t size) :
51  _lock("BlockingQueue(bounded)"),
52  _maxSize(size),
53  _currentSize(0),
54  _getSize(&_getSizeDefault) {}
55  BlockingQueue(size_t size, getSizeFunc f) :
56  _lock("BlockingQueue(custom size)"),
57  _maxSize(size),
58  _currentSize(0),
59  _getSize(f) {}
60 
61  void push(T const& t) {
62  scoped_lock l( _lock );
63  size_t tSize = _getSize(t);
64  while (_currentSize + tSize >= _maxSize) {
65  _cvNoLongerFull.wait( l.boost() );
66  }
67  _queue.push( t );
68  _currentSize += tSize;
69  _cvNoLongerEmpty.notify_one();
70  }
71 
72  bool empty() const {
73  scoped_lock l( _lock );
74  return _queue.empty();
75  }
76 
80  size_t size() const {
81  scoped_lock l( _lock );
82  return _currentSize;
83  }
84 
88  size_t maxSize() const {
89  return _maxSize;
90  }
91 
95  int count() const {
96  scoped_lock l( _lock );
97  return _queue.size();
98  }
99 
100  void clear() {
101  scoped_lock l(_lock);
102  _queue = std::queue<T>();
103  _currentSize = 0;
104  }
105 
106  bool tryPop( T & t ) {
107  scoped_lock l( _lock );
108  if ( _queue.empty() )
109  return false;
110 
111  t = _queue.front();
112  _queue.pop();
113  _currentSize -= _getSize(t);
114  _cvNoLongerFull.notify_one();
115 
116  return true;
117  }
118 
119  T blockingPop() {
120 
121  scoped_lock l( _lock );
122  while( _queue.empty() )
123  _cvNoLongerEmpty.wait( l.boost() );
124 
125  T t = _queue.front();
126  _queue.pop();
127  _currentSize -= _getSize(t);
128  _cvNoLongerFull.notify_one();
129 
130  return t;
131  }
132 
133 
139  bool blockingPop( T& t , int maxSecondsToWait ) {
140 
141  Timer timer;
142 
143  boost::xtime xt;
144  boost::xtime_get(&xt, MONGO_BOOST_TIME_UTC);
145  xt.sec += maxSecondsToWait;
146 
147  scoped_lock l( _lock );
148  while( _queue.empty() ) {
149  if ( ! _cvNoLongerEmpty.timed_wait( l.boost() , xt ) )
150  return false;
151  }
152 
153  t = _queue.front();
154  _queue.pop();
155  _currentSize -= _getSize(t);
156  _cvNoLongerFull.notify_one();
157  return true;
158  }
159 
160  // Obviously, this should only be used when you have
161  // only one consumer
162  bool blockingPeek(T& t, int maxSecondsToWait) {
163  Timer timer;
164 
165  boost::xtime xt;
166  boost::xtime_get(&xt, MONGO_BOOST_TIME_UTC);
167  xt.sec += maxSecondsToWait;
168 
169  scoped_lock l( _lock );
170  while( _queue.empty() ) {
171  if ( ! _cvNoLongerEmpty.timed_wait( l.boost() , xt ) )
172  return false;
173  }
174 
175  t = _queue.front();
176  return true;
177  }
178 
179  // Obviously, this should only be used when you have
180  // only one consumer
181  bool peek(T& t) {
182 
183  scoped_lock l( _lock );
184  if (_queue.empty()) {
185  return false;
186  }
187 
188  t = _queue.front();
189  return true;
190  }
191 
192  private:
193  mutable mongo::mutex _lock;
194  std::queue<T> _queue;
195  const size_t _maxSize;
196  size_t _currentSize;
197  getSizeFunc _getSize;
198 
199  boost::condition _cvNoLongerFull;
200  boost::condition _cvNoLongerEmpty;
201  };
202 
203 }
size_t maxSize() const
The max size for this queue.
Definition: queue.h:88
Time tracking object.
Definition: timer.h:35
Definition: mutex.h:101
bool blockingPop(T &t, int maxSecondsToWait)
blocks waiting for an object until maxSecondsToWait passes if got one, return true and set in t other...
Definition: queue.h:139
int count() const
The number/count of items in the queue ( _queue.size() )
Definition: queue.h:95
On pthread systems, it is an error to destroy a mutex while held (boost mutex may use pthread)...
Definition: mutex.h:74
size_t size() const
The size as measured by the size function.
Definition: queue.h:80
Simple blocking queue with optional max size (by count or custom sizing function).
Definition: queue.h:42