readable_streambuffer.js 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. var stream = require("stream"),
  2. constants = require("./constants"),
  3. util = require("util");
  4. var ReadableStreamBuffer = module.exports = function(opts) {
  5. var that = this;
  6. stream.Stream.call(this);
  7. opts = opts || {};
  8. var frequency = opts.hasOwnProperty("frequency") ? opts.frequency : constants.DEFAULT_FREQUENCY;
  9. var chunkSize = opts.chunkSize || constants.DEFAULT_CHUNK_SIZE;
  10. var initialSize = opts.initialSize || constants.DEFAULT_INITIAL_SIZE;
  11. var incrementAmount = opts.incrementAmount || constants.DEFAULT_INCREMENT_AMOUNT;
  12. var size = 0;
  13. var buffer = new Buffer(initialSize);
  14. var encoding = null;
  15. this.readable = true;
  16. this.writable = false;
  17. var sendData = function() {
  18. if(!size) {
  19. that.emit("end");
  20. return;
  21. }
  22. var amount = Math.min(chunkSize, size);
  23. var chunk = null;
  24. if(encoding) {
  25. chunk = buffer.toString(encoding, 0, amount);
  26. }
  27. else {
  28. chunk = new Buffer(amount);
  29. buffer.copy(chunk, 0, 0, amount);
  30. }
  31. that.emit("data", chunk);
  32. if(amount < buffer.length)
  33. buffer.copy(buffer, 0, amount, size);
  34. size -= amount;
  35. };
  36. this.size = function() {
  37. return size;
  38. };
  39. this.maxSize = function() {
  40. return buffer.length;
  41. };
  42. var increaseBufferIfNecessary = function(incomingDataSize) {
  43. if((buffer.length - size) < incomingDataSize) {
  44. var factor = Math.ceil((incomingDataSize - (buffer.length - size)) / incrementAmount);
  45. var newBuffer = new Buffer(buffer.length + (incrementAmount * factor));
  46. buffer.copy(newBuffer, 0, 0, size);
  47. buffer = newBuffer;
  48. }
  49. };
  50. this.put = function(data, encoding) {
  51. if(!that.readable) return;
  52. if(Buffer.isBuffer(data)) {
  53. increaseBufferIfNecessary(data.length);
  54. data.copy(buffer, size, 0);
  55. size += data.length;
  56. }
  57. else {
  58. data = data + "";
  59. var dataSizeInBytes = Buffer.byteLength(data);
  60. increaseBufferIfNecessary(dataSizeInBytes);
  61. buffer.write(data, size, encoding || "utf8");
  62. size += dataSizeInBytes;
  63. }
  64. if (!this.isPaused && !frequency) {
  65. while (size > 0) {
  66. sendData();
  67. }
  68. }
  69. };
  70. this.pause = function() {
  71. this.isPaused = true;
  72. if(sendData && sendData.interval) {
  73. clearInterval(sendData.interval);
  74. delete sendData.interval;
  75. }
  76. };
  77. this.resume = function() {
  78. this.isPaused = false;
  79. if(sendData && !sendData.interval && frequency > 0) {
  80. sendData.interval = setInterval(sendData, frequency);
  81. }
  82. };
  83. this.destroy = function() {
  84. that.emit("end");
  85. if(sendData.interval) clearTimeout(sendData.interval);
  86. sendData = null;
  87. that.readable = false;
  88. that.emit("close");
  89. };
  90. this.setEncoding = function(_encoding) {
  91. encoding = _encoding;
  92. };
  93. this.resume();
  94. };
  95. util.inherits(ReadableStreamBuffer, stream.Stream);