123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415 |
- 'use strict';
- var stream = require('stream'),
- util = require('util');
- module.exports = GridFSBucketReadStream;
/**
 * Readable stream that delivers the contents of a stored file chunk by chunk.
 *
 * All bookkeeping lives on `this.s`; the helper functions in this file read
 * and update it as the download progresses.
 *
 * @param {object} chunks Handle whose `find()` is used to query chunk documents.
 * @param {object} files Handle whose `findOne()` is used to look up the file document.
 * @param {object} [readPreference] Read preference forwarded to both queries when set.
 * @param {object} filter Query used to locate the file document.
 * @param {object} [options] Optional settings (`start`, `end`, `sort`, `skip`).
 */
function GridFSBucketReadStream(chunks, files, readPreference, filter, options) {
  this.s = {
    bytesRead: 0,
    chunks: chunks,
    cursor: null,
    expected: 0,
    files: files,
    filter: filter,
    init: false,
    expectedEnd: 0,
    file: null,
    options: options,
    readPreference: readPreference
  };

  stream.Readable.call(this);
}

util.inherits(GridFSBucketReadStream, stream.Readable);

/**
 * Readable hook: once the file document is available, request the next chunk.
 * Does nothing after the stream has been aborted.
 */
GridFSBucketReadStream.prototype._read = function() {
  var _this = this;
  if (this.destroyed) {
    return;
  }

  waitForFile(_this, function() {
    doRead(_this);
  });
};

/**
 * Sets the 0-based byte offset at which reading starts.
 *
 * @param {number} start Offset in bytes.
 * @returns {GridFSBucketReadStream} This stream, for chaining.
 * @throws {Error} If the stream has already been initialised.
 */
GridFSBucketReadStream.prototype.start = function(start) {
  throwIfInitialized(this);
  // The constructor accepts a missing options argument, so create the
  // container on demand instead of failing with a TypeError.
  if (this.s.options == null) {
    this.s.options = {};
  }
  this.s.options.start = start;
  return this;
};

/**
 * Sets the 0-based byte offset at which reading stops.
 *
 * @param {number} end Offset in bytes.
 * @returns {GridFSBucketReadStream} This stream, for chaining.
 * @throws {Error} If the stream has already been initialised.
 */
GridFSBucketReadStream.prototype.end = function(end) {
  throwIfInitialized(this);
  // Same lazy creation as in start(): options may not have been supplied.
  if (this.s.options == null) {
    this.s.options = {};
  }
  this.s.options.end = end;
  return this;
};

/**
 * Stops the download: signals end-of-data to consumers, marks the stream as
 * destroyed and closes the chunk cursor when one exists.
 *
 * @param {function} [callback] Invoked when the abort has been processed; it
 *   receives the cursor close error, if any.
 */
GridFSBucketReadStream.prototype.abort = function(callback) {
  var _this = this;
  this.push(null);
  this.destroyed = true;

  if (this.s.cursor) {
    this.s.cursor.close(function(error) {
      _this.emit('close');
      callback && callback(error);
    });
    return;
  }

  // Without a cursor, 'close' is emitted here only when the file lookup has
  // not started; a lookup in flight emits 'close' itself once it notices the
  // destroyed flag.
  if (!this.s.init) {
    _this.emit('close');
  }
  callback && callback();
};
/**
 * Guards option setters: options may only change before the stream has
 * started initialising.
 *
 * @param {object} self Stream whose `s.init` flag is inspected.
 * @throws {Error} If `self.s.init` is set.
 */
function throwIfInitialized(self) {
  if (self.s.init) {
    throw new Error('You cannot change options after the stream has entered flowing mode!');
  }
}
/**
 * Fetches the next chunk document from the cursor, validates it against the
 * file metadata and pushes its payload (trimmed to the requested byte range)
 * to the stream's consumers.
 *
 * Validation failures and cursor errors are reported through __handleError.
 * When the cursor is exhausted the stream is ended, the cursor closed and
 * 'close' emitted.
 *
 * @param {object} _this The read stream being serviced.
 */
function doRead(_this) {
  if (_this.destroyed) {
    return;
  }

  _this.s.cursor.next(function(error, doc) {
    var errmsg;

    // The stream may have been aborted while the fetch was in flight.
    if (_this.destroyed) {
      return;
    }
    if (error) {
      return __handleError(_this, error);
    }
    if (!doc) {
      _this.push(null);
      return _this.s.cursor.close(function(error) {
        if (error) {
          return __handleError(_this, error);
        }
        _this.emit('close');
      });
    }

    var bytesRemaining = _this.s.file.length - _this.s.bytesRead;
    var expectedN = _this.s.expected++;
    var expectedLength = Math.min(_this.s.file.chunkSize, bytesRemaining);

    // Chunks must arrive with consecutive sequence numbers.
    if (doc.n > expectedN) {
      errmsg = 'ChunkIsMissing: Got unexpected n: ' + doc.n + ', expected: ' + expectedN;
      return __handleError(_this, new Error(errmsg));
    }
    if (doc.n < expectedN) {
      errmsg = 'ExtraChunk: Got unexpected n: ' + doc.n + ', expected: ' + expectedN;
      return __handleError(_this, new Error(errmsg));
    }

    var buf = Buffer.isBuffer(doc.data) ? doc.data : doc.data.buffer;

    if (buf.length !== expectedLength) {
      if (bytesRemaining <= 0) {
        errmsg = 'ExtraChunk: Got unexpected n: ' + doc.n;
        return __handleError(_this, new Error(errmsg));
      }
      errmsg =
        'ChunkIsWrongSize: Got unexpected length: ' + buf.length + ', expected: ' + expectedLength;
      return __handleError(_this, new Error(errmsg));
    }

    _this.s.bytesRead += buf.length;

    if (buf.length === 0) {
      return _this.push(null);
    }

    var sliceStart = null;
    var sliceEnd = null;

    // Only the first delivered chunk carries a leading offset; clear it so
    // later chunks are passed through from their first byte.
    if (_this.s.bytesToSkip != null) {
      sliceStart = _this.s.bytesToSkip;
      _this.s.bytesToSkip = 0;
    }

    // expectedEnd is one past the index of the final chunk, and bytesToTrim
    // counts the unwanted bytes at the tail of that chunk, so the cut-off
    // inside the chunk is chunkSize - bytesToTrim.
    if (expectedN === _this.s.expectedEnd - 1 && _this.s.bytesToTrim != null) {
      sliceEnd = _this.s.file.chunkSize - _this.s.bytesToTrim;
    }

    if (sliceStart != null || sliceEnd != null) {
      buf = buf.slice(sliceStart || 0, sliceEnd || buf.length);
    }

    _this.push(buf);
  });
}
/**
 * Looks up the file document, validates the requested byte range, prepares
 * the chunk cursor and finally emits 'file' with the document.
 *
 * Every failure (lookup error, missing file, invalid start/end option) is
 * reported through __handleError rather than thrown, because this work runs
 * inside an asynchronous callback where a throw cannot be caught by callers.
 *
 * @param {object} self The read stream to initialise.
 */
function init(self) {
  var findOneOptions = {};
  if (self.s.readPreference) {
    findOneOptions.readPreference = self.s.readPreference;
  }
  if (self.s.options && self.s.options.sort) {
    findOneOptions.sort = self.s.options.sort;
  }
  if (self.s.options && self.s.options.skip) {
    findOneOptions.skip = self.s.options.skip;
  }

  self.s.files.findOne(self.s.filter, findOneOptions, function(error, doc) {
    if (error) {
      return __handleError(self, error);
    }
    if (!doc) {
      var identifier = self.s.filter._id ? self.s.filter._id.toString() : self.s.filter.filename;
      var errmsg = 'FileNotFound: file ' + identifier + ' was not found';
      var err = new Error(errmsg);
      err.code = 'ENOENT';
      return __handleError(self, err);
    }

    // An empty file has no chunks to read: end the stream immediately.
    if (doc.length <= 0) {
      self.push(null);
      return;
    }

    // The stream was aborted while the lookup was in flight; no cursor was
    // created, so announce the close from here.
    if (self.destroyed) {
      self.emit('close');
      return;
    }

    try {
      self.s.bytesToSkip = handleStartOption(self, doc, self.s.options);
    } catch (startError) {
      return __handleError(self, startError);
    }

    var filter = { files_id: doc._id };

    // When a start offset is given, skip the chunks that lie entirely
    // before it.
    if (self.s.options && self.s.options.start != null) {
      var skip = Math.floor(self.s.options.start / doc.chunkSize);
      if (skip > 0) {
        filter['n'] = { $gte: skip };
      }
    }

    self.s.cursor = self.s.chunks.find(filter).sort({ n: 1 });
    if (self.s.readPreference) {
      self.s.cursor.setReadPreference(self.s.readPreference);
    }

    self.s.expectedEnd = Math.ceil(doc.length / doc.chunkSize);

    try {
      self.s.bytesToTrim = handleEndOption(self, doc, self.s.cursor, self.s.options);
    } catch (endError) {
      return __handleError(self, endError);
    }

    // Publish the file only after the whole range has been validated, so a
    // rejected range never leaves the stream looking ready to read.
    self.s.file = doc;
    self.emit('file', doc);
  });
}
/**
 * Runs `callback` as soon as the file document is known.
 *
 * If the document is already stored on the stream the callback runs
 * synchronously and its result is returned. Otherwise initialisation is
 * started (once per stream) and the callback is deferred until the next
 * 'file' event.
 *
 * @param {object} _this The read stream.
 * @param {function} callback Invoked without arguments.
 */
function waitForFile(_this, callback) {
  var state = _this.s;

  if (!state.file) {
    if (!state.init) {
      init(_this);
      state.init = true;
    }

    // Wrap the callback so it is not handed the event payload.
    _this.once('file', function onFile() {
      callback();
    });
    return;
  }

  return callback();
}
/**
 * Validates the `start` option and positions the stream's counters on the
 * chunk that contains the start offset.
 *
 * @param {object} stream Stream whose `s.bytesRead` and `s.expected` are updated.
 * @param {object} doc File document providing `length` and `chunkSize`.
 * @param {object} [options] Options that may carry `start` and `end`.
 * @returns {number|undefined} Offset of `start` inside its chunk, or
 *   undefined when no start option is present.
 * @throws {Error} If start is past the end of the file, negative, or greater
 *   than end.
 */
function handleStartOption(stream, doc, options) {
  if (!options || options.start == null) {
    return;
  }

  var begin = options.start;
  var prefix = 'Stream start (' + begin + ') must not be ';

  if (begin > doc.length) {
    throw new Error(prefix + 'more than the length of the file (' + doc.length + ')');
  }
  if (begin < 0) {
    throw new Error(prefix + 'negative');
  }
  if (options.end != null && options.end < begin) {
    throw new Error(prefix + 'greater than stream end (' + options.end + ')');
  }

  // Index of the chunk holding the first requested byte.
  var firstChunk = Math.floor(begin / doc.chunkSize);
  stream.s.bytesRead = firstChunk * doc.chunkSize;
  stream.s.expected = firstChunk;

  return begin - stream.s.bytesRead;
}
/**
 * Validates the `end` option, limits the cursor to the chunks that overlap
 * the requested range and records where the range ends.
 *
 * A negative `start` is not re-checked here: handleStartOption rejects it
 * and is invoked before this function during initialisation.
 *
 * @param {object} stream Stream whose `s.expectedEnd` is updated.
 * @param {object} doc File document providing `length` and `chunkSize`.
 * @param {object} cursor Chunk cursor; its `limit()` is called with the
 *   number of chunks to fetch.
 * @param {object} [options] Options that may carry `start` and `end`.
 * @returns {number|undefined} Number of bytes between `end` and the end of
 *   the chunk it falls in, or undefined when no end option is present.
 * @throws {Error} If end is past the end of the file or negative.
 */
function handleEndOption(stream, doc, cursor, options) {
  if (options && options.end != null) {
    if (options.end > doc.length) {
      throw new Error(
        'Stream end (' +
          options.end +
          ') must not be ' +
          'more than the length of the file (' +
          doc.length +
          ')'
      );
    }
    // Validate the value the message actually reports.
    if (options.end < 0) {
      throw new Error('Stream end (' + options.end + ') must not be ' + 'negative');
    }

    var start = options.start != null ? Math.floor(options.start / doc.chunkSize) : 0;
    // One past the index of the last chunk that contains requested bytes.
    var endChunk = Math.ceil(options.end / doc.chunkSize);

    cursor.limit(endChunk - start);
    stream.s.expectedEnd = endChunk;

    return endChunk * doc.chunkSize - options.end;
  }
}
/**
 * Reports a failure to the stream's consumers as an 'error' event.
 *
 * @param {object} _this Stream on which the event is emitted.
 * @param {Error} error The failure to report.
 */
function __handleError(_this, error) {
  var eventName = 'error';
  _this.emit(eventName, error);
}
|