'use strict'

const BB = require('bluebird')

const cacheFile = require('npm-cache-filename')
const chownr = BB.promisify(require('chownr'))
const correctMkdir = BB.promisify(require('../utils/correct-mkdir.js'))
const figgyPudding = require('figgy-pudding')
const fs = require('graceful-fs')
const JSONStream = require('JSONStream')
const log = require('npmlog')
const mkdir = BB.promisify(require('gentle-fs').mkdir)
const ms = require('mississippi')
const npmFetch = require('libnpm/fetch')
const path = require('path')
const sortedUnionStream = require('sorted-union-stream')
const url = require('url')
const writeStreamAtomic = require('fs-write-stream-atomic')

const statAsync = BB.promisify(fs.stat)

const APMOpts = figgyPudding({
  cache: {},
  registry: {}
})

// Returns a sorted stream of all package metadata. Internally, takes care of
// maintaining its metadata cache and making partial or full remote requests,
// according to staleness, validity, etc.
//
// The local cache must hold certain invariants:
// 1. It must be a proper JSON object
// 2. It must have its keys lexically sorted
// 3. The first entry must be `_updated` with a millisecond timestamp as a val.
// 4. It must include all entries that exist in the metadata endpoint as of
//    the value in `_updated`
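//
// For illustration, a cache satisfying these invariants might look like this
// (hypothetical, heavily abbreviated package entries):
//
//   {
//     "_updated": 1510000000000,
//     "bar": { "name": "bar", "dist-tags": { "latest": "1.0.0" } },
//     "foo": { "name": "foo", "dist-tags": { "latest": "2.1.0" } }
//   }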
module.exports = allPackageMetadata
function allPackageMetadata (opts) {
  const staleness = opts.staleness
  const stream = ms.through.obj()

  opts = APMOpts(opts)
  const cacheBase = cacheFile(path.resolve(path.dirname(opts.cache)))(url.resolve(opts.registry, '/-/all'))
  const cachePath = path.join(cacheBase, '.cache.json')
  createEntryStream(
    cachePath, staleness, opts
  ).then(({entryStream, latest, newEntries}) => {
    log.silly('all-package-metadata', 'entry stream created')
    if (entryStream && newEntries) {
      return createCacheWriteStream(cachePath, latest, opts).then(writer => {
        log.silly('all-package-metadata', 'output stream created')
        ms.pipeline.obj(entryStream, writer, stream)
      })
    } else if (entryStream) {
      ms.pipeline.obj(entryStream, stream)
    } else {
      stream.emit('error', new Error('No search sources available'))
    }
  }).catch(err => stream.emit('error', err))
  return stream
}

// Creates a stream of the latest available package metadata.
// Metadata will come from a combination of the local cache and remote data.
module.exports._createEntryStream = createEntryStream
function createEntryStream (cachePath, staleness, opts) {
  return createCacheEntryStream(
    cachePath, opts
  ).catch(err => {
    log.warn('', 'Failed to read search cache. Rebuilding')
    log.silly('all-package-metadata', 'cache read error: ', err)
    return {}
  }).then(({
    updateStream: cacheStream,
    updatedLatest: cacheLatest
  }) => {
    cacheLatest = cacheLatest || 0
    return createEntryUpdateStream(staleness, cacheLatest, opts).catch(err => {
      log.warn('', 'Search data request failed, search might be stale')
      log.silly('all-package-metadata', 'update request error: ', err)
      return {}
    }).then(({updateStream, updatedLatest}) => {
      updatedLatest = updatedLatest || 0
      const latest = updatedLatest || cacheLatest
      if (!cacheStream && !updateStream) {
        throw new Error('No search sources available')
      }
      if (cacheStream && updateStream) {
        // Deduped, unioned, sorted stream from the combination of both.
        return {
          entryStream: createMergedStream(cacheStream, updateStream),
          latest,
          newEntries: !!updatedLatest
        }
      } else {
        // Either one works if one or the other failed
        return {
          entryStream: cacheStream || updateStream,
          latest,
          newEntries: !!updatedLatest
        }
      }
    })
  })
}

// Merges `a` and `b` into one stream, dropping duplicates in favor of entries
// in `b`. Both input streams should already be individually sorted, and the
// returned output stream will have semantics resembling the merge step of a
// plain old merge sort.
module.exports._createMergedStream = createMergedStream
function createMergedStream (a, b) {
  linkStreams(a, b)
  return sortedUnionStream(b, a, ({name}) => name)
}
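//
// For example, `createMergedStream` given hypothetical, abbreviated inputs:
//   a (cache):  {name: 'bar', version: '1.0.0'}, {name: 'foo', version: '1.0.0'}
//   b (update): {name: 'foo', version: '2.0.0'}
// emits bar@1.0.0 followed by foo@2.0.0 -- the fresher entry from `b`
// replaces the cached one for the duplicated key.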
// Reads the local index and returns a stream that spits out package data.
module.exports._createCacheEntryStream = createCacheEntryStream
function createCacheEntryStream (cacheFile, opts) {
  log.verbose('all-package-metadata', 'creating entry stream from local cache')
  log.verbose('all-package-metadata', cacheFile)
  return statAsync(cacheFile).then(stat => {
    // TODO - This isn't very helpful if `cacheFile` is empty or just `{}`
    const entryStream = ms.pipeline.obj(
      fs.createReadStream(cacheFile),
      JSONStream.parse('*'),
      // I believe this passthrough is necessary cause `jsonstream` returns
      // weird custom streams that behave funny sometimes.
      ms.through.obj()
    )
    return extractUpdated(entryStream, 'cached-entry-stream', opts)
  })
}

// Stream of entry updates from the server. If `latest` is `0`, streams the
// entire metadata object from the registry.
module.exports._createEntryUpdateStream = createEntryUpdateStream
function createEntryUpdateStream (staleness, latest, opts) {
  log.verbose('all-package-metadata', 'creating remote entry stream')
  let partialUpdate = false
  let uri = '/-/all'
  if (latest && (Date.now() - latest < (staleness * 1000))) {
    // Skip the request altogether if our `latest` isn't stale.
    log.verbose('all-package-metadata', 'Local data up to date, skipping update')
    return BB.resolve({})
  } else if (latest === 0) {
    log.warn('', 'Building the local index for the first time, please be patient')
    log.verbose('all-package-metadata', 'No cached data: requesting full metadata db')
  } else {
    log.verbose('all-package-metadata', 'Cached data present with timestamp:', latest, 'requesting partial index update')
    uri += '/since?stale=update_after&startkey=' + latest
    partialUpdate = true
  }
  return npmFetch(uri, opts).then(res => {
    log.silly('all-package-metadata', 'request stream opened, code:', res.statusCode)
    let entryStream = ms.pipeline.obj(
      res.body,
      JSONStream.parse('*', (pkg, key) => {
        if (key[0] === '_updated' || key[0][0] !== '_') {
          return pkg
        }
      })
    )
    if (partialUpdate) {
      // The `/all/since` endpoint doesn't return `_updated`, so we
      // just use the request's own timestamp.
      return {
        updateStream: entryStream,
        updatedLatest: Date.parse(res.headers.get('date'))
      }
    } else {
      return extractUpdated(entryStream, 'entry-update-stream', opts)
    }
  })
}
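//
// For example, with a hypothetical cached timestamp of 1510000000000, the
// partial update above would request:
//   /-/all/since?stale=update_after&startkey=1510000000000
// while a first-time build (latest === 0) requests the full /-/all index.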
// Both the (full) remote requests and the local index have `_updated` as their
// first returned entries. This is the "latest" unix timestamp for the metadata
// in question. This code does a bit of juggling with the data streams
// so that we can pretend that field doesn't exist, but still extract `latest`
function extractUpdated (entryStream, label, opts) {
  log.silly('all-package-metadata', 'extracting latest')
  return new BB((resolve, reject) => {
    function nope (msg) {
      return function () {
        log.warn('all-package-metadata', label, msg)
        entryStream.removeAllListeners()
        entryStream.destroy()
        reject(new Error(msg))
      }
    }
    const onErr = nope('Failed to read stream')
    const onEnd = nope('Empty or invalid stream')
    entryStream.on('error', onErr)
    entryStream.on('end', onEnd)
    entryStream.once('data', latest => {
      log.silly('all-package-metadata', 'got first stream entry for', label, latest)
      entryStream.removeListener('error', onErr)
      entryStream.removeListener('end', onEnd)
      if (typeof latest === 'number') {
        // The extra pipeline is to return a stream that will implicitly unpause
        // after having an `.on('data')` listener attached, since using this
        // `data` event broke its initial state.
        resolve({
          updateStream: entryStream.pipe(ms.through.obj()),
          updatedLatest: latest
        })
      } else {
        reject(new Error('expected first entry to be _updated'))
      }
    })
  })
}

// Creates a stream that writes input metadata to the current cache.
// Cache updates are atomic, and the stream closes when *everything* is done.
// The stream is also passthrough, so entries going through it will also
// be output from it.
module.exports._createCacheWriteStream = createCacheWriteStream
function createCacheWriteStream (cacheFile, latest, opts) {
  return _ensureCacheDirExists(cacheFile, opts).then(({uid, gid}) => {
    log.silly('all-package-metadata', 'creating output stream')
    const outStream = _createCacheOutStream()
    const cacheFileStream = writeStreamAtomic(cacheFile)
    const inputStream = _createCacheInStream(
      cacheFileStream, outStream, latest
    )

    // Glue together the various streams so they fail together.
    // `cacheFileStream` errors are already handled by the `inputStream`
    // pipeline.
    let errEmitted = false
    linkStreams(inputStream, outStream, () => { errEmitted = true })

    cacheFileStream.on('close', () => {
      if (!errEmitted) {
        if (typeof uid === 'number' &&
            typeof gid === 'number' &&
            process.getuid &&
            process.getgid &&
            (process.getuid() !== uid || process.getgid() !== gid)) {
          chownr.sync(cacheFile, uid, gid)
        }
        outStream.end()
      }
    })

    return ms.duplex.obj(inputStream, outStream)
  })
}

// Returns the {uid, gid} that the cache should have.
function _ensureCacheDirExists (cacheFile, opts) {
  const cacheBase = path.dirname(cacheFile)
  log.silly('all-package-metadata', 'making sure cache dir exists at', cacheBase)
  return correctMkdir(opts.cache).then(st => {
    return mkdir(cacheBase).then(made => {
      return chownr(made || cacheBase, st.uid, st.gid)
    }).then(() => ({ uid: st.uid, gid: st.gid }))
  })
}

function _createCacheOutStream () {
  // NOTE: this looks goofy, but it's necessary in order to get
  //       JSONStream to play nice with the rest of everything.
  return ms.pipeline.obj(
    ms.through(),
    JSONStream.parse('*', (obj, key) => {
      // This stream happens to get `_updated` passed through it, for
      // implementation reasons. We make sure to filter it out here, since
      // its presence in the stream is just an implementation detail that
      // consumers shouldn't have to care about.
      if (typeof obj === 'object') {
        return obj
      }
    }),
    ms.through.obj()
  )
}

function _createCacheInStream (writer, outStream, latest) {
  let updatedWritten = false
  const inStream = ms.pipeline.obj(
    ms.through.obj((pkg, enc, cb) => {
      if (!updatedWritten && typeof pkg === 'number') {
        // This is the `_updated` value getting sent through.
        updatedWritten = true
        return cb(null, ['_updated', pkg])
      } else if (typeof pkg !== 'object') {
        // NOTE: `this` would not be the stream inside an arrow function, so
        // the error is surfaced through the transform callback instead.
        cb(new Error('invalid value written to input stream'))
      } else {
        // The [key, val] format is expected by `jsonstream` for object writing
        cb(null, [pkg.name, pkg])
      }
    }),
    JSONStream.stringifyObject('{', ',', '}'),
    ms.through((chunk, enc, cb) => {
      // This tees off the buffer data to `outStream`, and then continues
      // the pipeline as usual
      outStream.write(chunk, enc, () => cb(null, chunk))
    }),
    // And finally, we write to the cache file.
    writer
  )
  inStream.write(latest)
  return inStream
}
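//
// For illustration: a hypothetical input entry {name: 'foo', version: '1.0.0'}
// is rewritten above into ['foo', {name: 'foo', version: '1.0.0'}], which
// `JSONStream.stringifyObject` serializes as a `"foo":{...}` member of the
// single JSON object that ends up in the cache file.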
// Links errors between `a` and `b`, preventing cycles, and calls `cb` if
// an error happens, once per error.
function linkStreams (a, b, cb) {
  let lastError = null
  a.on('error', function (err) {
    if (err !== lastError) {
      lastError = err
      b.emit('error', err)
      cb && cb(err)
    }
  })
  b.on('error', function (err) {
    if (err !== lastError) {
      lastError = err
      a.emit('error', err)
      cb && cb(err)
    }
  })
}
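
// Usage sketch (hypothetical paths and values; `cache` is a directory path,
// `registry` is a registry base URL, and `staleness` is in seconds):
//
//   allPackageMetadata({
//     cache: '/path/to/npm/cache',
//     registry: 'https://registry.npmjs.org/',
//     staleness: 600
//   })
//     .on('data', pkg => console.log(pkg.name))
//     .on('error', err => console.error(err))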