123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152 |
- 'use strict';
- const Transform = require('stream').Transform;
- const PassThrough = require('stream').PassThrough;
- const inherits = require('util').inherits;
- const deprecate = require('util').deprecate;
- const handleCallback = require('./utils').handleCallback;
- const ReadPreference = require('mongodb-core').ReadPreference;
- const MongoError = require('mongodb-core').MongoError;
- const Readable = require('stream').Readable;
- const CoreCursor = require('mongodb-core').Cursor;
- const Map = require('mongodb-core').BSON.Map;
- const executeOperation = require('./utils').executeOperation;
- const count = require('./operations/cursor_ops').count;
- const each = require('./operations/cursor_ops').each;
- const hasNext = require('./operations/cursor_ops').hasNext;
- const next = require('./operations/cursor_ops').next;
- const toArray = require('./operations/cursor_ops').toArray;
- const flags = ['tailable', 'oplogReplay', 'noCursorTimeout', 'awaitData', 'exhaust', 'partial'];
- const fields = ['numberOfRetries', 'tailableRetryInterval'];
- function Cursor(bson, ns, cmd, options, topology, topologyOptions) {
- CoreCursor.apply(this, Array.prototype.slice.call(arguments, 0));
- const state = Cursor.INIT;
- const streamOptions = {};
-
- const numberOfRetries = options.numberOfRetries || 5;
- const tailableRetryInterval = options.tailableRetryInterval || 500;
- const currentNumberOfRetries = numberOfRetries;
-
- const promiseLibrary = options.promiseLibrary || Promise;
-
- Readable.call(this, { objectMode: true });
-
- this.s = {
-
- numberOfRetries: numberOfRetries,
- tailableRetryInterval: tailableRetryInterval,
- currentNumberOfRetries: currentNumberOfRetries,
-
- state: state,
-
- streamOptions: streamOptions,
-
- bson: bson,
-
- ns: ns,
-
- cmd: cmd,
-
- options: options,
-
- topology: topology,
-
- topologyOptions: topologyOptions,
-
- promiseLibrary: promiseLibrary,
-
- currentDoc: null,
-
- explicitlyIgnoreSession: options.explicitlyIgnoreSession
- };
-
- if (!options.explicitlyIgnoreSession && options.session) {
- this.s.session = options.session;
- }
-
- if (this.s.options.noCursorTimeout === true) {
- this.addCursorFlag('noCursorTimeout', true);
- }
-
- this.sortValue = this.s.cmd.sort;
-
- const batchSize =
- cmd.cursor && cmd.cursor.batchSize
- ? cmd.cursor && cmd.cursor.batchSize
- : options.cursor && options.cursor.batchSize
- ? options.cursor.batchSize
- : 1000;
-
- this.setCursorBatchSize(batchSize);
- }
- inherits(Cursor, Readable);
- Cursor.prototype._next = function() {
- if (this._initImplicitSession) {
- this._initImplicitSession();
- }
- return CoreCursor.prototype.next.apply(this, arguments);
- };
- for (let name in CoreCursor.prototype) {
- Cursor.prototype[name] = CoreCursor.prototype[name];
- }
- Cursor.prototype._initImplicitSession = function() {
- if (!this.s.explicitlyIgnoreSession && !this.s.session && this.s.topology.hasSessionSupport()) {
- this.s.session = this.s.topology.startSession({ owner: this });
- this.cursorState.session = this.s.session;
- }
- };
- Cursor.prototype._endSession = function() {
- const didCloseCursor = CoreCursor.prototype._endSession.apply(this, arguments);
- if (didCloseCursor) {
- this.s.session = undefined;
- }
- };
- Cursor.prototype.hasNext = function(callback) {
- return executeOperation(this.s.topology, hasNext, [this, callback], {
- skipSessions: true
- });
- };
- Cursor.prototype.next = function(callback) {
- return executeOperation(this.s.topology, next, [this, callback], {
- skipSessions: true
- });
- };
- Cursor.prototype.filter = function(filter) {
- if (this.s.state === Cursor.CLOSED || this.s.state === Cursor.OPEN || this.isDead()) {
- throw MongoError.create({ message: 'Cursor is closed', driver: true });
- }
- this.s.cmd.query = filter;
- return this;
- };
- Cursor.prototype.maxScan = deprecate(function(maxScan) {
- if (this.s.state === Cursor.CLOSED || this.s.state === Cursor.OPEN || this.isDead()) {
- throw MongoError.create({ message: 'Cursor is closed', driver: true });
- }
- this.s.cmd.maxScan = maxScan;
- return this;
- }, 'Cursor.maxScan is deprecated, and will be removed in a later version');
- Cursor.prototype.hint = function(hint) {
- if (this.s.state === Cursor.CLOSED || this.s.state === Cursor.OPEN || this.isDead()) {
- throw MongoError.create({ message: 'Cursor is closed', driver: true });
- }
- this.s.cmd.hint = hint;
- return this;
- };
- Cursor.prototype.min = function(min) {
- if (this.s.state === Cursor.CLOSED || this.s.state === Cursor.OPEN || this.isDead())
- throw MongoError.create({ message: 'Cursor is closed', driver: true });
- this.s.cmd.min = min;
- return this;
- };
- Cursor.prototype.max = function(max) {
- if (this.s.state === Cursor.CLOSED || this.s.state === Cursor.OPEN || this.isDead()) {
- throw MongoError.create({ message: 'Cursor is closed', driver: true });
- }
- this.s.cmd.max = max;
- return this;
- };
- Cursor.prototype.returnKey = function(value) {
- if (this.s.state === Cursor.CLOSED || this.s.state === Cursor.OPEN || this.isDead()) {
- throw MongoError.create({ message: 'Cursor is closed', driver: true });
- }
- this.s.cmd.returnKey = value;
- return this;
- };
- Cursor.prototype.showRecordId = function(value) {
- if (this.s.state === Cursor.CLOSED || this.s.state === Cursor.OPEN || this.isDead()) {
- throw MongoError.create({ message: 'Cursor is closed', driver: true });
- }
- this.s.cmd.showDiskLoc = value;
- return this;
- };
- Cursor.prototype.snapshot = deprecate(function(value) {
- if (this.s.state === Cursor.CLOSED || this.s.state === Cursor.OPEN || this.isDead()) {
- throw MongoError.create({ message: 'Cursor is closed', driver: true });
- }
- this.s.cmd.snapshot = value;
- return this;
- }, 'Cursor Snapshot is deprecated, and will be removed in a later version');
- Cursor.prototype.setCursorOption = function(field, value) {
- if (this.s.state === Cursor.CLOSED || this.s.state === Cursor.OPEN || this.isDead()) {
- throw MongoError.create({ message: 'Cursor is closed', driver: true });
- }
- if (fields.indexOf(field) === -1) {
- throw MongoError.create({
- message: `option ${field} is not a supported option ${fields}`,
- driver: true
- });
- }
- this.s[field] = value;
- if (field === 'numberOfRetries') this.s.currentNumberOfRetries = value;
- return this;
- };
- Cursor.prototype.addCursorFlag = function(flag, value) {
- if (this.s.state === Cursor.CLOSED || this.s.state === Cursor.OPEN || this.isDead()) {
- throw MongoError.create({ message: 'Cursor is closed', driver: true });
- }
- if (flags.indexOf(flag) === -1) {
- throw MongoError.create({
- message: `flag ${flag} is not a supported flag ${flags}`,
- driver: true
- });
- }
- if (typeof value !== 'boolean') {
- throw MongoError.create({ message: `flag ${flag} must be a boolean value`, driver: true });
- }
- this.s.cmd[flag] = value;
- return this;
- };
- Cursor.prototype.addQueryModifier = function(name, value) {
- if (this.s.state === Cursor.CLOSED || this.s.state === Cursor.OPEN || this.isDead()) {
- throw MongoError.create({ message: 'Cursor is closed', driver: true });
- }
- if (name[0] !== '$') {
- throw MongoError.create({ message: `${name} is not a valid query modifier`, driver: true });
- }
-
- const field = name.substr(1);
-
- this.s.cmd[field] = value;
-
- if (field === 'orderby') this.s.cmd.sort = this.s.cmd[field];
- return this;
- };
- Cursor.prototype.comment = function(value) {
- if (this.s.state === Cursor.CLOSED || this.s.state === Cursor.OPEN || this.isDead()) {
- throw MongoError.create({ message: 'Cursor is closed', driver: true });
- }
- this.s.cmd.comment = value;
- return this;
- };
- Cursor.prototype.maxAwaitTimeMS = function(value) {
- if (typeof value !== 'number') {
- throw MongoError.create({ message: 'maxAwaitTimeMS must be a number', driver: true });
- }
- if (this.s.state === Cursor.CLOSED || this.s.state === Cursor.OPEN || this.isDead()) {
- throw MongoError.create({ message: 'Cursor is closed', driver: true });
- }
- this.s.cmd.maxAwaitTimeMS = value;
- return this;
- };
- Cursor.prototype.maxTimeMS = function(value) {
- if (typeof value !== 'number') {
- throw MongoError.create({ message: 'maxTimeMS must be a number', driver: true });
- }
- if (this.s.state === Cursor.CLOSED || this.s.state === Cursor.OPEN || this.isDead()) {
- throw MongoError.create({ message: 'Cursor is closed', driver: true });
- }
- this.s.cmd.maxTimeMS = value;
- return this;
- };
- Cursor.prototype.maxTimeMs = Cursor.prototype.maxTimeMS;
- Cursor.prototype.project = function(value) {
- if (this.s.state === Cursor.CLOSED || this.s.state === Cursor.OPEN || this.isDead()) {
- throw MongoError.create({ message: 'Cursor is closed', driver: true });
- }
- this.s.cmd.fields = value;
- return this;
- };
- Cursor.prototype.sort = function(keyOrList, direction) {
- if (this.s.options.tailable) {
- throw MongoError.create({ message: "Tailable cursor doesn't support sorting", driver: true });
- }
- if (this.s.state === Cursor.CLOSED || this.s.state === Cursor.OPEN || this.isDead()) {
- throw MongoError.create({ message: 'Cursor is closed', driver: true });
- }
- let order = keyOrList;
-
-
- if (Array.isArray(order) && Array.isArray(order[0])) {
- order = new Map(
- order.map(x => {
- const value = [x[0], null];
- if (x[1] === 'asc') {
- value[1] = 1;
- } else if (x[1] === 'desc') {
- value[1] = -1;
- } else if (x[1] === 1 || x[1] === -1 || x[1].$meta) {
- value[1] = x[1];
- } else {
- throw new MongoError(
- "Illegal sort clause, must be of the form [['field1', '(ascending|descending)'], ['field2', '(ascending|descending)']]"
- );
- }
- return value;
- })
- );
- }
- if (direction != null) {
- order = [[keyOrList, direction]];
- }
- this.s.cmd.sort = order;
- this.sortValue = order;
- return this;
- };
- Cursor.prototype.batchSize = function(value) {
- if (this.s.options.tailable) {
- throw MongoError.create({ message: "Tailable cursor doesn't support batchSize", driver: true });
- }
- if (this.s.state === Cursor.CLOSED || this.isDead()) {
- throw MongoError.create({ message: 'Cursor is closed', driver: true });
- }
- if (typeof value !== 'number') {
- throw MongoError.create({ message: 'batchSize requires an integer', driver: true });
- }
- this.s.cmd.batchSize = value;
- this.setCursorBatchSize(value);
- return this;
- };
- Cursor.prototype.collation = function(value) {
- this.s.cmd.collation = value;
- return this;
- };
- Cursor.prototype.limit = function(value) {
- if (this.s.options.tailable) {
- throw MongoError.create({ message: "Tailable cursor doesn't support limit", driver: true });
- }
- if (this.s.state === Cursor.OPEN || this.s.state === Cursor.CLOSED || this.isDead()) {
- throw MongoError.create({ message: 'Cursor is closed', driver: true });
- }
- if (typeof value !== 'number') {
- throw MongoError.create({ message: 'limit requires an integer', driver: true });
- }
- this.s.cmd.limit = value;
-
- this.setCursorLimit(value);
- return this;
- };
- Cursor.prototype.skip = function(value) {
- if (this.s.options.tailable) {
- throw MongoError.create({ message: "Tailable cursor doesn't support skip", driver: true });
- }
- if (this.s.state === Cursor.OPEN || this.s.state === Cursor.CLOSED || this.isDead()) {
- throw MongoError.create({ message: 'Cursor is closed', driver: true });
- }
- if (typeof value !== 'number') {
- throw MongoError.create({ message: 'skip requires an integer', driver: true });
- }
- this.s.cmd.skip = value;
- this.setCursorSkip(value);
- return this;
- };
- Cursor.prototype.each = deprecate(function(callback) {
-
- this.rewind();
-
- this.s.state = Cursor.INIT;
-
- each(this, callback);
- }, 'Cursor.each is deprecated. Use Cursor.forEach instead.');
- Cursor.prototype.forEach = function(iterator, callback) {
-
- this.rewind();
-
- this.s.state = Cursor.INIT;
- if (typeof callback === 'function') {
- each(this, (err, doc) => {
- if (err) {
- callback(err);
- return false;
- }
- if (doc != null) {
- iterator(doc);
- return true;
- }
- if (doc == null && callback) {
- const internalCallback = callback;
- callback = null;
- internalCallback(null);
- return false;
- }
- });
- } else {
- return new this.s.promiseLibrary((fulfill, reject) => {
- each(this, (err, doc) => {
- if (err) {
- reject(err);
- return false;
- } else if (doc == null) {
- fulfill(null);
- return false;
- } else {
- iterator(doc);
- return true;
- }
- });
- });
- }
- };
- Cursor.prototype.setReadPreference = function(readPreference) {
- if (this.s.state !== Cursor.INIT) {
- throw MongoError.create({
- message: 'cannot change cursor readPreference after cursor has been accessed',
- driver: true
- });
- }
- if (readPreference instanceof ReadPreference) {
- this.s.options.readPreference = readPreference;
- } else if (typeof readPreference === 'string') {
- this.s.options.readPreference = new ReadPreference(readPreference);
- } else {
- throw new TypeError('Invalid read preference: ' + readPreference);
- }
- return this;
- };
- Cursor.prototype.toArray = function(callback) {
- if (this.s.options.tailable) {
- throw MongoError.create({
- message: 'Tailable cursor cannot be converted to array',
- driver: true
- });
- }
- return executeOperation(this.s.topology, toArray, [this, callback], {
- skipSessions: true
- });
- };
- Cursor.prototype.count = function(applySkipLimit, opts, callback) {
- if (this.s.cmd.query == null)
- throw MongoError.create({ message: 'count can only be used with find command', driver: true });
- if (typeof opts === 'function') (callback = opts), (opts = {});
- opts = opts || {};
- if (typeof applySkipLimit === 'function') {
- callback = applySkipLimit;
- applySkipLimit = true;
- }
- if (this.s.session) {
- opts = Object.assign({}, opts, { session: this.s.session });
- }
- return executeOperation(this.s.topology, count, [this, applySkipLimit, opts, callback], {
- skipSessions: !!this.s.session
- });
- };
- Cursor.prototype.close = function(options, callback) {
- if (typeof options === 'function') (callback = options), (options = {});
- options = Object.assign({}, { skipKillCursors: false }, options);
- this.s.state = Cursor.CLOSED;
- if (!options.skipKillCursors) {
-
- this.kill();
- }
- const completeClose = () => {
-
- this.emit('close');
-
- if (typeof callback === 'function') {
- return handleCallback(callback, null, this);
- }
-
- return new this.s.promiseLibrary(resolve => {
- resolve();
- });
- };
- if (this.s.session) {
- if (typeof callback === 'function') {
- return this._endSession(() => completeClose());
- }
- return new this.s.promiseLibrary(resolve => {
- this._endSession(() => completeClose().then(resolve));
- });
- }
- return completeClose();
- };
- Cursor.prototype.map = function(transform) {
- if (this.cursorState.transforms && this.cursorState.transforms.doc) {
- const oldTransform = this.cursorState.transforms.doc;
- this.cursorState.transforms.doc = doc => {
- return transform(oldTransform(doc));
- };
- } else {
- this.cursorState.transforms = { doc: transform };
- }
- return this;
- };
- Cursor.prototype.isClosed = function() {
- return this.isDead();
- };
- Cursor.prototype.destroy = function(err) {
- if (err) this.emit('error', err);
- this.pause();
- this.close();
- };
- Cursor.prototype.stream = function(options) {
- this.s.streamOptions = options || {};
- return this;
- };
- Cursor.prototype.transformStream = function(options) {
- const streamOptions = options || {};
- if (typeof streamOptions.transform === 'function') {
- const stream = new Transform({
- objectMode: true,
- transform: function(chunk, encoding, callback) {
- this.push(streamOptions.transform(chunk));
- callback();
- }
- });
- return this.pipe(stream);
- }
- return this.pipe(new PassThrough({ objectMode: true }));
- };
- Cursor.prototype.explain = function(callback) {
- this.s.cmd.explain = true;
-
- if (this.s.cmd.readConcern) {
- delete this.s.cmd['readConcern'];
- }
- return executeOperation(this.s.topology, this._next.bind(this), [callback], {
- skipSessions: true
- });
- };
- Cursor.prototype._read = function() {
- if (this.s.state === Cursor.CLOSED || this.isDead()) {
- return this.push(null);
- }
-
- this.next((err, result) => {
- if (err) {
- if (this.listeners('error') && this.listeners('error').length > 0) {
- this.emit('error', err);
- }
- if (!this.isDead()) this.close();
-
- this.emit('end');
- return this.emit('finish');
- }
-
- if (typeof this.s.streamOptions.transform === 'function' && result != null) {
- return this.push(this.s.streamOptions.transform(result));
- }
-
- if (
- this.cursorState.transforms &&
- typeof this.cursorState.transforms.doc === 'function' &&
- result != null
- ) {
- return this.push(this.cursorState.transforms.doc(result));
- }
-
- this.push(result);
- if (result === null && this.isDead()) {
- this.once('end', () => {
- this.close();
- this.emit('finish');
- });
- }
- });
- };
- Cursor.prototype.getLogger = function() {
- return this.logger;
- };
- Object.defineProperty(Cursor.prototype, 'readPreference', {
- enumerable: true,
- get: function() {
- if (!this || !this.s) {
- return null;
- }
- return this.s.options.readPreference;
- }
- });
- Object.defineProperty(Cursor.prototype, 'namespace', {
- enumerable: true,
- get: function() {
- if (!this || !this.s) {
- return null;
- }
-
- const ns = this.s.ns || '';
- const firstDot = ns.indexOf('.');
- if (firstDot < 0) {
- return {
- database: this.s.ns,
- collection: ''
- };
- }
- return {
- database: ns.substr(0, firstDot),
- collection: ns.substr(firstDot + 1)
- };
- }
- });
- Cursor.INIT = 0;
- Cursor.OPEN = 1;
- Cursor.CLOSED = 2;
- Cursor.GET_MORE = 3;
- module.exports = Cursor;
|