- 'use strict'
- const BB = require('bluebird')
- const cacheFile = require('npm-cache-filename')
- const chownr = require('chownr')
- // Promisified variant for async use; the raw module is kept above because
- // `chownr.sync` is needed later and `BB.promisify` does not carry it over.
- const chownrAsync = BB.promisify(chownr)
- const correctMkdir = BB.promisify(require('../utils/correct-mkdir.js'))
- const figgyPudding = require('figgy-pudding')
- const fs = require('graceful-fs')
- const JSONStream = require('JSONStream')
- const log = require('npmlog')
- const mkdir = BB.promisify(require('gentle-fs').mkdir)
- const ms = require('mississippi')
- const npmFetch = require('libnpm/fetch')
- const path = require('path')
- const sortedUnionStream = require('sorted-union-stream')
- const url = require('url')
- const writeStreamAtomic = require('fs-write-stream-atomic')
- const statAsync = BB.promisify(fs.stat)
- const APMOpts = figgyPudding({
- cache: {},
- registry: {}
- })
- // Returns a sorted stream of all package metadata. Internally, takes care of
- // maintaining its metadata cache and making partial or full remote requests,
- // according to staleness, validity, etc.
- //
- // The local cache must hold certain invariants:
- // 1. It must be a proper JSON object
- // 2. It must have its keys lexically sorted
- // 3. The first entry must be `_updated`, with a millisecond timestamp as its value.
- // 4. It must include all entries that exist in the metadata endpoint as of
- // the value in `_updated`
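- //
- // For illustration (hypothetical timestamp and entries), a minimal valid
- // cache file might contain:
- //   {"_updated": 1526330000000, "bar": {"name": "bar"}, "foo": {"name": "foo"}}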
- module.exports = allPackageMetadata
- function allPackageMetadata (opts) {
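- // `staleness` is not declared on `APMOpts`, so read it from the raw opts
- // before they are filtered through the pudding below.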
- const staleness = opts.staleness
- const stream = ms.through.obj()
- opts = APMOpts(opts)
- const cacheBase = cacheFile(path.resolve(path.dirname(opts.cache)))(url.resolve(opts.registry, '/-/all'))
- const cachePath = path.join(cacheBase, '.cache.json')
- createEntryStream(
- cachePath, staleness, opts
- ).then(({entryStream, latest, newEntries}) => {
- log.silly('all-package-metadata', 'entry stream created')
- if (entryStream && newEntries) {
- return createCacheWriteStream(cachePath, latest, opts).then(writer => {
- log.silly('all-package-metadata', 'output stream created')
- ms.pipeline.obj(entryStream, writer, stream)
- })
- } else if (entryStream) {
- ms.pipeline.obj(entryStream, stream)
- } else {
- stream.emit('error', new Error('No search sources available'))
- }
- }).catch(err => stream.emit('error', err))
- return stream
- }
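- // Usage sketch (hypothetical cache path and registry; staleness is in
- // seconds, per the check in `createEntryUpdateStream` below):
- //   const stream = allPackageMetadata({
- //     cache: '/path/to/cache',
- //     registry: 'https://registry.npmjs.org/',
- //     staleness: 600
- //   })
- //   stream.on('data', pkg => console.log(pkg.name))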
- // Creates a stream of the latest available package metadata.
- // Metadata will come from a combination of the local cache and remote data.
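- // Resolves to `{entryStream, latest, newEntries}`, as destructured by the
- // caller above.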
- module.exports._createEntryStream = createEntryStream
- function createEntryStream (cachePath, staleness, opts) {
- return createCacheEntryStream(
- cachePath, opts
- ).catch(err => {
- log.warn('', 'Failed to read search cache. Rebuilding')
- log.silly('all-package-metadata', 'cache read error: ', err)
- return {}
- }).then(({
- updateStream: cacheStream,
- updatedLatest: cacheLatest
- }) => {
- cacheLatest = cacheLatest || 0
- return createEntryUpdateStream(staleness, cacheLatest, opts).catch(err => {
- log.warn('', 'Search data request failed, search might be stale')
- log.silly('all-package-metadata', 'update request error: ', err)
- return {}
- }).then(({updateStream, updatedLatest}) => {
- updatedLatest = updatedLatest || 0
- const latest = updatedLatest || cacheLatest
- if (!cacheStream && !updateStream) {
- throw new Error('No search sources available')
- }
- if (cacheStream && updateStream) {
- // Deduped, unioned, sorted stream from the combination of both.
- return {
- entryStream: createMergedStream(cacheStream, updateStream),
- latest,
- newEntries: !!updatedLatest
- }
- } else {
- // Fall back to whichever source succeeded if the other failed
- return {
- entryStream: cacheStream || updateStream,
- latest,
- newEntries: !!updatedLatest
- }
- }
- })
- })
- }
- // Merges `a` and `b` into one stream, dropping duplicates in favor of entries
- // in `b`. Both input streams should already be individually sorted, and the
- // returned output stream will have semantics resembling the merge step of a
- // plain old merge sort.
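- //
- // For example (illustrative entries):
- //   a: {name: 'bar', v: 1}, {name: 'foo', v: 1}
- //   b: {name: 'foo', v: 2}
- //   result: {name: 'bar', v: 1}, {name: 'foo', v: 2}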
- module.exports._createMergedStream = createMergedStream
- function createMergedStream (a, b) {
- linkStreams(a, b)
- return sortedUnionStream(b, a, ({name}) => name)
- }
- // Reads the local index and returns a stream that spits out package data.
- module.exports._createCacheEntryStream = createCacheEntryStream
- function createCacheEntryStream (cacheFile, opts) {
- log.verbose('all-package-metadata', 'creating entry stream from local cache')
- log.verbose('all-package-metadata', cacheFile)
- return statAsync(cacheFile).then(stat => {
- // TODO - This isn't very helpful if `cacheFile` is empty or just `{}`
- const entryStream = ms.pipeline.obj(
- fs.createReadStream(cacheFile),
- JSONStream.parse('*'),
- // This passthrough is necessary because JSONStream returns nonstandard
- // streams that don't always behave like regular object streams.
- ms.through.obj()
- )
- return extractUpdated(entryStream, 'cached-entry-stream', opts)
- })
- }
- // Stream of entry updates from the server. If `latest` is `0`, streams the
- // entire metadata object from the registry.
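- // Resolves to `{updateStream, updatedLatest}`, or to `{}` when the local
- // data is fresh enough that no request is needed.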
- module.exports._createEntryUpdateStream = createEntryUpdateStream
- function createEntryUpdateStream (staleness, latest, opts) {
- log.verbose('all-package-metadata', 'creating remote entry stream')
- let partialUpdate = false
- let uri = '/-/all'
- if (latest && (Date.now() - latest < (staleness * 1000))) {
- // Skip the request altogether if our `latest` isn't stale.
- log.verbose('all-package-metadata', 'Local data up to date, skipping update')
- return BB.resolve({})
- } else if (latest === 0) {
- log.warn('', 'Building the local index for the first time, please be patient')
- log.verbose('all-package-metadata', 'No cached data: requesting full metadata db')
- } else {
- log.verbose('all-package-metadata', 'Cached data present with timestamp:', latest, 'requesting partial index update')
- uri += '/since?stale=update_after&startkey=' + latest
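- // e.g. (illustrative timestamp) '/-/all/since?stale=update_after&startkey=1526330000000'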
- partialUpdate = true
- }
- return npmFetch(uri, opts).then(res => {
- log.silly('all-package-metadata', 'request stream opened, code:', res.status)
- let entryStream = ms.pipeline.obj(
- res.body,
- JSONStream.parse('*', (pkg, key) => {
- if (key[0] === '_updated' || key[0][0] !== '_') {
- return pkg
- }
- })
- )
- if (partialUpdate) {
- // The `/-/all/since` endpoint doesn't return `_updated`, so we
- // just use the request's own timestamp.
- return {
- updateStream: entryStream,
- updatedLatest: Date.parse(res.headers.get('date'))
- }
- } else {
- return extractUpdated(entryStream, 'entry-update-stream', opts)
- }
- })
- }
- // Both the (full) remote requests and the local index have `_updated` as their
- // first returned entries. This is the "latest" unix timestamp for the metadata
- // in question. This code does a bit of juggling with the data streams
- // so that we can pretend that field doesn't exist, but still extract `latest`
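- //
- // For example (illustrative), a full index response parses into a stream of:
- //   1526330000000, {name: 'bar', ...}, {name: 'foo', ...}
- // where the leading number is the `_updated` value extracted here.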
- function extractUpdated (entryStream, label, opts) {
- log.silly('all-package-metadata', 'extracting latest')
- return new BB((resolve, reject) => {
- function nope (msg) {
- return function () {
- log.warn('all-package-metadata', label, msg)
- entryStream.removeAllListeners()
- entryStream.destroy()
- reject(new Error(msg))
- }
- }
- const onErr = nope('Failed to read stream')
- const onEnd = nope('Empty or invalid stream')
- entryStream.on('error', onErr)
- entryStream.on('end', onEnd)
- entryStream.once('data', latest => {
- log.silly('all-package-metadata', 'got first stream entry for', label, latest)
- entryStream.removeListener('error', onErr)
- entryStream.removeListener('end', onEnd)
- if (typeof latest === 'number') {
- // The extra pipeline returns a stream that will implicitly unpause once a
- // `data` listener is attached, since consuming the first `data` event
- // above disturbed the stream's initial paused state.
- resolve({
- updateStream: entryStream.pipe(ms.through.obj()),
- updatedLatest: latest
- })
- } else {
- reject(new Error('expected first entry to be _updated'))
- }
- })
- })
- }
- // Creates a stream that writes input metadata to the current cache.
- // Cache updates are atomic, and the stream closes when *everything* is done.
- // The stream is also passthrough, so entries going through it will also
- // be output from it.
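- //
- // Usage sketch (mirrors the call in `allPackageMetadata` above):
- //   createCacheWriteStream(cachePath, latest, opts).then(writer => {
- //     ms.pipeline.obj(entryStream, writer, stream)
- //   })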
- module.exports._createCacheWriteStream = createCacheWriteStream
- function createCacheWriteStream (cacheFile, latest, opts) {
- return _ensureCacheDirExists(cacheFile, opts).then(({uid, gid}) => {
- log.silly('all-package-metadata', 'creating output stream')
- const outStream = _createCacheOutStream()
- const cacheFileStream = writeStreamAtomic(cacheFile)
- const inputStream = _createCacheInStream(
- cacheFileStream, outStream, latest
- )
- // Glue together the various streams so they fail together.
- // `cacheFileStream` errors are already handled by the `inputStream`
- // pipeline
- let errEmitted = false
- linkStreams(inputStream, outStream, () => { errEmitted = true })
- cacheFileStream.on('close', () => {
- if (!errEmitted) {
- if (typeof uid === 'number' &&
- typeof gid === 'number' &&
- process.getuid &&
- process.getgid &&
- (process.getuid() !== uid || process.getgid() !== gid)) {
- chownr.sync(cacheFile, uid, gid)
- }
- outStream.end()
- }
- })
- return ms.duplex.obj(inputStream, outStream)
- })
- }
- // Returns the `{uid, gid}` that the cache should have.
- function _ensureCacheDirExists (cacheFile, opts) {
- const cacheBase = path.dirname(cacheFile)
- log.silly('all-package-metadata', 'making sure cache dir exists at', cacheBase)
- return correctMkdir(opts.cache).then(st => {
- return mkdir(cacheBase).then(made => {
- return chownrAsync(made || cacheBase, st.uid, st.gid)
- }).then(() => ({ uid: st.uid, gid: st.gid }))
- })
- }
- function _createCacheOutStream () {
- // NOTE: this looks goofy, but it's necessary in order to get
- // JSONStream to play nice with the rest of everything.
- return ms.pipeline.obj(
- ms.through(),
- JSONStream.parse('*', (obj, key) => {
- // This stream happens to get `_updated` passed through it, for
- // implementation reasons. We make sure to filter it out because the fact
- // that it comes through at all is an implementation detail.
- if (typeof obj === 'object') {
- return obj
- }
- }),
- ms.through.obj()
- )
- }
- function _createCacheInStream (writer, outStream, latest) {
- let updatedWritten = false
- const inStream = ms.pipeline.obj(
- ms.through.obj((pkg, enc, cb) => {
- if (!updatedWritten && typeof pkg === 'number') {
- // This is the `_updated` value getting sent through.
- updatedWritten = true
- return cb(null, ['_updated', pkg])
- } else if (typeof pkg !== 'object') {
- // `this` is not the stream inside this arrow function, so surface the
- // error through the callback instead of `this.emit`.
- return cb(new Error('invalid value written to input stream'))
- } else {
- // The [key, val] format is expected by `jsonstream` for object writing
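- // e.g. ['foo', {name: 'foo'}] serializes to '"foo":{"name":"foo"}'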
- cb(null, [pkg.name, pkg])
- }
- }),
- JSONStream.stringifyObject('{', ',', '}'),
- ms.through((chunk, enc, cb) => {
- // This tees off the buffer data to `outStream`, and then continues
- // the pipeline as usual
- outStream.write(chunk, enc, () => cb(null, chunk))
- }),
- // And finally, we write to the cache file.
- writer
- )
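- // Prime the pipeline with `latest` so `_updated` is written first,
- // preserving the cache invariants documented at the top of this file.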
- inStream.write(latest)
- return inStream
- }
- // Links errors between `a` and `b`, preventing cycles, and calls `cb` if
- // an error happens, once per error.
- function linkStreams (a, b, cb) {
- let lastError = null
- a.on('error', function (err) {
- if (err !== lastError) {
- lastError = err
- b.emit('error', err)
- cb && cb(err)
- }
- })
- b.on('error', function (err) {
- if (err !== lastError) {
- lastError = err
- a.emit('error', err)
- cb && cb(err)
- }
- })
- }