void DBClientCursor::commandDataReceived() {
int op = batch.m.operation();
invariant(op == opReply || op == dbCommandReply);
batch.nReturned = 1;
batch.pos = 0;
auto commandReply = rpc::makeReply(&batch.m);
auto commandStatus = getStatusFromCommandResult(commandReply->getCommandReply());
if (ErrorCodes::SendStaleConfig == commandStatus) {
throw RecvStaleConfigException("stale config in DBClientCursor::dataReceived()",
commandReply->getCommandReply());
} else if (!commandStatus.isOK()) {
wasError = true;
}
if (_client->getReplyMetadataReader()) {
uassertStatusOK(_client->getReplyMetadataReader()(commandReply->getMetadata(),
_client->getServerAddress()));
}
// HACK: If we got an OP_COMMANDREPLY, take the reply object
// and shove it in to an OP_REPLY message.
if (op == dbCommandReply) {
// Need to take ownership here as we destroy the underlying message.
BSONObj reply = commandReply->getCommandReply().getOwned();
batch.m.reset();
replyToQuery(0, batch.m, reply);
}
QueryResult::View qr = batch.m.singleData().view2ptr();
batch.data = qr.data();
}
/**
* Returns true if request is a query for sharded indexes.
*/
static bool doShardedIndexQuery(OperationContext* txn, Request& r, const QuerySpec& qSpec) {
// Extract the ns field from the query, which may be embedded within the "query" or
// "$query" field.
auto nsField = qSpec.filter()["ns"];
if (nsField.eoo()) {
return false;
}
const NamespaceString indexNSSQuery(nsField.str());
auto status = grid.catalogCache()->getDatabase(txn, indexNSSQuery.db().toString());
if (!status.isOK()) {
return false;
}
shared_ptr<DBConfig> config = status.getValue();
if (!config->isSharded(indexNSSQuery.ns())) {
return false;
}
// if you are querying on system.indexes, we need to make sure we go to a shard
// that actually has chunks. This is not a perfect solution (what if you just
// look at all indexes), but better than doing nothing.
ShardPtr shard;
ChunkManagerPtr cm;
config->getChunkManagerOrPrimary(indexNSSQuery.ns(), cm, shard);
if (cm) {
set<ShardId> shardIds;
cm->getAllShardIds(&shardIds);
verify(shardIds.size() > 0);
shard = grid.shardRegistry()->getShard(*shardIds.begin());
}
ShardConnection dbcon(shard->getConnString(), r.getns());
DBClientBase& c = dbcon.conn();
string actualServer;
Message response;
bool ok = c.call(r.m(), response, true, &actualServer);
uassert(10200, "mongos: error calling db", ok);
{
QueryResult::View qr = response.singleData().view2ptr();
if (qr.getResultFlags() & ResultFlag_ShardConfigStale) {
dbcon.done();
// Version is zero b/c this is deprecated codepath
throw RecvStaleConfigException(r.getns(),
"Strategy::doQuery",
ChunkVersion(0, 0, OID()),
ChunkVersion(0, 0, OID()));
}
}
r.reply(response, actualServer.size() ? actualServer : c.getServerAddress());
dbcon.done();
return true;
}
void DBClientCursor::dataReceived(bool& retry, string& host) {
// If this is a reply to our initial command request.
if (_isCommand && cursorId == 0) {
commandDataReceived();
return;
}
QueryResult::View qr = batch.m.singleData().view2ptr();
resultFlags = qr.getResultFlags();
if (qr.getResultFlags() & ResultFlag_ErrSet) {
wasError = true;
}
if (qr.getResultFlags() & ResultFlag_CursorNotFound) {
// cursor id no longer valid at the server.
invariant(qr.getCursorId() == 0);
if (!(opts & QueryOption_CursorTailable)) {
uasserted(13127,
str::stream() << "cursor id " << cursorId << " didn't exist on server.");
}
// 0 indicates no longer valid (dead)
cursorId = 0;
}
if (cursorId == 0 || !(opts & QueryOption_CursorTailable)) {
// only set initially: we don't want to kill it on end of data
// if it's a tailable cursor
cursorId = qr.getCursorId();
}
batch.nReturned = qr.getNReturned();
batch.pos = 0;
batch.data = qr.data();
batch.remainingBytes = qr.dataLen();
_client->checkResponse(batch.data, batch.nReturned, &retry, &host); // watches for "not master"
if (qr.getResultFlags() & ResultFlag_ShardConfigStale) {
BSONObj error;
verify(peekError(&error));
throw RecvStaleConfigException(
(string) "stale config on lazy receive" + causedBy(getErrField(error)), error);
}
/* this assert would fire the way we currently work:
verify( nReturned || cursorId == 0 );
*/
}
virtual void process(Message& m, AbstractMessagingPort* port) {
while (true) {
if (inShutdown()) {
log() << "got request after shutdown()" << endl;
break;
}
DbResponse dbresponse;
{
auto opCtx = getGlobalServiceContext()->makeOperationContext(&cc());
assembleResponse(opCtx.get(), m, dbresponse, port->remote());
// opCtx must go out of scope here so that the operation cannot show up in currentOp
// results after the response reaches the client
}
if (!dbresponse.response.empty()) {
port->reply(m, dbresponse.response, dbresponse.responseTo);
if (dbresponse.exhaustNS.size() > 0) {
MsgData::View header = dbresponse.response.header();
QueryResult::View qr = header.view2ptr();
long long cursorid = qr.getCursorId();
if (cursorid) {
verify(dbresponse.exhaustNS.size() && dbresponse.exhaustNS[0]);
string ns = dbresponse.exhaustNS; // before reset() free's it...
m.reset();
BufBuilder b(512);
b.appendNum((int)0 /*size set later in appendData()*/);
b.appendNum(header.getId());
b.appendNum(header.getResponseTo());
b.appendNum((int)dbGetMore);
b.appendNum((int)0);
b.appendStr(ns);
b.appendNum((int)0); // ntoreturn
b.appendNum(cursorid);
m.appendData(b.buf(), b.len());
b.decouple();
DEV log() << "exhaust=true sending more";
continue; // this goes back to top loop
}
}
}
break;
}
}
开发者ID:riggery,项目名称:mongo,代码行数:45,代码来源:db.cpp
示例7: invariant
LegacyReply::LegacyReply(const Message* message) : _message(std::move(message)) {
invariant(message->operation() == opReply);
QueryResult::View qr = _message->singleData().view2ptr();
// should be checked by caller.
invariant(qr.msgdata().getNetworkOp() == opReply);
uassert(ErrorCodes::BadValue,
str::stream() << "Got legacy command reply with a bad cursorId field,"
<< " expected a value of 0 but got " << qr.getCursorId(),
qr.getCursorId() == 0);
uassert(ErrorCodes::BadValue,
str::stream() << "Got legacy command reply with a bad nReturned field,"
<< " expected a value of 1 but got " << qr.getNReturned(),
qr.getNReturned() == 1);
uassert(ErrorCodes::BadValue,
str::stream() << "Got legacy command reply with a bad startingFrom field,"
<< " expected a value of 0 but got " << qr.getStartingFrom(),
qr.getStartingFrom() == 0);
std::tie(_commandReply, _metadata) =
uassertStatusOK(rpc::upconvertReplyMetadata(BSONObj(qr.data())));
// Copy the bson array of documents from the message into
// a contiguous area of memory owned by _docBuffer so
// DocumentRange can be used to iterate over documents
auto cursorElem = _commandReply[LegacyReplyBuilder::kCursorTag];
if (cursorElem.eoo())
return;
BSONObj cursorObj = cursorElem.Obj();
auto firstBatchElem = cursorObj[LegacyReplyBuilder::kFirstBatchTag];
if (firstBatchElem.eoo())
return;
for (BSONObjIterator it(firstBatchElem.Obj()); it.more(); it.next()) {
invariant((*it).isABSONObj());
BSONObj doc = (*it).Obj();
doc.appendSelfToBufBuilder(_docBuffer);
}
const char* dataBegin = _docBuffer.buf();
const char* dataEnd = dataBegin + _docBuffer.len();
_outputDocs = DocumentRange(dataBegin, dataEnd);
return;
}
void OpQueryReplyBuilder::putInMessage(
Message* out, int queryResultFlags, int nReturned, int startingFrom, long long cursorId) {
QueryResult::View qr = _buffer.buf();
qr.setResultFlags(queryResultFlags);
qr.msgdata().setLen(_buffer.len());
qr.msgdata().setOperation(opReply);
qr.setCursorId(cursorId);
qr.setStartingFrom(startingFrom);
qr.setNReturned(nReturned);
out->setData(_buffer.release()); // transport will free
}
Message LegacyReplyBuilder::done() {
invariant(_haveCommandReply);
QueryResult::View qr = _builder.buf();
if (_staleConfigError) {
// For compatibility with legacy mongos, we need to set this result flag on StaleConfig
qr.setResultFlags(ResultFlag_ErrSet | ResultFlag_ShardConfigStale);
} else {
qr.setResultFlagsToOk();
}
qr.msgdata().setLen(_builder.len());
qr.msgdata().setOperation(opReply);
qr.setCursorId(0);
qr.setStartingFrom(0);
qr.setNReturned(1);
_message.setData(_builder.release());
return std::move(_message);
}
std::string runQuery(OperationContext* txn,
QueryMessage& q,
const NamespaceString& nss,
CurOp& curop,
Message &result) {
// Validate the namespace.
uassert(16256, str::stream() << "Invalid ns [" << nss.ns() << "]", nss.isValid());
invariant(!nss.isCommand());
// Set curop information.
beginQueryOp(nss, q.query, q.ntoreturn, q.ntoskip, &curop);
// Parse the qm into a CanonicalQuery.
std::auto_ptr<CanonicalQuery> cq;
{
CanonicalQuery* cqRaw;
Status canonStatus = CanonicalQuery::canonicalize(q,
&cqRaw,
WhereCallbackReal(txn, nss.db()));
if (!canonStatus.isOK()) {
uasserted(17287, str::stream() << "Can't canonicalize query: "
<< canonStatus.toString());
}
cq.reset(cqRaw);
}
invariant(cq.get());
LOG(5) << "Running query:\n" << cq->toString();
LOG(2) << "Running query: " << cq->toStringShort();
// Parse, canonicalize, plan, transcribe, and get a plan executor.
AutoGetCollectionForRead ctx(txn, nss);
Collection* collection = ctx.getCollection();
const int dbProfilingLevel = ctx.getDb() ? ctx.getDb()->getProfilingLevel() :
serverGlobalParams.defaultProfile;
// We have a parsed query. Time to get the execution plan for it.
std::unique_ptr<PlanExecutor> exec;
{
PlanExecutor* rawExec;
Status execStatus = getExecutorFind(txn,
collection,
nss,
cq.release(),
PlanExecutor::YIELD_AUTO,
&rawExec);
uassertStatusOK(execStatus);
exec.reset(rawExec);
}
const LiteParsedQuery& pq = exec->getCanonicalQuery()->getParsed();
// If it's actually an explain, do the explain and return rather than falling through
// to the normal query execution loop.
if (pq.isExplain()) {
BufBuilder bb;
bb.skip(sizeof(QueryResult::Value));
BSONObjBuilder explainBob;
Explain::explainStages(exec.get(), ExplainCommon::EXEC_ALL_PLANS, &explainBob);
// Add the resulting object to the return buffer.
BSONObj explainObj = explainBob.obj();
bb.appendBuf((void*)explainObj.objdata(), explainObj.objsize());
// TODO: Does this get overwritten/do we really need to set this twice?
curop.debug().query = q.query;
// Set query result fields.
QueryResult::View qr = bb.buf();
bb.decouple();
qr.setResultFlagsToOk();
qr.msgdata().setLen(bb.len());
curop.debug().responseLength = bb.len();
qr.msgdata().setOperation(opReply);
qr.setCursorId(0);
qr.setStartingFrom(0);
qr.setNReturned(1);
result.setData(qr.view2ptr(), true);
return "";
}
// We freak out later if this changes before we're done with the query.
const ChunkVersion shardingVersionAtStart = shardingState.getVersion(nss.ns());
// Handle query option $maxTimeMS (not used with commands).
curop.setMaxTimeMicros(static_cast<unsigned long long>(pq.getMaxTimeMS()) * 1000);
txn->checkForInterrupt(); // May trigger maxTimeAlwaysTimeOut fail point.
// uassert if we are not on a primary, and not a secondary with SlaveOk query parameter set.
bool slaveOK = pq.isSlaveOk() || pq.hasReadPref();
Status serveReadsStatus = repl::getGlobalReplicationCoordinator()->checkCanServeReadsFor(
txn,
nss,
slaveOK);
uassertStatusOK(serveReadsStatus);
// Run the query.
// bb is used to hold query results
// this buffer should contain either requested documents per query or
//.........这里部分代码省略.........
/**
* Called by db/instance.cpp. This is the getMore entry point.
*
* pass - when QueryOption_AwaitData is in use, the caller will make repeated calls
* when this method returns an empty result, incrementing pass on each call.
* Thus, pass == 0 indicates this is the first "attempt" before any 'awaiting'.
*/
QueryResult::View getMore(OperationContext* txn,
const char* ns,
int ntoreturn,
long long cursorid,
CurOp& curop,
int pass,
bool& exhaust,
bool* isCursorAuthorized) {
// For testing, we may want to fail if we receive a getmore.
if (MONGO_FAIL_POINT(failReceivedGetmore)) {
invariant(0);
}
exhaust = false;
const NamespaceString nss(ns);
// Depending on the type of cursor being operated on, we hold locks for the whole getMore,
// or none of the getMore, or part of the getMore. The three cases in detail:
//
// 1) Normal cursor: we lock with "ctx" and hold it for the whole getMore.
// 2) Cursor owned by global cursor manager: we don't lock anything. These cursors don't
// own any collection state.
// 3) Agg cursor: we lock with "ctx", then release, then relock with "unpinDBLock" and
// "unpinCollLock". This is because agg cursors handle locking internally (hence the
// release), but the pin and unpin of the cursor must occur under the collection lock.
// We don't use our AutoGetCollectionForRead "ctx" to relock, because
// AutoGetCollectionForRead checks the sharding version (and we want the relock for the
// unpin to succeed even if the sharding version has changed).
//
// Note that we declare our locks before our ClientCursorPin, in order to ensure that the
// pin's destructor is called before the lock destructors (so that the unpin occurs under
// the lock).
boost::scoped_ptr<AutoGetCollectionForRead> ctx;
boost::scoped_ptr<Lock::DBLock> unpinDBLock;
boost::scoped_ptr<Lock::CollectionLock> unpinCollLock;
CursorManager* cursorManager;
CursorManager* globalCursorManager = CursorManager::getGlobalCursorManager();
if (globalCursorManager->ownsCursorId(cursorid)) {
cursorManager = globalCursorManager;
}
else {
ctx.reset(new AutoGetCollectionForRead(txn, nss));
Collection* collection = ctx->getCollection();
uassert( 17356, "collection dropped between getMore calls", collection );
cursorManager = collection->getCursorManager();
}
LOG(5) << "Running getMore, cursorid: " << cursorid << endl;
// This checks to make sure the operation is allowed on a replicated node. Since we are not
// passing in a query object (necessary to check SlaveOK query option), the only state where
// reads are allowed is PRIMARY (or master in master/slave). This function uasserts if
// reads are not okay.
Status status = repl::getGlobalReplicationCoordinator()->checkCanServeReadsFor(
txn,
nss,
true);
uassertStatusOK(status);
// A pin performs a CC lookup and if there is a CC, increments the CC's pin value so it
// doesn't time out. Also informs ClientCursor that there is somebody actively holding the
// CC, so don't delete it.
ClientCursorPin ccPin(cursorManager, cursorid);
ClientCursor* cc = ccPin.c();
// If we're not being called from DBDirectClient we want to associate the RecoveryUnit
// used to create the execution machinery inside the cursor with our OperationContext.
// If we throw or otherwise exit this method in a disorderly fashion, we must ensure
// that further calls to getMore won't fail, and that the provided OperationContext
// has a valid RecoveryUnit. As such, we use RAII to accomplish this.
//
// This must be destroyed before the ClientCursor is destroyed.
std::auto_ptr<ScopedRecoveryUnitSwapper> ruSwapper;
// These are set in the QueryResult msg we return.
int resultFlags = ResultFlag_AwaitCapable;
int numResults = 0;
int startingResult = 0;
const int InitialBufSize =
512 + sizeof(QueryResult::Value) + MaxBytesToReturnToClientAtOnce;
BufBuilder bb(InitialBufSize);
bb.skip(sizeof(QueryResult::Value));
if (NULL == cc) {
cursorid = 0;
resultFlags = ResultFlag_CursorNotFound;
}
//.........这里部分代码省略.........
/**
* Called by db/instance.cpp. This is the getMore entry point.
*/
QueryResult::View getMore(OperationContext* txn,
const char* ns,
int ntoreturn,
long long cursorid,
bool* exhaust,
bool* isCursorAuthorized) {
invariant(ntoreturn >= 0);
CurOp& curop = *CurOp::get(txn);
// For testing, we may want to fail if we receive a getmore.
if (MONGO_FAIL_POINT(failReceivedGetmore)) {
invariant(0);
}
*exhaust = false;
const NamespaceString nss(ns);
// Depending on the type of cursor being operated on, we hold locks for the whole getMore,
// or none of the getMore, or part of the getMore. The three cases in detail:
//
// 1) Normal cursor: we lock with "ctx" and hold it for the whole getMore.
// 2) Cursor owned by global cursor manager: we don't lock anything. These cursors don't own
// any collection state. These cursors are generated either by the listCollections or
// listIndexes commands, as these special cursor-generating commands operate over catalog
// data rather than targeting the data within a collection.
// 3) Agg cursor: we lock with "ctx", then release, then relock with "unpinDBLock" and
// "unpinCollLock". This is because agg cursors handle locking internally (hence the
// release), but the pin and unpin of the cursor must occur under the collection lock.
// We don't use our AutoGetCollectionForRead "ctx" to relock, because
// AutoGetCollectionForRead checks the sharding version (and we want the relock for the
// unpin to succeed even if the sharding version has changed).
//
// Note that we declare our locks before our ClientCursorPin, in order to ensure that the
// pin's destructor is called before the lock destructors (so that the unpin occurs under
// the lock).
unique_ptr<AutoGetCollectionForRead> ctx;
unique_ptr<Lock::DBLock> unpinDBLock;
unique_ptr<Lock::CollectionLock> unpinCollLock;
CursorManager* cursorManager;
if (nss.isListIndexesCursorNS() || nss.isListCollectionsCursorNS()) {
// List collections and list indexes are special cursor-generating commands whose
// cursors are managed globally, as they operate over catalog data rather than targeting
// the data within a collection.
cursorManager = CursorManager::getGlobalCursorManager();
} else {
ctx = stdx::make_unique<AutoGetCollectionForRead>(txn, nss);
Collection* collection = ctx->getCollection();
uassert(17356, "collection dropped between getMore calls", collection);
cursorManager = collection->getCursorManager();
}
LOG(5) << "Running getMore, cursorid: " << cursorid << endl;
// This checks to make sure the operation is allowed on a replicated node. Since we are not
// passing in a query object (necessary to check SlaveOK query option), the only state where
// reads are allowed is PRIMARY (or master in master/slave). This function uasserts if
// reads are not okay.
Status status = repl::getGlobalReplicationCoordinator()->checkCanServeReadsFor(txn, nss, true);
uassertStatusOK(status);
// A pin performs a CC lookup and if there is a CC, increments the CC's pin value so it
// doesn't time out. Also informs ClientCursor that there is somebody actively holding the
// CC, so don't delete it.
ClientCursorPin ccPin(cursorManager, cursorid);
ClientCursor* cc = ccPin.c();
// These are set in the QueryResult msg we return.
int resultFlags = ResultFlag_AwaitCapable;
int numResults = 0;
int startingResult = 0;
const int InitialBufSize =
512 + sizeof(QueryResult::Value) + FindCommon::kMaxBytesToReturnToClientAtOnce;
BufBuilder bb(InitialBufSize);
bb.skip(sizeof(QueryResult::Value));
if (NULL == cc) {
cursorid = 0;
resultFlags = ResultFlag_CursorNotFound;
} else {
// Check for spoofing of the ns such that it does not match the one originally
// there for the cursor.
uassert(ErrorCodes::Unauthorized,
str::stream() << "Requested getMore on namespace " << ns << ", but cursor "
<< cursorid << " belongs to namespace " << cc->ns(),
ns == cc->ns());
*isCursorAuthorized = true;
if (cc->isReadCommitted())
uassertStatusOK(txn->recoveryUnit()->setReadFromMajorityCommittedSnapshot());
// Reset timeout timer on the cursor since the cursor is still in use.
cc->setIdleTime(0);
//.........这里部分代码省略.........
LegacyReply::LegacyReply(const Message* message) {
invariant(message->operation() == opReply);
QueryResult::View qr = message->singleData().view2ptr();
// should be checked by caller.
invariant(qr.msgdata().getNetworkOp() == opReply);
uassert(ErrorCodes::BadValue,
str::stream() << "Got legacy command reply with a bad cursorId field,"
<< " expected a value of 0 but got "
<< qr.getCursorId(),
qr.getCursorId() == 0);
uassert(ErrorCodes::BadValue,
str::stream() << "Got legacy command reply with a bad nReturned field,"
<< " expected a value of 1 but got "
<< qr.getNReturned(),
qr.getNReturned() == 1);
uassert(ErrorCodes::BadValue,
str::stream() << "Got legacy command reply with a bad startingFrom field,"
<< " expected a value of 0 but got "
<< qr.getStartingFrom(),
qr.getStartingFrom() == 0);
auto status = Validator<BSONObj>::validateLoad(qr.data(), qr.dataLen());
uassert(ErrorCodes::InvalidBSON,
str::stream() << "Got legacy command reply with invalid BSON in the metadata field"
<< causedBy(status),
status.isOK());
_commandReply = BSONObj(qr.data());
_commandReply.shareOwnershipWith(message->sharedBuffer());
if (_commandReply.firstElementFieldName() == "$err"_sd) {
// Upconvert legacy errors.
BSONObjBuilder bob;
bob.appendAs(_commandReply.firstElement(), "errmsg");
bob.append("ok", 0.0);
if (auto code = _commandReply["code"]) {
bob.append(code);
}
_commandReply = bob.obj();
}
return;
}
std::string newRunQuery(OperationContext* txn,
Message& m,
QueryMessage& q,
CurOp& curop,
Message &result,
bool fromDBDirectClient) {
// Validate the namespace.
const char *ns = q.ns;
uassert(16332, "can't have an empty ns", ns[0]);
const NamespaceString nsString(ns);
uassert(16256, str::stream() << "Invalid ns [" << ns << "]", nsString.isValid());
// Set curop information.
curop.debug().ns = ns;
curop.debug().ntoreturn = q.ntoreturn;
curop.debug().query = q.query;
curop.setQuery(q.query);
// If the query is really a command, run it.
if (nsString.isCommand()) {
int nToReturn = q.ntoreturn;
uassert(16979, str::stream() << "bad numberToReturn (" << nToReturn
<< ") for $cmd type ns - can only be 1 or -1",
nToReturn == 1 || nToReturn == -1);
curop.markCommand();
BufBuilder bb;
bb.skip(sizeof(QueryResult::Value));
BSONObjBuilder cmdResBuf;
if (!runCommands(txn, ns, q.query, curop, bb, cmdResBuf, false, q.queryOptions)) {
uasserted(13530, "bad or malformed command request?");
}
curop.debug().iscommand = true;
// TODO: Does this get overwritten/do we really need to set this twice?
curop.debug().query = q.query;
QueryResult::View qr = bb.buf();
bb.decouple();
qr.setResultFlagsToOk();
qr.msgdata().setLen(bb.len());
curop.debug().responseLength = bb.len();
qr.msgdata().setOperation(opReply);
qr.setCursorId(0);
qr.setStartingFrom(0);
qr.setNReturned(1);
result.setData(qr.view2ptr(), true);
return "";
}
const NamespaceString nss(q.ns);
// Parse the qm into a CanonicalQuery.
CanonicalQuery* cq;
Status canonStatus = CanonicalQuery::canonicalize(
q, &cq, WhereCallbackReal(txn, StringData(nss.db())));
if (!canonStatus.isOK()) {
uasserted(17287, str::stream() << "Can't canonicalize query: " << canonStatus.toString());
}
QLOG() << "Running query:\n" << cq->toString();
LOG(2) << "Running query: " << cq->toStringShort();
// Parse, canonicalize, plan, transcribe, and get a plan executor.
PlanExecutor* rawExec = NULL;
// We use this a lot below.
const LiteParsedQuery& pq = cq->getParsed();
AutoGetCollectionForRead ctx(txn, nss);
const int dbProfilingLevel = (ctx.getDb() != NULL) ? ctx.getDb()->getProfilingLevel() :
serverGlobalParams.defaultProfile;
Collection* collection = ctx.getCollection();
// We'll now try to get the query executor that will execute this query for us. There
// are a few cases in which we know upfront which executor we should get and, therefore,
// we shortcut the selection process here.
//
// (a) If the query is over a collection that doesn't exist, we use an EOFStage.
//
// (b) if the query is a replication's initial sync one, we use a specifically designed
// stage that skips extents faster (see details in exec/oplogstart.h).
//
// Otherwise we go through the selection of which executor is most suited to the
// query + run-time context at hand.
Status status = Status::OK();
if (NULL != collection && pq.getOptions().oplogReplay) {
// Takes ownership of 'cq'.
status = getOplogStartHack(txn, collection, cq, &rawExec);
}
else {
size_t options = QueryPlannerParams::DEFAULT;
if (shardingState.needCollectionMetadata(pq.ns())) {
options |= QueryPlannerParams::INCLUDE_SHARD_FILTER;
}
//.........这里部分代码省略.........
请发表评论