'use strict'
const MiniPass = require('minipass')
const EE = require('events').EventEmitter
const fs = require('fs')

let writev = fs.writev
/* istanbul ignore next */
if (!writev) {
  // This entire block can be removed if support for earlier than Node.js
  // 12.9.0 is not needed.
  const binding = process.binding('fs')
  const FSReqWrap = binding.FSReqWrap || binding.FSReqCallback

  // Fallback with the same (fd, iovec, pos, cb) call shape that this file
  // uses for fs.writev; the callback receives (er, bytesWritten, iovec).
  writev = (fd, iovec, pos, cb) => {
    const done = (er, bw) => cb(er, bw, iovec)
    const req = new FSReqWrap()
    req.oncomplete = done
    binding.writeBuffers(fd, iovec, pos, req)
  }
}

// Symbol keys keep internal state and internal methods off the public
// (string-keyed) surface of the stream classes.
const _autoClose = Symbol('_autoClose')
const _close = Symbol('_close')
const _ended = Symbol('_ended')
const _fd = Symbol('_fd')
const _finished = Symbol('_finished')
const _flags = Symbol('_flags')
const _flush = Symbol('_flush')
const _handleChunk = Symbol('_handleChunk')
const _makeBuf = Symbol('_makeBuf')
const _mode = Symbol('_mode')
const _needDrain = Symbol('_needDrain')
const _onerror = Symbol('_onerror')
const _onopen = Symbol('_onopen')
const _onread = Symbol('_onread')
const _onwrite = Symbol('_onwrite')
const _open = Symbol('_open')
const _path = Symbol('_path')
const _pos = Symbol('_pos')
const _queue = Symbol('_queue')
const _read = Symbol('_read')
const _readSize = Symbol('_readSize')
const _reading = Symbol('_reading')
const _remain = Symbol('_remain')
const _size = Symbol('_size')
const _write = Symbol('_write')
const _writing = Symbol('_writing')
const _defaultFlag = Symbol('_defaultFlag')
const _errored = Symbol('_errored')

/**
 * Readable file stream built on MiniPass.
 *
 * Reading starts from the constructor: immediately when `opt.fd` is a
 * number, otherwise once the file at `path` has been opened.
 *
 * @param {string} path - file to read; must be a string even when
 *   `opt.fd` is supplied.
 * @param {object} [opt] - also passed to the MiniPass constructor.
 * @param {number} [opt.fd] - already-open descriptor to read from.
 * @param {number} [opt.readSize] - maximum bytes requested per read
 *   (default 16 MiB).
 * @param {number} [opt.size] - total bytes to read before ending
 *   (default Infinity, i.e. read until a zero-byte read).
 * @param {boolean} [opt.autoClose=true] - close the descriptor when the
 *   stream ends or errors.
 * @throws {TypeError} if `path` is not a string.
 */
class ReadStream extends MiniPass {
  constructor (path, opt) {
    opt = opt || {}
    super(opt)

    this.readable = true
    this.writable = false

    if (typeof path !== 'string')
      throw new TypeError('path must be a string')

    this[_errored] = false
    this[_fd] = typeof opt.fd === 'number' ? opt.fd : null
    this[_path] = path
    this[_readSize] = opt.readSize || 16*1024*1024
    this[_reading] = false
    this[_size] = typeof opt.size === 'number' ? opt.size : Infinity
    this[_remain] = this[_size]
    this[_autoClose] = typeof opt.autoClose === 'boolean' ?
      opt.autoClose : true

    if (typeof this[_fd] === 'number')
      this[_read]()
    else
      this[_open]()
  }

  get fd () { return this[_fd] }
  get path () { return this[_path] }

  // Data only ever enters through the internal super.write()/super.end()
  // calls in _handleChunk, so the public writable API always throws.
  write () {
    throw new TypeError('this is a readable stream')
  }

  end () {
    throw new TypeError('this is a readable stream')
  }

  [_open] () {
    fs.open(this[_path], 'r', (er, fd) => this[_onopen](er, fd))
  }

  [_onopen] (er, fd) {
    if (er)
      this[_onerror](er)
    else {
      this[_fd] = fd
      this.emit('open', fd)
      this[_read]()
    }
  }

  // Allocate a buffer no larger than the bytes still wanted.
  [_makeBuf] () {
    return Buffer.allocUnsafe(Math.min(this[_readSize], this[_remain]))
  }

  // Start one read unless one is already in flight.
  [_read] () {
    if (!this[_reading]) {
      this[_reading] = true
      const buf = this[_makeBuf]()
      /* istanbul ignore if */
      if (buf.length === 0)
        return process.nextTick(() => this[_onread](null, 0, buf))
      fs.read(this[_fd], buf, 0, buf.length, null, (er, br, buf) =>
        this[_onread](er, br, buf))
    }
  }

  [_onread] (er, br, buf) {
    this[_reading] = false
    if (er)
      this[_onerror](er)
    else if (this[_handleChunk](br, buf))
      this[_read]()
  }

  [_close] () {
    if (this[_autoClose] && typeof this[_fd] === 'number') {
      const fd = this[_fd]
      this[_fd] = null
      fs.close(fd, er => er ? this.emit('error', er) : this.emit('close'))
    }
  }

  [_onerror] (er) {
    // Leave the reading flag set so that _read() becomes a no-op from
    // here on.
    this[_reading] = true
    this[_close]()
    this.emit('error', er)
  }

  // Push `br` bytes of `buf` downstream. Returns true when another read
  // should be issued right away; false when the stream has ended or the
  // write reported back-pressure.
  [_handleChunk] (br, buf) {
    let ret = false
    // no effect if infinite
    this[_remain] -= br
    if (br > 0)
      ret = super.write(br < buf.length ? buf.slice(0, br) : buf)

    if (br === 0 || this[_remain] <= 0) {
      ret = false
      this[_close]()
      super.end()
    }

    return ret
  }

  /**
   * Event filter in front of MiniPass#emit.
   *
   * - 'prefinish' / 'finish' are swallowed.
   * - 'drain' is swallowed and used as the signal to read more, as long
   *   as a descriptor is still held.
   * - 'error' is forwarded at most once per stream.
   * - everything else is forwarded unchanged, with all of its arguments.
   *
   * @returns {boolean} false for a suppressed event, otherwise whatever
   *   the parent emit returns.
   */
  emit (ev, ...args) {
    switch (ev) {
      case 'prefinish':
      case 'finish':
        return false

      case 'drain':
        if (typeof this[_fd] === 'number')
          this[_read]()
        return false

      case 'error':
        if (this[_errored])
          return false
        this[_errored] = true
        return super.emit(ev, ...args)

      default:
        return super.emit(ev, ...args)
    }
  }
}

/**
 * Synchronous variant of ReadStream: opening, reading and closing use
 * the fs *Sync calls, and fs exceptions propagate to the caller after a
 * best-effort close.
 */
class ReadStreamSync extends ReadStream {
  [_open] () {
    // try/finally rather than catch/rethrow: the original exception
    // propagates untouched, the flag only tells finally whether to close.
    let threw = true
    try {
      this[_onopen](null, fs.openSync(this[_path], 'r'))
      threw = false
    } finally {
      if (threw)
        this[_close]()
    }
  }

  [_read] () {
    let threw = true
    try {
      if (!this[_reading]) {
        this[_reading] = true
        // Keep reading until _handleChunk says to stop (end of data or
        // back-pressure).
        do {
          const buf = this[_makeBuf]()
          /* istanbul ignore next */
          const br = buf.length === 0 ? 0
            : fs.readSync(this[_fd], buf, 0, buf.length, null)
          if (!this[_handleChunk](br, buf))
            break
        } while (true)
        this[_reading] = false
      }
      threw = false
    } finally {
      if (threw)
        this[_close]()
    }
  }

  [_close] () {
    if (this[_autoClose] && typeof this[_fd] === 'number') {
      const fd = this[_fd]
      this[_fd] = null
      fs.closeSync(fd)
      this.emit('close')
    }
  }
}

/**
 * Writable file stream.
 *
 * Chunks written while the file is still opening, or while another
 * write is in flight, are queued and flushed later (several queued
 * chunks go out in a single writev call).
 *
 * @param {string} path - file to write.
 * @param {object} [opt]
 * @param {number} [opt.fd] - already-open descriptor; skips opening.
 * @param {number} [opt.mode=0o666] - mode passed to fs.open.
 * @param {number} [opt.start] - position of the first write; later
 *   writes advance it by the bytes written. When absent, `null` is
 *   passed to fs.write as the position.
 * @param {string} [opt.flags] - flags for fs.open. Defaults to 'r+'
 *   when `start` is given (falling back to 'w' if the file does not
 *   exist), otherwise 'w'.
 * @param {boolean} [opt.autoClose=true] - close the descriptor on
 *   finish or error.
 */
class WriteStream extends EE {
  constructor (path, opt) {
    opt = opt || {}
    super(opt)
    this.readable = false
    this.writable = true
    this[_errored] = false
    this[_writing] = false
    this[_ended] = false
    // Initialised alongside the other state flags; _onwrite reads it to
    // make sure 'finish' is emitted only once.
    this[_finished] = false
    this[_needDrain] = false
    this[_queue] = []
    this[_path] = path
    this[_fd] = typeof opt.fd === 'number' ? opt.fd : null
    this[_mode] = opt.mode === undefined ? 0o666 : opt.mode
    this[_pos] = typeof opt.start === 'number' ? opt.start : null
    this[_autoClose] = typeof opt.autoClose === 'boolean' ?
      opt.autoClose : true

    // truncating makes no sense when writing into the middle
    const defaultFlag = this[_pos] !== null ? 'r+' : 'w'
    this[_defaultFlag] = opt.flags === undefined
    this[_flags] = this[_defaultFlag] ? defaultFlag : opt.flags

    if (this[_fd] === null)
      this[_open]()
  }

  /**
   * Forward an event to EventEmitter#emit with all of its arguments,
   * except that 'error' is forwarded at most once per stream.
   *
   * @returns {boolean} false for a suppressed repeat 'error', otherwise
   *   the EventEmitter#emit result.
   */
  emit (ev, ...args) {
    if (ev === 'error') {
      if (this[_errored])
        return false
      this[_errored] = true
    }
    return super.emit(ev, ...args)
  }

  get fd () { return this[_fd] }
  get path () { return this[_path] }

  [_onerror] (er) {
    this[_close]()
    // Leave the writing flag set so that later write() calls are queued
    // instead of reaching the file.
    this[_writing] = true
    this.emit('error', er)
  }

  [_open] () {
    fs.open(this[_path], this[_flags], this[_mode],
      (er, fd) => this[_onopen](er, fd))
  }

  [_onopen] (er, fd) {
    // The implicit 'r+' fails with ENOENT when the file does not exist
    // yet; retry once with 'w' so that it gets created.
    if (this[_defaultFlag] &&
        this[_flags] === 'r+' &&
        er && er.code === 'ENOENT') {
      this[_flags] = 'w'
      this[_open]()
    } else if (er)
      this[_onerror](er)
    else {
      this[_fd] = fd
      this.emit('open', fd)
      this[_flush]()
    }
  }

  /**
   * Optionally write a final chunk, then mark the stream as ended.
   * 'finish' is emitted once everything pending has been written.
   *
   * @param {Buffer|string} [buf] - final chunk.
   * @param {string} [enc] - encoding used when `buf` is a string.
   * @returns {this}
   */
  end (buf, enc) {
    if (buf)
      this.write(buf, enc)

    this[_ended] = true

    // synthetic after-write logic, where drain/finish live
    if (!this[_writing] && !this[_queue].length &&
        typeof this[_fd] === 'number')
      this[_onwrite](null, 0)
    return this
  }

  /**
   * Write a chunk to the file.
   *
   * @param {Buffer|string} buf - chunk; strings are converted with
   *   Buffer.from(buf, enc).
   * @param {string} [enc] - encoding used when `buf` is a string.
   * @returns {boolean} true when the chunk was handed straight to the
   *   file; false when it was queued (a 'drain' event follows) or when
   *   the stream had already ended (an 'error' event is emitted).
   */
  write (buf, enc) {
    if (typeof buf === 'string')
      buf = Buffer.from(buf, enc)

    if (this[_ended]) {
      this.emit('error', new Error('write() after end()'))
      return false
    }

    if (this[_fd] === null || this[_writing] || this[_queue].length) {
      this[_queue].push(buf)
      this[_needDrain] = true
      return false
    }

    this[_writing] = true
    this[_write](buf)
    return true
  }

  [_write] (buf) {
    fs.write(this[_fd], buf, 0, buf.length, this[_pos], (er, bw) =>
      this[_onwrite](er, bw))
  }

  // Completion handler for every write; also called with (null, 0) as a
  // synthetic completion so the finish/drain logic lives in one place.
  [_onwrite] (er, bw) {
    if (er)
      this[_onerror](er)
    else {
      if (this[_pos] !== null)
        this[_pos] += bw
      if (this[_queue].length)
        this[_flush]()
      else {
        this[_writing] = false

        if (this[_ended] && !this[_finished]) {
          this[_finished] = true
          this[_close]()
          this.emit('finish')
        } else if (this[_needDrain]) {
          this[_needDrain] = false
          this.emit('drain')
        }
      }
    }
  }

  // Write out whatever is queued: nothing, one chunk, or the whole
  // queue in a single writev call.
  [_flush] () {
    if (this[_queue].length === 0) {
      if (this[_ended])
        this[_onwrite](null, 0)
    } else if (this[_queue].length === 1)
      this[_write](this[_queue].pop())
    else {
      const iovec = this[_queue]
      this[_queue] = []
      writev(this[_fd], iovec, this[_pos],
        (er, bw) => this[_onwrite](er, bw))
    }
  }

  [_close] () {
    if (this[_autoClose] && typeof this[_fd] === 'number') {
      const fd = this[_fd]
      this[_fd] = null
      fs.close(fd, er => er ? this.emit('error', er) : this.emit('close'))
    }
  }
}

/**
 * Synchronous variant of WriteStream: opening, writing and closing use
 * the fs *Sync calls, and fs exceptions propagate to the caller.
 */
class WriteStreamSync extends WriteStream {
  [_open] () {
    let fd
    // only wrap in a try{} block if we know we'll retry, to avoid
    // the rethrow obscuring the error's source frame in most cases.
    if (this[_defaultFlag] && this[_flags] === 'r+') {
      try {
        fd = fs.openSync(this[_path], this[_flags], this[_mode])
      } catch (er) {
        if (er.code === 'ENOENT') {
          this[_flags] = 'w'
          return this[_open]()
        } else
          throw er
      }
    } else
      fd = fs.openSync(this[_path], this[_flags], this[_mode])

    this[_onopen](null, fd)
  }

  [_close] () {
    if (this[_autoClose] && typeof this[_fd] === 'number') {
      const fd = this[_fd]
      this[_fd] = null
      fs.closeSync(fd)
      this.emit('close')
    }
  }

  [_write] (buf) {
    // throw the original, but try to close if it fails
    let threw = true
    try {
      this[_onwrite](null,
        fs.writeSync(this[_fd], buf, 0, buf.length, this[_pos]))
      threw = false
    } finally {
      if (threw)
        try { this[_close]() } catch (_) {}
    }
  }
}

exports.ReadStream = ReadStream
exports.ReadStreamSync = ReadStreamSync

exports.WriteStream = WriteStream
exports.WriteStreamSync = WriteStreamSync