MongoDB  2.7.0
message.h
1 // message.h
2 
3 /* Copyright 2009 10gen Inc.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #pragma once
19 
20 #include <vector>
21 
22 #include "mongo/bson/util/atomic_int.h"
23 #include "mongo/util/goodies.h"
24 #include "mongo/util/net/hostandport.h"
25 #include "mongo/util/net/sock.h"
26 
27 namespace mongo {
28 
32  const size_t MaxMessageSizeBytes = 48 * 1000 * 1000;
33 
34  class Message;
35  class MessagingPort;
36  class PiggyBackData;
37 
38  typedef AtomicUInt MSGID;
39 
40  enum Operations {
41  opReply = 1, /* reply. responseTo is set. */
42  dbMsg = 1000, /* generic msg command followed by a string */
43  dbUpdate = 2001, /* update object */
44  dbInsert = 2002,
45  //dbGetByOID = 2003,
46  dbQuery = 2004,
47  dbGetMore = 2005,
48  dbDelete = 2006,
49  dbKillCursors = 2007
50  };
51 
52  bool doesOpGetAResponse( int op );
53 
54  inline const char * opToString( int op ) {
55  switch ( op ) {
56  case 0: return "none";
57  case opReply: return "reply";
58  case dbMsg: return "msg";
59  case dbUpdate: return "update";
60  case dbInsert: return "insert";
61  case dbQuery: return "query";
62  case dbGetMore: return "getmore";
63  case dbDelete: return "remove";
64  case dbKillCursors: return "killcursors";
65  default:
66  massert( 16141, str::stream() << "cannot translate opcode " << op, !op );
67  return "";
68  }
69  }
70 
71  inline bool opIsWrite( int op ) {
72  switch ( op ) {
73 
74  case 0:
75  case opReply:
76  case dbMsg:
77  case dbQuery:
78  case dbGetMore:
79  case dbKillCursors:
80  return false;
81 
82  case dbUpdate:
83  case dbInsert:
84  case dbDelete:
85  return true;
86 
87  default:
88  PRINT(op);
89  verify(0);
90  return "";
91  }
92 
93  }
94 
95 #pragma pack(1)
96  /* see http://dochub.mongodb.org/core/mongowireprotocol
97  */
98  struct MSGHEADER {
99  int messageLength; // total message size, including this
100  int requestID; // identifier for this message
101  int responseTo; // requestID from the original request
102  // (used in responses from db)
103  int opCode;
104  };
105 #pragma pack()
106 
107 #pragma pack(1)
108  /* todo merge this with MSGHEADER (or inherit from it). */
109  struct MsgData {
110  int len; /* len of the msg, including this field */
111  MSGID id; /* request/reply id's match... */
112  MSGID responseTo; /* id of the message we are responding to */
113  short _operation;
114  char _flags;
115  char _version;
116  int operation() const {
117  return _operation;
118  }
119  void setOperation(int o) {
120  _flags = 0;
121  _version = 0;
122  _operation = o;
123  }
124  char _data[4];
125 
126  int& dataAsInt() {
127  return *((int *) _data);
128  }
129 
130  bool valid() {
131  if ( len <= 0 || len > ( 4 * BSONObjMaxInternalSize ) )
132  return false;
133  if ( _operation < 0 || _operation > 30000 )
134  return false;
135  return true;
136  }
137 
138  long long getCursor() {
139  verify( responseTo > 0 );
140  verify( _operation == opReply );
141  long long * l = (long long *)(_data + 4);
142  return l[0];
143  }
144 
145  int dataLen(); // len without header
146  };
147  const int MsgDataHeaderSize = sizeof(MsgData) - 4;
148  inline int MsgData::dataLen() {
149  return len - MsgDataHeaderSize;
150  }
151 #pragma pack()
152 
153  class Message {
154  public:
155  // we assume here that a vector with initial size 0 does no allocation (0 is the default, but wanted to make it explicit).
156  Message() : _buf( 0 ), _data( 0 ), _freeIt( false ) {}
157  Message( void * data , bool freeIt ) :
158  _buf( 0 ), _data( 0 ), _freeIt( false ) {
159  _setData( reinterpret_cast< MsgData* >( data ), freeIt );
160  };
161  Message(Message& r) : _buf( 0 ), _data( 0 ), _freeIt( false ) {
162  *this = r;
163  }
164  ~Message() {
165  reset();
166  }
167 
168  SockAddr _from;
169 
170  MsgData *header() const {
171  verify( !empty() );
172  return _buf ? _buf : reinterpret_cast< MsgData* > ( _data[ 0 ].first );
173  }
174  int operation() const { return header()->operation(); }
175 
176  MsgData *singleData() const {
177  massert( 13273, "single data buffer expected", _buf );
178  return header();
179  }
180 
181  bool empty() const { return !_buf && _data.empty(); }
182 
183  int size() const {
184  int res = 0;
185  if ( _buf ) {
186  res = _buf->len;
187  }
188  else {
189  for (MsgVec::const_iterator it = _data.begin(); it != _data.end(); ++it) {
190  res += it->second;
191  }
192  }
193  return res;
194  }
195 
196  int dataSize() const { return size() - sizeof(MSGHEADER); }
197 
198  // concat multiple buffers - noop if <2 buffers already, otherwise can be expensive copy
199  // can get rid of this if we make response handling smarter
200  void concat() {
201  if ( _buf || empty() ) {
202  return;
203  }
204 
205  verify( _freeIt );
206  int totalSize = 0;
207  for (std::vector< std::pair< char *, int > >::const_iterator i = _data.begin();
208  i != _data.end(); ++i) {
209  totalSize += i->second;
210  }
211  char *buf = (char*)malloc( totalSize );
212  char *p = buf;
213  for (std::vector< std::pair< char *, int > >::const_iterator i = _data.begin();
214  i != _data.end(); ++i) {
215  memcpy( p, i->first, i->second );
216  p += i->second;
217  }
218  reset();
219  _setData( (MsgData*)buf, true );
220  }
221 
222  // vector swap() so this is fast
223  Message& operator=(Message& r) {
224  verify( empty() );
225  verify( r._freeIt );
226  _buf = r._buf;
227  r._buf = 0;
228  if ( r._data.size() > 0 ) {
229  _data.swap( r._data );
230  }
231  r._freeIt = false;
232  _freeIt = true;
233  return *this;
234  }
235 
236  void reset() {
237  if ( _freeIt ) {
238  if ( _buf ) {
239  free( _buf );
240  }
241  for (std::vector< std::pair< char *, int > >::const_iterator i = _data.begin();
242  i != _data.end(); ++i) {
243  free(i->first);
244  }
245  }
246  _buf = 0;
247  _data.clear();
248  _freeIt = false;
249  }
250 
251  // use to add a buffer
252  // assumes message will free everything
253  void appendData(char *d, int size) {
254  if ( size <= 0 ) {
255  return;
256  }
257  if ( empty() ) {
258  MsgData *md = (MsgData*)d;
259  md->len = size; // can be updated later if more buffers added
260  _setData( md, true );
261  return;
262  }
263  verify( _freeIt );
264  if ( _buf ) {
265  _data.push_back(std::make_pair((char*)_buf, _buf->len));
266  _buf = 0;
267  }
268  _data.push_back(std::make_pair(d, size));
269  header()->len += size;
270  }
271 
272  // use to set first buffer if empty
273  void setData(MsgData *d, bool freeIt) {
274  verify( empty() );
275  _setData( d, freeIt );
276  }
277  void setData(int operation, const char *msgtxt) {
278  setData(operation, msgtxt, strlen(msgtxt)+1);
279  }
280  void setData(int operation, const char *msgdata, size_t len) {
281  verify( empty() );
282  size_t dataLen = len + sizeof(MsgData) - 4;
283  MsgData *d = (MsgData *) malloc(dataLen);
284  memcpy(d->_data, msgdata, len);
285  d->len = fixEndian(dataLen);
286  d->setOperation(operation);
287  _setData( d, true );
288  }
289 
290  bool doIFreeIt() {
291  return _freeIt;
292  }
293 
294  void send( MessagingPort &p, const char *context );
295 
296  string toString() const;
297 
298  private:
299  void _setData( MsgData *d, bool freeIt ) {
300  _freeIt = freeIt;
301  _buf = d;
302  }
303  // if just one buffer, keep it in _buf, otherwise keep a sequence of buffers in _data
304  MsgData * _buf;
305  // byte buffer(s) - the first must contain at least a full MsgData unless using _buf for storage instead
306  typedef std::vector< std::pair< char*, int > > MsgVec;
307  MsgVec _data;
308  bool _freeIt;
309  };
310 
311 
312  MSGID nextMessageId();
313 
314 
315 } // namespace mongo
Definition: message_port.h:66
wrapped around os representation of network address
Definition: sock.h:93
Definition: message.h:98
const size_t MaxMessageSizeBytes
Maximum accepted message size on the wire protocol.
Definition: message.h:32
Definition: message.h:109
Definition: message.h:153