all-package-metadata.js 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325
  1. 'use strict'
  2. const BB = require('bluebird')
  3. const cacheFile = require('npm-cache-filename')
  4. const chownr = BB.promisify(require('chownr'))
  5. const correctMkdir = BB.promisify(require('../utils/correct-mkdir.js'))
  6. const figgyPudding = require('figgy-pudding')
  7. const fs = require('graceful-fs')
  8. const JSONStream = require('JSONStream')
  9. const log = require('npmlog')
  10. const mkdir = BB.promisify(require('gentle-fs').mkdir)
  11. const ms = require('mississippi')
  12. const npmFetch = require('libnpm/fetch')
  13. const path = require('path')
  14. const sortedUnionStream = require('sorted-union-stream')
  15. const url = require('url')
  16. const writeStreamAtomic = require('fs-write-stream-atomic')
  17. const statAsync = BB.promisify(fs.stat)
  // Options wrapper for this module. Only `cache` and `registry` are declared
  // here. NOTE(review): presumably figgyPudding hides undeclared keys, which
  // is why callers read `staleness` from the raw options first — confirm.
  const APMOpts = figgyPudding({ cache: {}, registry: {} })
  // Returns a sorted stream of all package metadata. Internally, takes care of
  // maintaining its metadata cache and making partial or full remote requests,
  // according to staleness, validity, etc.
  //
  // The local cache must hold certain invariants:
  // 1. It must be a proper JSON object.
  // 2. It must have its keys lexically sorted.
  // 3. The first entry must be `_updated` with a millisecond timestamp as a value.
  // 4. It must include all entries that exist in the metadata endpoint as of
  //    the value in `_updated`.
  //
  // `opts.staleness` is in seconds and decides whether the registry is queried
  // at all. `opts.cache` and `opts.registry` locate the cache file.
  // Asynchronous failures, including "No search sources available", are
  // emitted as `error` events on the returned stream.
  module.exports = allPackageMetadata
  function allPackageMetadata (opts) {
    // `staleness` is not an APMOpts key, so grab it before wrapping `opts`.
    const staleness = opts.staleness
    const output = ms.through.obj()
    const conf = APMOpts(opts)
    const cacheBase = cacheFile(path.resolve(path.dirname(conf.cache)))(url.resolve(conf.registry, '/-/all'))
    const cachePath = path.join(cacheBase, '.cache.json')

    createEntryStream(cachePath, staleness, conf)
      .then(({ entryStream, latest, newEntries }) => {
        log.silly('all-package-metadata', 'entry stream created')
        if (!entryStream) {
          output.emit('error', new Error('No search sources available'))
          return
        }
        if (!newEntries) {
          // Nothing new to persist: stream the entries straight through.
          ms.pipeline.obj(entryStream, output)
          return
        }
        // Fresh remote data: route everything through the cache writer too.
        return createCacheWriteStream(cachePath, latest, conf).then(writer => {
          log.silly('all-package-metadata', 'output stream created')
          ms.pipeline.obj(entryStream, writer, output)
        })
      })
      .catch(err => output.emit('error', err))

    return output
  }
  // Creates a stream of the latest available package metadata, combining the
  // local cache with whatever the registry returns. Resolves to
  // `{entryStream, latest, newEntries}`:
  //   - `entryStream`: cache and remote entries merged (remote entries win),
  //     or whichever single source is available;
  //   - `latest`: the remote timestamp if there is one, else the cache's;
  //   - `newEntries`: whether the remote side produced a timestamp.
  // Either source may fail or be skipped (failures are logged). The promise
  // rejects only when neither source yields a stream.
  module.exports._createEntryStream = createEntryStream
  function createEntryStream (cachePath, staleness, opts) {
    const cacheRead = createCacheEntryStream(cachePath, opts).catch(err => {
      log.warn('', 'Failed to read search cache. Rebuilding')
      log.silly('all-package-metadata', 'cache read error: ', err)
      return {}
    })

    return cacheRead.then(({ updateStream: cacheStream, updatedLatest: cacheStamp }) => {
      const cacheLatest = cacheStamp || 0
      const remoteRead = createEntryUpdateStream(staleness, cacheLatest, opts).catch(err => {
        log.warn('', 'Search data request failed, search might be stale')
        log.silly('all-package-metadata', 'update request error: ', err)
        return {}
      })

      return remoteRead.then(({ updateStream, updatedLatest: remoteStamp }) => {
        const updatedLatest = remoteStamp || 0
        if (!cacheStream && !updateStream) {
          throw new Error('No search sources available')
        }
        // With both sources, build a deduped, unioned, sorted merge;
        // otherwise fall back to whichever one worked.
        const entryStream = (cacheStream && updateStream)
          ? createMergedStream(cacheStream, updateStream)
          : (cacheStream || updateStream)
        return {
          entryStream,
          latest: updatedLatest || cacheLatest,
          newEntries: !!updatedLatest
        }
      })
    })
  }
  // Merges `a` and `b` into one stream, dropping duplicates in favor of entries
  // in `b`. Both inputs must already be individually sorted. The result
  // behaves like the merge step of a plain old merge sort, keyed on `name`.
  // An `error` on either input is mirrored onto the other.
  module.exports._createMergedStream = createMergedStream
  function createMergedStream (a, b) {
    const byName = pkg => pkg.name
    linkStreams(a, b)
    // `b` is passed first so that its entries are the ones kept on duplicates.
    return sortedUnionStream(b, a, byName)
  }
  // Reads the local index file and resolves to `{updateStream, updatedLatest}`
  // as produced by `extractUpdated`. It rejects when the file cannot be
  // stat'ed (for example, when no cache exists yet). Note that the
  // `cacheFile` parameter shadows the module-level `cacheFile` import here.
  module.exports._createCacheEntryStream = createCacheEntryStream
  function createCacheEntryStream (cacheFile, opts) {
    log.verbose('all-package-metadata', 'creating entry stream from local cache')
    log.verbose('all-package-metadata', cacheFile)
    return statAsync(cacheFile).then(() => {
      // TODO - This isn't very helpful if `cacheFile` is empty or just `{}`
      const fileStream = fs.createReadStream(cacheFile)
      const parser = JSONStream.parse('*')
      // I believe this passthrough is necessary cause `jsonstream` returns
      // weird custom streams that behave funny sometimes.
      const passthrough = ms.through.obj()
      const entryStream = ms.pipeline.obj(fileStream, parser, passthrough)
      return extractUpdated(entryStream, 'cached-entry-stream', opts)
    })
  }
  // Stream of entry updates from the server. Resolves to
  // `{updateStream, updatedLatest}`. It resolves to `{}` without any request
  // when the data stamped `latest` (ms since epoch) is younger than
  // `staleness` seconds.
  //
  // When `latest` is `0` (or any other falsy value), the entire metadata
  // object is streamed from the registry. Otherwise only the entries that
  // changed since `latest` are requested.
  module.exports._createEntryUpdateStream = createEntryUpdateStream
  function createEntryUpdateStream (staleness, latest, opts) {
    log.verbose('all-package-metadata', 'creating remote entry stream')
    let partialUpdate = false
    let uri = '/-/all'
    if (!latest) {
      // Treat every falsy timestamp, not just `0`, as "no cache". Otherwise a
      // caller passing `undefined` gets a `startkey=undefined` partial request.
      log.warn('', 'Building the local index for the first time, please be patient')
      log.verbose('all-package-metadata', 'No cached data: requesting full metadata db')
    } else if (Date.now() - latest < staleness * 1000) {
      // Skip the request altogether if our `latest` isn't stale.
      log.verbose('all-package-metadata', 'Local data up to date, skipping update')
      return BB.resolve({})
    } else {
      log.verbose('all-package-metadata', 'Cached data present with timestamp:', latest, 'requesting partial index update')
      uri += `/since?stale=update_after&startkey=${latest}`
      partialUpdate = true
    }
    return npmFetch(uri, opts).then(res => {
      log.silly('all-package-metadata', 'request stream opened, code:', res.statusCode)
      // Keep `_updated`, but drop every other underscore-prefixed top-level key.
      const entryStream = ms.pipeline.obj(
        res.body,
        JSONStream.parse('*', (pkg, key) => {
          if (key[0] === '_updated' || key[0][0] !== '_') {
            return pkg
          }
        })
      )
      if (partialUpdate) {
        // The `/all/since` endpoint doesn't return `_updated`, so we
        // just use the request's own timestamp. NOTE(review): with no Date
        // header this is NaN, which createEntryStream turns into 0, so the
        // entries are still streamed but not written to the cache.
        return {
          updateStream: entryStream,
          updatedLatest: Date.parse(res.headers.get('date'))
        }
      }
      return extractUpdated(entryStream, 'entry-update-stream', opts)
    })
  }
  // Both the (full) remote requests and the local index have `_updated` as their
  // first returned entries. This is the "latest" unix timestamp for the metadata
  // in question. This code does a bit of juggling with the data streams
  // so that we can pretend that field doesn't exist, but still extract `latest`.
  //
  // It resolves to `{updateStream, updatedLatest}`. `updatedLatest` is the first
  // entry, and `updateStream` emits every entry after it, plus any later
  // `error` from `entryStream`. It rejects (after destroying `entryStream`)
  // when the stream errors or ends before its first entry, or when that entry
  // is not a number. `label` only tags log output; `opts` is currently unused.
  function extractUpdated (entryStream, label, opts) {
    log.silly('all-package-metadata', 'extracting latest')
    return new BB((resolve, reject) => {
      // Builds a handler that abandons `entryStream` and rejects with `msg`.
      function nope (msg) {
        return function () {
          log.warn('all-package-metadata', label, msg)
          entryStream.removeAllListeners()
          entryStream.destroy()
          reject(new Error(msg))
        }
      }
      const onErr = nope('Failed to read stream')
      const onEnd = nope('Empty or invalid stream')
      entryStream.on('error', onErr)
      entryStream.on('end', onEnd)
      entryStream.once('data', latest => {
        log.silly('all-package-metadata', 'got first stream entry for', label, latest)
        entryStream.removeListener('error', onErr)
        entryStream.removeListener('end', onEnd)
        if (typeof latest !== 'number') {
          // Tear the stream down. If left flowing with no `error` listener,
          // a later error would be unhandled and the source would leak.
          nope('expected first entry to be _updated')()
          return
        }
        // The extra passthrough is to return a stream that will implicitly
        // unpause after having an `.on('data')` listener attached, since using
        // this `data` event broke its initial state.
        const updateStream = ms.through.obj()
        // `.pipe()` does not forward errors, and our own `error` listener was
        // just removed, so relay failures to the stream consumers actually see.
        entryStream.on('error', err => updateStream.emit('error', err))
        resolve({
          updateStream: entryStream.pipe(updateStream),
          updatedLatest: latest
        })
      })
    })
  }
  // Creates a stream that writes input metadata to the current cache.
  // Cache updates go through `fs-write-stream-atomic`. The readable side is
  // only ended once the cache file stream closes without an error.
  // The stream is also a passthrough: entries written to it are emitted
  // from it as well. If the process uid/gid differ from the cache owner's,
  // the finished file is chowned back to that owner.
  module.exports._createCacheWriteStream = createCacheWriteStream
  function createCacheWriteStream (cacheFile, latest, opts) {
    return _ensureCacheDirExists(cacheFile, opts).then(({ uid, gid }) => {
      log.silly('all-package-metadata', 'creating output stream')
      const outStream = _createCacheOutStream()
      const cacheFileStream = writeStreamAtomic(cacheFile)
      const inputStream = _createCacheInStream(cacheFileStream, outStream, latest)

      // Glue the two halves together so they fail together. Errors from
      // `cacheFileStream` already surface through the `inputStream` pipeline.
      let failed = false
      linkStreams(inputStream, outStream, () => { failed = true })

      const needsChown = () =>
        typeof uid === 'number' &&
        typeof gid === 'number' &&
        process.getuid &&
        process.getgid &&
        (process.getuid() !== uid || process.getgid() !== gid)

      cacheFileStream.on('close', () => {
        if (failed) {
          return
        }
        if (needsChown()) {
          chownr.sync(cacheFile, uid, gid)
        }
        outStream.end()
      })

      return ms.duplex.obj(inputStream, outStream)
    })
  }
  // Makes sure the directory holding `cacheFile` exists, chowned to the
  // uid/gid that `correctMkdir(opts.cache)` resolves with. Resolves to that
  // `{uid, gid}` pair, i.e. the ownership the cache should have.
  function _ensureCacheDirExists (cacheFile, opts) {
    const dir = path.dirname(cacheFile)
    log.silly('all-package-metadata', 'making sure cache dir exists at', dir)
    return correctMkdir(opts.cache).then(({ uid, gid }) =>
      mkdir(dir)
        // Presumably `made` is the first directory mkdir had to create
        // (mkdirp convention) — confirm. Fall back to `dir` when it is empty.
        .then(made => chownr(made || dir, uid, gid))
        .then(() => ({ uid, gid }))
    )
  }
  // Creates the readable half of the cache writer. It re-parses the JSON
  // chunks being written to the cache and emits each top-level object value.
  function _createCacheOutStream () {
    // The serialized cache object also carries `_updated` (a number), which
    // flows through here for implementation reasons. Only object values are
    // kept; returning `undefined` from the map function drops the entry.
    const keepObjects = value => (typeof value === 'object' ? value : undefined)
    // NOTE: this looks goofy, but it's necessary in order to get
    // JSONStream to play nice with the rest of everything.
    return ms.pipeline.obj(
      ms.through(),
      JSONStream.parse('*', keepObjects),
      ms.through.obj()
    )
  }
  // Builds the transform at the head of the cache-write pipeline. The first
  // number written becomes `['_updated', n]`, and each package object becomes
  // `[pkg.name, pkg]` (the pair format `JSONStream.stringifyObject` expects).
  // Anything else (later numbers, `null`, strings, ...) fails the stream.
  function _cacheEntryTransform () {
    let updatedWritten = false
    return (pkg, enc, cb) => {
      if (!updatedWritten && typeof pkg === 'number') {
        // This is the `_updated` value getting sent through.
        updatedWritten = true
        return cb(null, ['_updated', pkg])
      }
      if (pkg === null || typeof pkg !== 'object') {
        // Report via the callback. This is an arrow function, so there is no
        // stream `this` to emit on, and the chunk must still be completed.
        return cb(new Error('invalid value written to input stream'))
      }
      // The [key, val] format is expected by `jsonstream` for object writing
      cb(null, [pkg.name, pkg])
    }
  }

  // Creates the writable half of the cache writer. Entries written to it are
  // serialized into one JSON object, teed as raw chunks into `outStream`, and
  // finally written to `writer` (the cache file stream). `latest` is written
  // immediately; when it is a number it becomes the leading `_updated` key.
  function _createCacheInStream (writer, outStream, latest) {
    const inStream = ms.pipeline.obj(
      ms.through.obj(_cacheEntryTransform()),
      JSONStream.stringifyObject('{', ',', '}'),
      ms.through((chunk, enc, cb) => {
        // Tee the serialized data into `outStream`, and pass the chunk on
        // down the pipeline only once `outStream` has accepted it.
        outStream.write(chunk, enc, () => cb(null, chunk))
      }),
      // And finally, we write to the cache file.
      writer
    )
    inStream.write(latest)
    return inStream
  }
  // Links errors between `a` and `b`. An `error` on either emitter is
  // re-emitted on the other, and `cb` (if given) is then called with it.
  // Remembering the last relayed error stops the two handlers from bouncing
  // the same error back and forth, so each error is reported once.
  function linkStreams (a, b, cb) {
    let lastError = null
    const relayTo = target => err => {
      if (err === lastError) {
        return
      }
      lastError = err
      target.emit('error', err)
      if (cb) {
        cb(err)
      }
    }
    a.on('error', relayTo(b))
    b.on('error', relayTo(a))
  }