123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296 |
- var fs = require('fs');
- var util = require('util');
- var stream = require('stream');
- var Readable = stream.Readable;
- var Writable = stream.Writable;
- var PassThrough = stream.PassThrough;
- var Pend = require('pend');
- var EventEmitter = require('events').EventEmitter;
- exports.createFromBuffer = createFromBuffer;
- exports.createFromFd = createFromFd;
- exports.BufferSlicer = BufferSlicer;
- exports.FdSlicer = FdSlicer;
util.inherits(FdSlicer, EventEmitter);

/**
 * Lets several readers and writers share a single file descriptor.
 *
 * All reads and writes are queued on `this.pend`, whose `max` is set to 1.
 * The slicer is reference counted; with `autoClose` enabled, the descriptor
 * is closed when the count drops back to zero.
 *
 * Emits 'close' after a successful automatic close and 'error' if that
 * close fails.
 *
 * @param {number} fd - file descriptor to operate on.
 * @param {Object} [options]
 * @param {boolean} [options.autoClose] - close `fd` when the last reference is released.
 */
function FdSlicer(fd, options) {
  var opts = options || {};
  EventEmitter.call(this);

  this.fd = fd;
  this.pend = new Pend();
  this.pend.max = 1;
  this.refCount = 0;
  this.autoClose = Boolean(opts.autoClose);
}

/**
 * Queued equivalent of `fs.read` on the shared descriptor.
 * `callback(err, bytesRead, buffer)` fires after the queue slot is released.
 */
FdSlicer.prototype.read = function(buffer, offset, length, position, callback) {
  var slicer = this;

  function readJob(release) {
    fs.read(slicer.fd, buffer, offset, length, position, function onRead(err, bytesRead, filled) {
      release();
      callback(err, bytesRead, filled);
    });
  }

  slicer.pend.go(readJob);
};

/**
 * Queued equivalent of `fs.write` on the shared descriptor.
 * `callback(err, written, buffer)` fires after the queue slot is released.
 */
FdSlicer.prototype.write = function(buffer, offset, length, position, callback) {
  var slicer = this;

  function writeJob(release) {
    fs.write(slicer.fd, buffer, offset, length, position, function onWrite(err, written, source) {
      release();
      callback(err, written, source);
    });
  }

  slicer.pend.go(writeJob);
};

/** Creates a ReadStream bound to this slicer. */
FdSlicer.prototype.createReadStream = function(options) {
  var readStream = new ReadStream(this, options);
  return readStream;
};

/** Creates a WriteStream bound to this slicer. */
FdSlicer.prototype.createWriteStream = function(options) {
  var writeStream = new WriteStream(this, options);
  return writeStream;
};

/** Takes one reference on the slicer. */
FdSlicer.prototype.ref = function() {
  this.refCount = this.refCount + 1;
};

/**
 * Releases one reference. Throws Error("invalid unref") when released more
 * often than referenced. When the count reaches zero and `autoClose` is set,
 * closes the descriptor and reports the outcome as a 'close' or 'error' event.
 */
FdSlicer.prototype.unref = function() {
  var slicer = this;
  slicer.refCount = slicer.refCount - 1;

  if (slicer.refCount < 0) throw new Error("invalid unref");
  if (slicer.refCount > 0 || !slicer.autoClose) return;

  fs.close(slicer.fd, function onCloseDone(err) {
    if (err) {
      slicer.emit('error', err);
      return;
    }
    slicer.emit('close');
  });
};
util.inherits(ReadStream, Readable);

/**
 * Readable stream over a byte range of a slicer's file descriptor.
 *
 * Takes a reference on `context` at construction and releases it exactly
 * once: when the range is exhausted, when `fs.read` reports 0 bytes, or
 * when the stream is destroyed.
 *
 * @param {Object} context - object exposing `fd`, `pend`, `ref()` and `unref()`.
 * @param {Object} [options] - forwarded to `stream.Readable`; additionally:
 * @param {number} [options.start=0] - offset of the first byte to read.
 * @param {number} [options.end] - exclusive end offset; when null/undefined
 *   the stream reads until `fs.read` reports 0 bytes.
 */
function ReadStream(context, options) {
  options = options || {};
  Readable.call(this, options);

  this.context = context;
  this.context.ref();

  this.start = options.start || 0;
  this.endOffset = options.end;
  this.pos = this.start;
  this.destroyed = false;
}

/**
 * Reads the next chunk (at most the high-water mark, `n`, and the bytes left
 * before `endOffset`) through the context's queue and pushes it downstream.
 *
 * @param {number} n - number of bytes requested by the stream machinery.
 */
ReadStream.prototype._read = function(n) {
  var self = this;
  if (self.destroyed) return;

  var toRead = Math.min(self._readableState.highWaterMark, n);
  if (self.endOffset != null) {
    toRead = Math.min(toRead, self.endOffset - self.pos);
  }
  if (toRead <= 0) {
    // Requested range exhausted: end the stream and release the context.
    self.destroyed = true;
    self.push(null);
    self.context.unref();
    return;
  }
  self.context.pend.go(function(cb) {
    // The stream may have been destroyed while this job was queued.
    if (self.destroyed) return cb();
    // `new Buffer(size)` is deprecated (DEP0005). allocUnsafe is sufficient
    // here because only the first `bytesRead` bytes, which fs.read filled,
    // are ever pushed.
    var buffer = Buffer.allocUnsafe(toRead);
    fs.read(self.context.fd, buffer, 0, toRead, self.pos, function(err, bytesRead) {
      if (err) {
        self.destroy(err);
      } else if (bytesRead === 0) {
        // fs.read returned nothing: treat as end of file.
        self.destroyed = true;
        self.push(null);
        self.context.unref();
      } else {
        self.pos += bytesRead;
        self.push(buffer.slice(0, bytesRead));
      }
      cb();
    });
  });
};

/**
 * Stops the stream, emits 'error' and releases the context reference.
 * A no-op when the stream is already destroyed or ended.
 *
 * @param {Error} [err] - error to emit; defaults to Error("stream destroyed").
 */
ReadStream.prototype.destroy = function(err) {
  if (this.destroyed) return;
  err = err || new Error("stream destroyed");
  this.destroyed = true;
  this.emit('error', err);
  this.context.unref();
};
util.inherits(WriteStream, Writable);

/**
 * Writable stream that stores incoming chunks at increasing offsets of a
 * slicer's file descriptor.
 *
 * Takes a reference on `context` at construction and releases it when the
 * stream finishes or is destroyed. Emits 'progress' after each chunk is
 * written.
 *
 * @param {Object} context - object exposing `fd`, `pend`, `ref()` and `unref()`.
 * @param {Object} [options] - forwarded to `stream.Writable`; additionally:
 * @param {number} [options.start=0] - offset at which the first byte is written.
 * @param {number} [options.end] - offset the stream may not write past;
 *   unlimited when null/undefined.
 */
function WriteStream(context, options) {
  var opts = options || {};
  Writable.call(this, opts);

  this.context = context;
  this.context.ref();

  this.start = opts.start || 0;
  if (opts.end == null) {
    this.endOffset = Infinity;
  } else {
    this.endOffset = +opts.end;
  }
  this.bytesWritten = 0;
  this.pos = this.start;
  this.destroyed = false;

  // Finishing the stream releases the context reference.
  this.on('finish', this.destroy.bind(this));
}

/**
 * Writes one chunk at the current position through the context's queue.
 * A chunk that would cross `endOffset` destroys the stream and fails with
 * an error whose `code` is 'ETOOBIG'.
 */
WriteStream.prototype._write = function(buffer, encoding, callback) {
  var stream = this;
  if (stream.destroyed) return;

  if (stream.pos + buffer.length > stream.endOffset) {
    var tooBig = new Error("maximum file length exceeded");
    tooBig.code = 'ETOOBIG';
    stream.destroy();
    callback(tooBig);
    return;
  }

  function writeJob(release) {
    // The stream may have been destroyed while this job was queued.
    if (stream.destroyed) return release();

    fs.write(stream.context.fd, buffer, 0, buffer.length, stream.pos, function onWritten(err, bytes) {
      if (err) {
        stream.destroy();
        release();
        callback(err);
        return;
      }
      stream.bytesWritten += bytes;
      stream.pos += bytes;
      stream.emit('progress');
      release();
      callback();
    });
  }

  stream.context.pend.go(writeJob);
};

/**
 * Marks the stream destroyed and releases the context reference.
 * Calls after the first one are ignored.
 */
WriteStream.prototype.destroy = function() {
  if (!this.destroyed) {
    this.destroyed = true;
    this.context.unref();
  }
};
util.inherits(BufferSlicer, EventEmitter);

/**
 * In-memory counterpart of FdSlicer: exposes the same read/write/stream/ref
 * interface on top of a Buffer instead of a file descriptor.
 *
 * @param {Buffer} buffer - backing storage that is read from and written to.
 * @param {Object} [options]
 * @param {number} [options.maxChunkSize] - largest chunk emitted by streams
 *   from createReadStream. Values that are not a positive number fall back
 *   to Number.MAX_SAFE_INTEGER (a negative size would otherwise make the
 *   chunking loop never terminate).
 */
function BufferSlicer(buffer, options) {
  EventEmitter.call(this);

  options = options || {};
  this.refCount = 0;
  this.buffer = buffer;
  var maxChunkSize = options.maxChunkSize;
  this.maxChunkSize = (typeof maxChunkSize === 'number' && maxChunkSize > 0)
    ? maxChunkSize
    : Number.MAX_SAFE_INTEGER;
}

/**
 * Copies up to `length` bytes starting at `position` of the backing buffer
 * into `buffer` at `offset`.
 *
 * The copy is clamped to the bytes available in the backing buffer and to
 * the room left in `buffer`; the callback receives the number of bytes that
 * were actually copied (0 when `position` is at or past the end).
 *
 * @param {Buffer} buffer - destination.
 * @param {number} offset - index in `buffer` at which to start storing bytes.
 * @param {number} length - number of bytes requested.
 * @param {number} position - index in the backing buffer to read from.
 * @param {function(?Error, number, Buffer)} callback - invoked via
 *   setImmediate with (null, bytesRead, buffer).
 */
BufferSlicer.prototype.read = function(buffer, offset, length, position, callback) {
  var available = this.buffer.length - position;
  var room = buffer.length - offset;
  var wanted = Math.min(length, available, room);
  var bytesRead = 0;
  if (wanted > 0) {
    // Buffer#copy returns the number of bytes it copied.
    bytesRead = this.buffer.copy(buffer, offset, position, position + wanted);
  }
  setImmediate(function() {
    callback(null, bytesRead, buffer);
  });
};

/**
 * Copies `length` bytes from `buffer` (starting at `offset`) into the backing
 * buffer at `position`. The backing buffer never grows, so a write running
 * past its end is truncated; the callback receives the number of bytes that
 * were actually copied.
 *
 * @param {Buffer} buffer - source.
 * @param {number} offset - index in `buffer` of the first byte to copy.
 * @param {number} length - number of bytes to copy.
 * @param {number} position - index in the backing buffer to write at.
 * @param {function(?Error, number, Buffer)} callback - invoked via
 *   setImmediate with (null, written, buffer).
 */
BufferSlicer.prototype.write = function(buffer, offset, length, position, callback) {
  var written = buffer.copy(this.buffer, position, offset, offset + length);
  setImmediate(function() {
    callback(null, written, buffer);
  });
};

/**
 * Returns a stream that yields the bytes in [start, end) of the backing
 * buffer, split into chunks of at most `maxChunkSize` bytes.
 *
 * @param {Object} [options] - forwarded to `stream.PassThrough`; additionally:
 * @param {number} [options.start=0] - index of the first byte.
 * @param {number} [options.end] - exclusive end index; defaults to the end
 *   of the buffer when null/undefined. An explicit 0 yields an empty stream.
 * @returns {stream.PassThrough} stream that has already been fed and ended.
 */
BufferSlicer.prototype.createReadStream = function(options) {
  options = options || {};
  var readStream = new PassThrough(options);
  readStream.destroyed = false;
  readStream.start = options.start || 0;
  readStream.endOffset = options.end;
  // by the time this function returns, we'll be done.
  // Null check rather than `||` so that `end: 0` is honoured.
  readStream.pos = (readStream.endOffset != null) ? readStream.endOffset : this.buffer.length;

  // respect the maxChunkSize option to slice up the chunk into smaller pieces.
  var entireSlice = this.buffer.slice(readStream.start, readStream.pos);
  var chunkSize = this.maxChunkSize;
  for (var offset = 0; offset < entireSlice.length; offset += chunkSize) {
    var chunkEnd = Math.min(offset + chunkSize, entireSlice.length);
    readStream.write(entireSlice.slice(offset, chunkEnd));
  }

  readStream.end();
  readStream.destroy = function() {
    readStream.destroyed = true;
  };
  return readStream;
};

/**
 * Returns a writable stream that copies incoming chunks into the backing
 * buffer at increasing positions. Emits 'progress' after each chunk; a chunk
 * that would cross the end offset fails with an error whose `code` is
 * 'ETOOBIG'.
 *
 * @param {Object} [options] - forwarded to `stream.Writable`; additionally:
 * @param {number} [options.start=0] - index at which the first byte is stored.
 * @param {number} [options.end] - index the stream may not write past;
 *   defaults to the length of the backing buffer.
 * @returns {stream.Writable}
 */
BufferSlicer.prototype.createWriteStream = function(options) {
  var bufferSlicer = this;
  options = options || {};
  var writeStream = new Writable(options);
  writeStream.start = options.start || 0;
  writeStream.endOffset = (options.end == null) ? this.buffer.length : +options.end;
  writeStream.bytesWritten = 0;
  writeStream.pos = writeStream.start;
  writeStream.destroyed = false;
  writeStream._write = function(buffer, encoding, callback) {
    if (writeStream.destroyed) return;

    var end = writeStream.pos + buffer.length;
    if (end > writeStream.endOffset) {
      var err = new Error("maximum file length exceeded");
      err.code = 'ETOOBIG';
      writeStream.destroyed = true;
      callback(err);
      return;
    }
    buffer.copy(bufferSlicer.buffer, writeStream.pos, 0, buffer.length);
    writeStream.bytesWritten += buffer.length;
    writeStream.pos = end;
    writeStream.emit('progress');
    callback();
  };
  writeStream.destroy = function() {
    writeStream.destroyed = true;
  };
  return writeStream;
};

/** Takes one reference on the slicer. */
BufferSlicer.prototype.ref = function() {
  this.refCount += 1;
};

/**
 * Releases one reference. Throws Error("invalid unref") when released more
 * often than referenced.
 */
BufferSlicer.prototype.unref = function() {
  this.refCount -= 1;

  if (this.refCount < 0) {
    throw new Error("invalid unref");
  }
};
/**
 * Factory for a BufferSlicer.
 *
 * @param {Buffer} buffer - passed through to the BufferSlicer constructor.
 * @param {Object} [options] - passed through to the BufferSlicer constructor.
 * @returns {BufferSlicer}
 */
function createFromBuffer(buffer, options) {
  var slicer = new BufferSlicer(buffer, options);
  return slicer;
}
/**
 * Factory for an FdSlicer.
 *
 * @param {number} fd - passed through to the FdSlicer constructor.
 * @param {Object} [options] - passed through to the FdSlicer constructor.
 * @returns {FdSlicer}
 */
function createFromFd(fd, options) {
  var slicer = new FdSlicer(fd, options);
  return slicer;
}
|