什么是处理node.js变换流的背压的正确方法?

 campionezhao 发布于 2023-02-08 11:38

我认为Transform这适合于此,但我会将膨胀作为管道中的一个单独步骤.

这是一个快速且基本未经测试的例子:

var zlib        = require('zlib');
var stream      = require('stream');
var transformer = new stream.Transform();

// Properties used to keep internal state of transformer.
transformer._buffers    = [];
transformer._inputSize  = 0;
transformer._targetSize = 1024 * 38;

// Dump one 'output packet'
transformer._dump       = function(done) {
  // concatenate buffers and convert to binary string
  var buffer = Buffer.concat(this._buffers).toString('binary');

  // Take first 1024 packets.
  var packetBuffer = buffer.substring(0, this._targetSize);

  // Keep the rest and reset counter.
  this._buffers   = [ new Buffer(buffer.substring(this._targetSize)) ];
  this._inputSize = this._buffers[0].length;

  // output header
  this.push('HELLO WORLD');

  // output compressed packet buffer
  zlib.deflate(packetBuffer, function(err, compressed) {
    // TODO: handle `err`
    this.push(compressed);
    if (done) {
      done();
    }
  }.bind(this));
};

// Main transformer logic: buffer chunks and dump them once the
// target size has been met.
transformer._transform  = function(chunk, encoding, done) {
  this._buffers.push(chunk);
  this._inputSize += chunk.length;

  if (this._inputSize >= this._targetSize) {
    this._dump(done);
  } else {
    done();
  }
};

// Flush any remaining buffers.
transformer._flush = function() {
  this._dump();
};

// Example:
var fs = require('fs');
fs.createReadStream('depth_1000000')
  .pipe(zlib.createInflate())
  .pipe(transformer)
  .pipe(fs.createWriteStream('depth_1000000.out'));


小智.. 5

push如果要写入的流(在本例中为文件输出流)缓冲了太多数据,则返回false.由于您正在写入磁盘,因此这是有道理的:您处理数据的速度比写出来的速度快.

out缓冲区已满时,您的转换流将无法推送,并开始缓冲数据本身.如果该缓冲区应该填充,那么inp将开始填充.这就是事情应该如何运作.管道流只会像链中最慢的链接一样快地处理数据(一旦缓冲区已满).

2 个回答
  • 我认为Transform这适合于此,但我会将膨胀作为管道中的一个单独步骤.

    这是一个快速且基本未经测试的例子:

    var zlib        = require('zlib');
    var stream      = require('stream');
    var transformer = new stream.Transform();
    
    // Properties used to keep internal state of transformer.
    transformer._buffers    = [];
    transformer._inputSize  = 0;
    transformer._targetSize = 1024 * 38;
    
    // Dump one 'output packet'
    transformer._dump       = function(done) {
      // concatenate buffers and convert to binary string
      var buffer = Buffer.concat(this._buffers).toString('binary');
    
      // Take first 1024 packets.
      var packetBuffer = buffer.substring(0, this._targetSize);
    
      // Keep the rest and reset counter.
      this._buffers   = [ new Buffer(buffer.substring(this._targetSize)) ];
      this._inputSize = this._buffers[0].length;
    
      // output header
      this.push('HELLO WORLD');
    
      // output compressed packet buffer
      zlib.deflate(packetBuffer, function(err, compressed) {
        // TODO: handle `err`
        this.push(compressed);
        if (done) {
          done();
        }
      }.bind(this));
    };
    
    // Main transformer logic: buffer chunks and dump them once the
    // target size has been met.
    transformer._transform  = function(chunk, encoding, done) {
      this._buffers.push(chunk);
      this._inputSize += chunk.length;
    
      if (this._inputSize >= this._targetSize) {
        this._dump(done);
      } else {
        done();
      }
    };
    
    // Flush any remaining buffers.
    transformer._flush = function() {
      this._dump();
    };
    
    // Example:
    var fs = require('fs');
    fs.createReadStream('depth_1000000')
      .pipe(zlib.createInflate())
      .pipe(transformer)
      .pipe(fs.createWriteStream('depth_1000000.out'));
    

    2023-02-08 11:40 回答
  • push如果要写入的流(在本例中为文件输出流)缓冲了太多数据,则返回false.由于您正在写入磁盘,因此这是有道理的:您处理数据的速度比写出来的速度快.

    out缓冲区已满时,您的转换流将无法推送,并开始缓冲数据本身.如果该缓冲区应该填充,那么inp将开始填充.这就是事情应该如何运作.管道流只会像链中最慢的链接一样快地处理数据(一旦缓冲区已满).

    2023-02-08 11:40 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有