FileMiddleware.js

/*
	MIT License http://www.opensource.org/licenses/mit-license.php
*/

"use strict";

const { constants } = require("buffer");
const { pipeline } = require("stream");
const {
	createBrotliCompress,
	createBrotliDecompress,
	createGzip,
	createGunzip,
	constants: zConstants
} = require("zlib");
const createHash = require("../util/createHash");
const { dirname, join, mkdirp } = require("../util/fs");
const memoize = require("../util/memoize");
const SerializerMiddleware = require("./SerializerMiddleware");

/** @typedef {typeof import("../util/Hash")} Hash */
/** @typedef {import("../util/fs").IntermediateFileSystem} IntermediateFileSystem */
/** @typedef {import("./types").BufferSerializableType} BufferSerializableType */

/*
Format:

File -> Header Section*

Version -> u32
AmountOfSections -> u32
SectionSize -> i32 (if less than zero represents lazy value)

Header -> Version AmountOfSections SectionSize*

Buffer -> n bytes
Section -> Buffer
*/
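// Worked example of the header layout (illustrative only, derived from the
// format notes above): a pack with two eager sections (10 and 20 bytes) and
// one lazy pointer section of 24 bytes would start with:
//   77 70 63 01   Version = 0x01637077 ("wpc" + 1, little-endian)
//   03 00 00 00   AmountOfSections = 3
//   0a 00 00 00   SectionSize = 10
//   14 00 00 00   SectionSize = 20
//   e8 ff ff ff   SectionSize = -24 (negative -> lazy section)
// The three section buffers follow immediately after the header.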
// "wpc" + 1 in little-endian
const VERSION = 0x01637077;
const WRITE_LIMIT_TOTAL = 0x7fff0000;
const WRITE_LIMIT_CHUNK = 511 * 1024 * 1024;
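// Note (assumption): both limits appear chosen to stay safely below the
// signed 32-bit byte boundary. WRITE_LIMIT_TOTAL caps how many bytes are
// queued on a write stream per batch (see batchWrite below), while
// WRITE_LIMIT_CHUNK caps the size of any single buffer passed to
// stream.write().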
/**
 * @param {Buffer[]} buffers buffers
 * @param {string | Hash} hashFunction hash function to use
 * @returns {string} hash
 */
const hashForName = (buffers, hashFunction) => {
	const hash = createHash(hashFunction);
	for (const buf of buffers) hash.update(buf);
	return /** @type {string} */ (hash.digest("hex"));
};

const COMPRESSION_CHUNK_SIZE = 100 * 1024 * 1024;
const DECOMPRESSION_CHUNK_SIZE = 100 * 1024 * 1024;

const writeUInt64LE = Buffer.prototype.writeBigUInt64LE
	? (buf, value, offset) => {
			buf.writeBigUInt64LE(BigInt(value), offset);
		}
	: (buf, value, offset) => {
			const low = value % 0x100000000;
			const high = (value - low) / 0x100000000;
			buf.writeUInt32LE(low, offset);
			buf.writeUInt32LE(high, offset + 4);
		};

const readUInt64LE = Buffer.prototype.readBigUInt64LE
	? (buf, offset) => {
			return Number(buf.readBigUInt64LE(offset));
		}
	: (buf, offset) => {
			const low = buf.readUInt32LE(offset);
			const high = buf.readUInt32LE(offset + 4);
			return high * 0x100000000 + low;
		};
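// Example of the fallback arithmetic: for value = 0x100000002 the low half is
// 0x00000002 and the high half is 0x00000001; reading back yields
// 1 * 0x100000000 + 2 === 0x100000002. This stays exact for all values below
// Number.MAX_SAFE_INTEGER (2^53 - 1), far above any realistic pack size.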
/**
 * @typedef {Object} SerializeResult
 * @property {string | false} name
 * @property {number} size
 * @property {Promise=} backgroundJob
 */

/**
 * @param {FileMiddleware} middleware this
 * @param {BufferSerializableType[] | Promise<BufferSerializableType[]>} data data to be serialized
 * @param {string | boolean} name file base name
 * @param {function(string | false, Buffer[], number): Promise<void>} writeFile writes a file
 * @param {string | Hash} hashFunction hash function to use
 * @returns {Promise<SerializeResult>} resulting file pointer and promise
 */
const serialize = async (
	middleware,
	data,
	name,
	writeFile,
	hashFunction = "md4"
) => {
	/** @type {(Buffer[] | Buffer | SerializeResult | Promise<SerializeResult>)[]} */
	const processedData = [];
	/** @type {WeakMap<SerializeResult, function(): any | Promise<any>>} */
	const resultToLazy = new WeakMap();
	/** @type {Buffer[] | undefined} */
	let lastBuffers = undefined;
	for (const item of await data) {
		if (typeof item === "function") {
			if (!SerializerMiddleware.isLazy(item))
				throw new Error("Unexpected function");
			if (!SerializerMiddleware.isLazy(item, middleware)) {
				throw new Error(
					"Unexpected lazy value with non-this target (can't pass through lazy values)"
				);
			}
			lastBuffers = undefined;
			const serializedInfo = SerializerMiddleware.getLazySerializedValue(item);
			if (serializedInfo) {
				if (typeof serializedInfo === "function") {
					throw new Error(
						"Unexpected lazy value with non-this target (can't pass through lazy values)"
					);
				} else {
					processedData.push(serializedInfo);
				}
			} else {
				const content = item();
				if (content) {
					const options = SerializerMiddleware.getLazyOptions(item);
					processedData.push(
						serialize(
							middleware,
							content,
							(options && options.name) || true,
							writeFile,
							hashFunction
						).then(result => {
							/** @type {any} */ (item).options.size = result.size;
							resultToLazy.set(result, item);
							return result;
						})
					);
				} else {
					throw new Error(
						"Unexpected falsy value returned by lazy value function"
					);
				}
			}
		} else if (item) {
			if (lastBuffers) {
				lastBuffers.push(item);
			} else {
				lastBuffers = [item];
				processedData.push(lastBuffers);
			}
		} else {
			throw new Error("Unexpected falsy value in items array");
		}
	}
	/** @type {Promise<any>[]} */
	const backgroundJobs = [];
	const resolvedData = (
		await Promise.all(
			/** @type {Promise<Buffer[] | Buffer | SerializeResult>[]} */ (
				processedData
			)
		)
	).map(item => {
		if (Array.isArray(item) || Buffer.isBuffer(item)) return item;
		backgroundJobs.push(item.backgroundJob);
		// create pointer buffer from size and name
		const name = /** @type {string} */ (item.name);
		const nameBuffer = Buffer.from(name);
		const buf = Buffer.allocUnsafe(8 + nameBuffer.length);
		writeUInt64LE(buf, item.size, 0);
		nameBuffer.copy(buf, 8, 0);
		const lazy = resultToLazy.get(item);
		SerializerMiddleware.setLazySerializedValue(lazy, buf);
		return buf;
	});
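	// Pointer buffer layout for lazy sections, as built above: 8 bytes of
	// section size (unsigned 64-bit little-endian) followed by the UTF-8
	// bytes of the referenced file name. deserialize() reads it back in the
	// same order.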
	const lengths = [];
	for (const item of resolvedData) {
		if (Array.isArray(item)) {
			let l = 0;
			for (const b of item) l += b.length;
			while (l > 0x7fffffff) {
				lengths.push(0x7fffffff);
				l -= 0x7fffffff;
			}
			lengths.push(l);
		} else if (item) {
			lengths.push(-item.length);
		} else {
			throw new Error("Unexpected falsy value in resolved data " + item);
		}
	}
	const header = Buffer.allocUnsafe(8 + lengths.length * 4);
	header.writeUInt32LE(VERSION, 0);
	header.writeUInt32LE(lengths.length, 4);
	for (let i = 0; i < lengths.length; i++) {
		header.writeInt32LE(lengths[i], 8 + i * 4);
	}
	const buf = [header];
	for (const item of resolvedData) {
		if (Array.isArray(item)) {
			for (const b of item) buf.push(b);
		} else if (item) {
			buf.push(item);
		}
	}
	if (name === true) {
		name = hashForName(buf, hashFunction);
	}
	let size = 0;
	for (const b of buf) size += b.length;
	backgroundJobs.push(writeFile(name, buf, size));
	return {
		size,
		name,
		backgroundJob:
			backgroundJobs.length === 1
				? backgroundJobs[0]
				: Promise.all(backgroundJobs)
	};
};
/**
 * @param {FileMiddleware} middleware this
 * @param {string | false} name filename
 * @param {function(string | false): Promise<Buffer[]>} readFile read content of a file
 * @returns {Promise<BufferSerializableType[]>} deserialized data
 */
const deserialize = async (middleware, name, readFile) => {
	const contents = await readFile(name);
	if (contents.length === 0) throw new Error("Empty file " + name);
	let contentsIndex = 0;
	let contentItem = contents[0];
	let contentItemLength = contentItem.length;
	let contentPosition = 0;
	if (contentItemLength === 0) throw new Error("Empty file " + name);
	const nextContent = () => {
		contentsIndex++;
		contentItem = contents[contentsIndex];
		contentItemLength = contentItem.length;
		contentPosition = 0;
	};
	const ensureData = n => {
		if (contentPosition === contentItemLength) {
			nextContent();
		}
		while (contentItemLength - contentPosition < n) {
			const remaining = contentItem.slice(contentPosition);
			let lengthFromNext = n - remaining.length;
			const buffers = [remaining];
			for (let i = contentsIndex + 1; i < contents.length; i++) {
				const l = contents[i].length;
				if (l > lengthFromNext) {
					buffers.push(contents[i].slice(0, lengthFromNext));
					contents[i] = contents[i].slice(lengthFromNext);
					lengthFromNext = 0;
					break;
				} else {
					buffers.push(contents[i]);
					contentsIndex = i;
					lengthFromNext -= l;
				}
			}
			if (lengthFromNext > 0) throw new Error("Unexpected end of data");
			contentItem = Buffer.concat(buffers, n);
			contentItemLength = n;
			contentPosition = 0;
		}
	};
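	// ensureData(n) guarantees that the next n bytes are contiguous in
	// contentItem, stitching buffers together whenever a value straddles the
	// boundary between two of the chunks returned by readFile.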
	const readUInt32LE = () => {
		ensureData(4);
		const value = contentItem.readUInt32LE(contentPosition);
		contentPosition += 4;
		return value;
	};
	const readInt32LE = () => {
		ensureData(4);
		const value = contentItem.readInt32LE(contentPosition);
		contentPosition += 4;
		return value;
	};
	const readSlice = l => {
		ensureData(l);
		if (contentPosition === 0 && contentItemLength === l) {
			const result = contentItem;
			if (contentsIndex + 1 < contents.length) {
				nextContent();
			} else {
				contentPosition = l;
			}
			return result;
		}
		const result = contentItem.slice(contentPosition, contentPosition + l);
		contentPosition += l;
		// we clone the buffer here to allow the original content to be garbage collected
		return l * 2 < contentItem.buffer.byteLength ? Buffer.from(result) : result;
	};
	const version = readUInt32LE();
	if (version !== VERSION) {
		throw new Error("Invalid file version");
	}
	const sectionCount = readUInt32LE();
	const lengths = [];
	let lastLengthPositive = false;
	for (let i = 0; i < sectionCount; i++) {
		const value = readInt32LE();
		const valuePositive = value >= 0;
		if (lastLengthPositive && valuePositive) {
			lengths[lengths.length - 1] += value;
		} else {
			lengths.push(value);
			lastLengthPositive = valuePositive;
		}
	}
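	// Consecutive non-negative lengths are re-merged here because serialize()
	// splits any section larger than 0x7fffffff bytes into multiple i32-sized
	// header entries; a negative length always stands alone as a lazy pointer.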
	const result = [];
	for (let length of lengths) {
		if (length < 0) {
			const slice = readSlice(-length);
			const size = Number(readUInt64LE(slice, 0));
			const nameBuffer = slice.slice(8);
			const name = nameBuffer.toString();
			result.push(
				SerializerMiddleware.createLazy(
					memoize(() => deserialize(middleware, name, readFile)),
					middleware,
					{
						name,
						size
					},
					slice
				)
			);
		} else {
			if (contentPosition === contentItemLength) {
				nextContent();
			} else if (contentPosition !== 0) {
				if (length <= contentItemLength - contentPosition) {
					result.push(
						Buffer.from(
							contentItem.buffer,
							contentItem.byteOffset + contentPosition,
							length
						)
					);
					contentPosition += length;
					length = 0;
				} else {
					const l = contentItemLength - contentPosition;
					result.push(
						Buffer.from(
							contentItem.buffer,
							contentItem.byteOffset + contentPosition,
							l
						)
					);
					length -= l;
					contentPosition = contentItemLength;
				}
			} else {
				if (length >= contentItemLength) {
					result.push(contentItem);
					length -= contentItemLength;
					contentPosition = contentItemLength;
				} else {
					result.push(
						Buffer.from(contentItem.buffer, contentItem.byteOffset, length)
					);
					contentPosition += length;
					length = 0;
				}
			}
			while (length > 0) {
				nextContent();
				if (length >= contentItemLength) {
					result.push(contentItem);
					length -= contentItemLength;
					contentPosition = contentItemLength;
				} else {
					result.push(
						Buffer.from(contentItem.buffer, contentItem.byteOffset, length)
					);
					contentPosition += length;
					length = 0;
				}
			}
		}
	}
	return result;
};
/**
 * @typedef {BufferSerializableType[]} DeserializedType
 * @typedef {true} SerializedType
 * @extends {SerializerMiddleware<DeserializedType, SerializedType>}
 */
class FileMiddleware extends SerializerMiddleware {
	/**
	 * @param {IntermediateFileSystem} fs filesystem
	 * @param {string | Hash} hashFunction hash function to use
	 */
	constructor(fs, hashFunction = "md4") {
		super();
		this.fs = fs;
		this._hashFunction = hashFunction;
	}

	/**
	 * @param {DeserializedType} data data
	 * @param {Object} context context object
	 * @returns {SerializedType|Promise<SerializedType>} serialized data
	 */
	serialize(data, context) {
		const { filename, extension = "" } = context;
		return new Promise((resolve, reject) => {
			mkdirp(this.fs, dirname(this.fs, filename), err => {
				if (err) return reject(err);

				// It's important that we don't touch existing files during serialization
				// because serialize may read existing files (when deserializing)
				const allWrittenFiles = new Set();
				const writeFile = async (name, content, size) => {
					const file = name
						? join(this.fs, filename, `../${name}${extension}`)
						: filename;
					await new Promise((resolve, reject) => {
						let stream = this.fs.createWriteStream(file + "_");
						let compression;
						if (file.endsWith(".gz")) {
							compression = createGzip({
								chunkSize: COMPRESSION_CHUNK_SIZE,
								level: zConstants.Z_BEST_SPEED
							});
						} else if (file.endsWith(".br")) {
							compression = createBrotliCompress({
								chunkSize: COMPRESSION_CHUNK_SIZE,
								params: {
									[zConstants.BROTLI_PARAM_MODE]: zConstants.BROTLI_MODE_TEXT,
									[zConstants.BROTLI_PARAM_QUALITY]: 2,
									[zConstants.BROTLI_PARAM_DISABLE_LITERAL_CONTEXT_MODELING]: true,
									[zConstants.BROTLI_PARAM_SIZE_HINT]: size
								}
							});
						}
						if (compression) {
							pipeline(compression, stream, reject);
							stream = compression;
							stream.on("finish", () => resolve());
						} else {
							stream.on("error", err => reject(err));
							stream.on("finish", () => resolve());
						}
						// split into chunks for WRITE_LIMIT_CHUNK size
						const chunks = [];
						for (const b of content) {
							if (b.length < WRITE_LIMIT_CHUNK) {
								chunks.push(b);
							} else {
								for (let i = 0; i < b.length; i += WRITE_LIMIT_CHUNK) {
									chunks.push(b.slice(i, i + WRITE_LIMIT_CHUNK));
								}
							}
						}
						const len = chunks.length;
						let i = 0;
						const batchWrite = err => {
							// will be handled in "on" error handler
							if (err) return;
							if (i === len) {
								stream.end();
								return;
							}
							// queue up a batch of chunks up to the write limit
							// end is exclusive
							let end = i;
							let sum = chunks[end++].length;
							while (end < len) {
								sum += chunks[end].length;
								if (sum > WRITE_LIMIT_TOTAL) break;
								end++;
							}
							while (i < end - 1) {
								stream.write(chunks[i++]);
							}
							stream.write(chunks[i++], batchWrite);
						};
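						// Kick off the first batch. Only the last write of each batch
						// receives the batchWrite callback, so the next batch is queued
						// only after the stream has flushed roughly WRITE_LIMIT_TOTAL
						// bytes, keeping the amount of buffered data bounded.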
						batchWrite();
					});
					if (name) allWrittenFiles.add(file);
				};
				resolve(
					serialize(this, data, false, writeFile, this._hashFunction).then(
						async ({ backgroundJob }) => {
							await backgroundJob;

							// Rename the index file to disallow access during inconsistent file state
							await new Promise(resolve =>
								this.fs.rename(filename, filename + ".old", err => {
									resolve();
								})
							);

							// update all written files
							await Promise.all(
								Array.from(
									allWrittenFiles,
									file =>
										new Promise((resolve, reject) => {
											this.fs.rename(file + "_", file, err => {
												if (err) return reject(err);
												resolve();
											});
										})
								)
							);

							// As final step automatically update the index file to have a consistent pack again
							await new Promise(resolve => {
								this.fs.rename(filename + "_", filename, err => {
									if (err) return reject(err);
									resolve();
								});
							});
							return /** @type {true} */ (true);
						}
					)
				);
			});
		});
	}
	/**
	 * @param {SerializedType} data data
	 * @param {Object} context context object
	 * @returns {DeserializedType|Promise<DeserializedType>} deserialized data
	 */
	deserialize(data, context) {
		const { filename, extension = "" } = context;
		const readFile = name =>
			new Promise((resolve, reject) => {
				const file = name
					? join(this.fs, filename, `../${name}${extension}`)
					: filename;
				this.fs.stat(file, (err, stats) => {
					if (err) {
						reject(err);
						return;
					}
					let remaining = /** @type {number} */ (stats.size);
					let currentBuffer;
					let currentBufferUsed;
					const buf = [];
					let decompression;
					if (file.endsWith(".gz")) {
						decompression = createGunzip({
							chunkSize: DECOMPRESSION_CHUNK_SIZE
						});
					} else if (file.endsWith(".br")) {
						decompression = createBrotliDecompress({
							chunkSize: DECOMPRESSION_CHUNK_SIZE
						});
					}
					if (decompression) {
						let newResolve, newReject;
						resolve(
							Promise.all([
								new Promise((rs, rj) => {
									newResolve = rs;
									newReject = rj;
								}),
								new Promise((resolve, reject) => {
									decompression.on("data", chunk => buf.push(chunk));
									decompression.on("end", () => resolve());
									decompression.on("error", err => reject(err));
								})
							]).then(() => buf)
						);
						resolve = newResolve;
						reject = newReject;
					}
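					// From here on, resolve/reject refer to the inner promise: the
					// outer promise already resolves with the decompressed buffers
					// once both the file read and the decompression have finished.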
					this.fs.open(file, "r", (err, fd) => {
						if (err) {
							reject(err);
							return;
						}
						const read = () => {
							if (currentBuffer === undefined) {
								currentBuffer = Buffer.allocUnsafeSlow(
									Math.min(
										constants.MAX_LENGTH,
										remaining,
										decompression ? DECOMPRESSION_CHUNK_SIZE : Infinity
									)
								);
								currentBufferUsed = 0;
							}
							let readBuffer = currentBuffer;
							let readOffset = currentBufferUsed;
							let readLength = currentBuffer.length - currentBufferUsed;
							// values passed to fs.read must be valid int32 values
							if (readOffset > 0x7fffffff) {
								readBuffer = currentBuffer.slice(readOffset);
								readOffset = 0;
							}
							if (readLength > 0x7fffffff) {
								readLength = 0x7fffffff;
							}
							this.fs.read(
								fd,
								readBuffer,
								readOffset,
								readLength,
								null,
								(err, bytesRead) => {
									if (err) {
										this.fs.close(fd, () => {
											reject(err);
										});
										return;
									}
									currentBufferUsed += bytesRead;
									remaining -= bytesRead;
									if (currentBufferUsed === currentBuffer.length) {
										if (decompression) {
											decompression.write(currentBuffer);
										} else {
											buf.push(currentBuffer);
										}
										currentBuffer = undefined;
										if (remaining === 0) {
											if (decompression) {
												decompression.end();
											}
											this.fs.close(fd, err => {
												if (err) {
													reject(err);
													return;
												}
												resolve(buf);
											});
											return;
										}
									}
									read();
								}
							);
						};
						read();
					});
				});
			});
		return deserialize(this, false, readFile);
	}
}

module.exports = FileMiddleware;
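// Minimal usage sketch (hypothetical file names, for illustration only; the
// context shape matches the serialize/deserialize methods above):
//   const middleware = new FileMiddleware(intermediateFs, "md4");
//   await middleware.serialize(buffers, { filename: "/cache/pack" });
//   const data = await middleware.deserialize(true, { filename: "/cache/pack" });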