index.js 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. var through = require('through');
  2. var bz2 = require('./lib/bzip2');
  3. var bitIterator = require('./lib/bit_iterator');
  4. module.exports = unbzip2Stream;
  5. function unbzip2Stream() {
  6. var bufferQueue = [];
  7. var hasBytes = 0;
  8. var blockSize = 0;
  9. var broken = false;
  10. var done = false;
  11. var bitReader = null;
  12. var streamCRC = null;
  13. function decompressBlock(push){
  14. if(!blockSize){
  15. blockSize = bz2.header(bitReader);
  16. //console.error("got header of", blockSize);
  17. streamCRC = 0;
  18. return true;
  19. }else{
  20. var bufsize = 100000 * blockSize;
  21. var buf = new Int32Array(bufsize);
  22. var chunk = [];
  23. var f = function(b) {
  24. chunk.push(b);
  25. };
  26. streamCRC = bz2.decompress(bitReader, f, buf, bufsize, streamCRC);
  27. if (streamCRC === null) {
  28. // reset for next bzip2 header
  29. blockSize = 0;
  30. return false;
  31. }else{
  32. //console.error('decompressed', chunk.length,'bytes');
  33. push(Buffer.from(chunk));
  34. return true;
  35. }
  36. }
  37. }
  38. var outlength = 0;
  39. function decompressAndQueue(stream) {
  40. if (broken) return;
  41. try {
  42. return decompressBlock(function(d) {
  43. stream.queue(d);
  44. if (d !== null) {
  45. //console.error('write at', outlength.toString(16));
  46. outlength += d.length;
  47. } else {
  48. //console.error('written EOS');
  49. }
  50. });
  51. } catch(e) {
  52. //console.error(e);
  53. stream.emit('error', e);
  54. broken = true;
  55. return false;
  56. }
  57. }
  58. return through(
  59. function write(data) {
  60. //console.error('received', data.length,'bytes in', typeof data);
  61. bufferQueue.push(data);
  62. hasBytes += data.length;
  63. if (bitReader === null) {
  64. bitReader = bitIterator(function() {
  65. return bufferQueue.shift();
  66. });
  67. }
  68. while (!broken && hasBytes - bitReader.bytesRead + 1 >= ((25000 + 100000 * blockSize) || 4)){
  69. //console.error('decompressing with', hasBytes - bitReader.bytesRead + 1, 'bytes in buffer');
  70. decompressAndQueue(this);
  71. }
  72. },
  73. function end(x) {
  74. //console.error(x,'last compressing with', hasBytes, 'bytes in buffer');
  75. while (!broken && bitReader && hasBytes > bitReader.bytesRead){
  76. decompressAndQueue(this);
  77. }
  78. if (!broken) {
  79. if (streamCRC !== null)
  80. this.emit('error', new Error("input stream ended prematurely"));
  81. this.queue(null);
  82. }
  83. }
  84. );
  85. }