MongoDB  2.7.0
parallel.h
1 // parallel.h
2 
3 /* Copyright 2009 10gen Inc.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
22 #pragma once
23 
24 #include "mongo/client/export_macros.h"
25 #include "mongo/db/dbmessage.h"
26 #include "mongo/db/matcher.h"
27 #include "mongo/db/namespace_string.h"
28 #include "mongo/s/shard.h"
29 #include "mongo/s/stale_exception.h" // for StaleConfigException
30 #include "mongo/util/concurrency/mvar.h"
31 
32 namespace mongo {
33 
37  class MONGO_CLIENT_API ServerAndQuery {
38  public:
39  ServerAndQuery( const string& server , BSONObj extra = BSONObj() , BSONObj orderObject = BSONObj() ) :
40  _server( server ) , _extra( extra.getOwned() ) , _orderObject( orderObject.getOwned() ) {
41  }
42 
43  bool operator<( const ServerAndQuery& other ) const {
44  if ( ! _orderObject.isEmpty() )
45  return _orderObject.woCompare( other._orderObject ) < 0;
46 
47  if ( _server < other._server )
48  return true;
49  if ( other._server > _server )
50  return false;
51  return _extra.woCompare( other._extra ) < 0;
52  }
53 
54  string toString() const {
55  StringBuilder ss;
56  ss << "server:" << _server << " _extra:" << _extra.toString() << " _orderObject:" << _orderObject.toString();
57  return ss.str();
58  }
59 
60  operator string() const {
61  return toString();
62  }
63 
64  string _server;
65  BSONObj _extra;
66  BSONObj _orderObject;
67  };
68 
71 
72  class MONGO_CLIENT_API CommandInfo {
73  public:
74  string versionedNS;
75  BSONObj cmdFilter;
76 
77  CommandInfo() {}
78  CommandInfo( const string& vns, const BSONObj& filter ) : versionedNS( vns ), cmdFilter( filter ) {}
79 
80  bool isEmpty(){
81  return versionedNS.size() == 0;
82  }
83 
84  string toString() const {
85  return str::stream() << "CInfo " << BSON( "v_ns" << versionedNS << "filter" << cmdFilter );
86  }
87  };
88 
89  typedef shared_ptr<ShardConnection> ShardConnectionPtr;
90 
91  class DBClientCursor;
92  typedef shared_ptr<DBClientCursor> DBClientCursorPtr;
93 
94  class MONGO_CLIENT_API ParallelConnectionState {
95  public:
96 
98  count( 0 ), done( false ) { }
99 
100  ShardConnectionPtr conn;
101  DBClientCursorPtr cursor;
102 
103  // Version information
104  ChunkManagerPtr manager;
105  ShardPtr primary;
106 
107  // Cursor status information
108  long long count;
109  bool done;
110 
111  BSONObj toBSON() const;
112 
113  string toString() const {
114  return str::stream() << "PCState : " << toBSON();
115  }
116  };
117 
119  typedef shared_ptr<PCState> PCStatePtr;
120 
121  class MONGO_CLIENT_API ParallelConnectionMetadata {
122  public:
123 
125  retryNext( false ), initialized( false ), finished( false ), completed( false ), errored( false ) { }
126 
128  cleanup( true );
129  }
130 
131  void cleanup( bool full = true );
132 
133  PCStatePtr pcState;
134 
135  bool retryNext;
136 
137  bool initialized;
138  bool finished;
139  bool completed;
140 
141  bool errored;
142 
143  BSONObj toBSON() const;
144 
145  string toString() const {
146  return str::stream() << "PCMData : " << toBSON();
147  }
148  };
149 
151  typedef shared_ptr<PCMData> PCMDataPtr;
152 
165  class MONGO_CLIENT_API ParallelSortClusteredCursor {
166  public:
167 
168  ParallelSortClusteredCursor( const QuerySpec& qSpec, const CommandInfo& cInfo = CommandInfo() );
169 
170  // DEPRECATED legacy constructor for pure mergesort functionality - do not use
171  ParallelSortClusteredCursor( const set<ServerAndQuery>& servers , const string& ns ,
172  const Query& q , int options=0, const BSONObj& fields=BSONObj() );
173 
175 
176  std::string getNS();
177 
179  void init();
180 
181  bool more();
182  BSONObj next();
183  string type() const { return "ParallelSort"; }
184 
185  void fullInit();
186  void startInit();
187  void finishInit();
188 
189  bool isCommand(){ return NamespaceString( _qSpec.ns() ).isCommand(); }
190  bool isExplain(){ return _qSpec.isExplain(); }
191  bool isVersioned(){ return _qShards.size() == 0; }
192 
193  bool isSharded();
194  ShardPtr getPrimary();
195  void getQueryShards( set<Shard>& shards );
196  ChunkManagerPtr getChunkManager( const Shard& shard );
197  DBClientCursorPtr getShardCursor( const Shard& shard );
198 
199  BSONObj toBSON() const;
200  string toString() const;
201 
202  void explain(BSONObjBuilder& b);
203 
204  private:
205  void _finishCons();
206 
207  void _explain( map< string,list<BSONObj> >& out );
208 
209  void _markStaleNS( const NamespaceString& staleNS, const StaleConfigException& e, bool& forceReload, bool& fullReload );
210  void _handleStaleNS( const NamespaceString& staleNS, bool forceReload, bool fullReload );
211 
212  bool _didInit;
213  bool _done;
214 
215  set<Shard> _qShards;
216  QuerySpec _qSpec;
217  CommandInfo _cInfo;
218 
219  // Count round-trips req'd for namespaces and total
220  map<string,int> _staleNSMap;
221  int _totalTries;
222 
223  map<Shard,PCMData> _cursorMap;
224 
225  // LEGACY BELOW
226  int _numServers;
227  int _lastFrom;
228  set<ServerAndQuery> _servers;
229  BSONObj _sortKey;
230 
231  FilteringClientCursor * _cursors;
232  int _needToSkip;
233 
239  void setupVersionAndHandleSlaveOk( PCStatePtr state /* in & out */,
240  const Shard& shard,
241  ShardPtr primary /* in */,
242  const NamespaceString& ns,
243  const std::string& vinfo,
244  ChunkManagerPtr manager /* in */ );
245 
246  // LEGACY init - Needed for map reduce
247  void _oldInit();
248 
249  // LEGACY - Needed ONLY for _oldInit
250  string _ns;
251  BSONObj _query;
252  int _options;
253  BSONObj _fields;
254  int _batchSize;
255  };
256 
257 
258  // TODO: We probably don't really need this as a separate class.
259  class MONGO_CLIENT_API FilteringClientCursor {
260  public:
263 
264  void reset( auto_ptr<DBClientCursor> cursor );
265  void reset( DBClientCursor* cursor, ParallelConnectionMetadata* _pcmData = NULL );
266 
267  bool more();
268  BSONObj next();
269 
270  BSONObj peek();
271 
272  DBClientCursor* raw() { return _cursor.get(); }
273  ParallelConnectionMetadata* rawMData(){ return _pcmData; }
274 
275  // Required for new PCursor
276  void release(){
277  _cursor.release();
278  _pcmData = NULL;
279  }
280 
281  private:
282  void _advance();
283 
284  auto_ptr<DBClientCursor> _cursor;
285  ParallelConnectionMetadata* _pcmData;
286 
287  BSONObj _next;
288  bool _done;
289  };
290 
299  class MONGO_CLIENT_API Future {
300  public:
302  public:
303 
304  string getServer() const { return _server; }
305 
306  bool isDone() const { return _done; }
307 
308  bool ok() const {
309  verify( _done );
310  return _ok;
311  }
312 
313  BSONObj result() const {
314  verify( _done );
315  return _res;
316  }
317 
322  bool join( int maxRetries = 1 );
323 
324  private:
325 
326  CommandResult( const string& server,
327  const string& db,
328  const BSONObj& cmd,
329  int options,
330  DBClientBase * conn,
331  bool useShardedConn );
332  void init();
333 
334  string _server;
335  string _db;
336  int _options;
337  BSONObj _cmd;
338  DBClientBase * _conn;
339  scoped_ptr<AScopedConnection> _connHolder; // used if not provided a connection
340  bool _useShardConn;
341 
342  scoped_ptr<DBClientCursor> _cursor;
343 
344  BSONObj _res;
345  bool _ok;
346  bool _done;
347 
348  friend class Future;
349  };
350 
351 
359  static shared_ptr<CommandResult> spawnCommand( const string& server,
360  const string& db,
361  const BSONObj& cmd,
362  int options,
363  DBClientBase * conn = 0,
364  bool useShardConn = false );
365  };
366 
367 
368 }
369 
Utility for creating a BSONObj.
Definition: bsonobjbuilder.h:52
Runs a query in parallel across N servers, enforcing compatible chunk versions for queries across all...
Definition: parallel.h:165
LogstreamBuilder out()
Synonym for log().
Definition: log.h:79
the idea here is to make one liners easy.
Definition: str.h:48
Generally clients should be using Strategy::commandOp() wherever possible - the Future API does not h...
Definition: parallel.h:299
Represents a full query description, including all options required for the query to be passed on to ...
Definition: dbclientinterface.h:472
abstract class that implements the core db operations
Definition: dbclientinterface.h:1058
Definition: parallel.h:301
Definition: parallel.h:121
Queries return a cursor object.
Definition: dbclientcursor.h:48
holder for a server address and a query to run
Definition: parallel.h:37
stringstream deals with locale so this is a lot faster than std::stringstream for UTF8 ...
Definition: builder.h:61
C++ representation of a "BSON" object – that is, an extended JSON-style object in a binary representa...
Definition: bsonobj.h:77
Represents a Mongo query expression.
Definition: dbclientinterface.h:350
Definition: parallel.h:259
Definition: parallel.h:94
Definition: parallel.h:72