我正在尝试使用新的Node.js流API实现一个流,它将缓冲一定数量的数据.当此流通过管道传输到另一个流时,或者某些内容消耗了readable
事件时,此流应刷新其缓冲区,然后简单地成为传递.问题是,此流将通过管道传输到许多其他流,并且当连接每个目标流时,即使缓冲区已经刷新到另一个流,也必须刷新缓冲区.
例如:
BufferStream
实现stream.Transform
,并保留512KB内部环缓冲区
ReadableStreamA
通过管道输送到一个实例 BufferStream
BufferStream
写入其环形缓冲区,从中读取数据ReadableStreamA
.(数据丢失无关紧要,因为缓冲区会覆盖旧数据.)
BufferStream
是通过管道输送的 WritableStreamB
WritableStreamB
接收整个512KB缓冲区,并在从中ReadableStreamA
通过写入时继续获取数据BufferStream
.
BufferStream
是通过管道输送的 WritableStreamC
WritableStreamC
还接收整个512KB缓冲区,但此缓冲区现在与WritableStreamB
收到的缓冲区不同,因为此后已写入更多数据BufferStream
.
流API是否可以实现?我能想到的唯一方法是创建一个带有方法的对象,该方法为每个目标创建一个新的PassThrough流,这意味着我不能简单地管道输入和输出.
对于它的价值,我通过简单地监听data
事件的新处理程序,使用旧的"流动"API完成了这项工作.当附加一个新函数时.on('data')
,我会直接使用环形缓冲区的副本来调用它.
这是我对你的问题的看法.
基本思想是创建一个Transform
流,这将允许我们在流输出上发送数据之前执行自定义缓冲逻辑:
var util = require('util') var stream = require('stream') var BufferStream = function (streamOptions) { stream.Transform.call(this, streamOptions) this.buffer = new Buffer('') } util.inherits(BufferStream, stream.Transform) BufferStream.prototype._transform = function (chunk, encoding, done) { // custom buffering logic // ie. add chunk to this.buffer, check buffer size, etc. this.buffer = new Buffer(chunk) this.push(chunk) done() }
然后,我们需要覆盖该.pipe()
方法,以便在通过BufferStream
管道输入流时通知我们,这允许我们自动向其写入数据:
BufferStream.prototype.pipe = function (destination, options) { var res = BufferStream.super_.prototype.pipe.call(this, destination, options) res.write(this.buffer) return res }
这样,当我们写入时buffer.pipe(someStream)
,我们按预期执行管道并将内部缓冲区写入输出流.在那之后,Transform
班级负责处理所有事情,同时跟踪背压和诸如此类的事情.
这是一个工作要点.请注意,我没有打扰写一个正确的缓冲逻辑(即我不关心内部缓冲区的大小),但这应该很容易修复.