前面介绍了Readable与Writable的创建方法、使用方式,以及如何控制流中的数据类型,
其中多次提到“缓存”,这部分将探究一下这个“缓存”为何物,为什么会存在。
var readable = Readable({ highWaterMark: highWaterMark })
var state = readable._readableState先介绍两个字段:
state.buffer:Array,每个元素对应push(data)中的data(可能进行过编码,见前面的解释)。state.length:Number,整个缓存的长度。 如果是objectMode,与state.buffer.length是一样的; 否则是state.buffer中字节数的总和。
本文中一开始给出的大文件处理案例,要求不能将文件内容一次性全读进内存,
所以fs.createReadStream创建的Readable对象,底层会调用fs.read去多次从底层文件中将数据读出,
内存中存储的便是一次读取的量。
因此,每次读取的数据需要放入缓存中,等待被消耗。
那什么时候会从底层文件中去读取一次?
简单来说,每次执行readable.read()时,
如果state.length低于highWaterMark,
便会执行readable._read(highWaterMark)从底层读取数据存入缓存中。
var Stream = require('stream')
var source = ['a', 'b', 'c']
var readable = Stream.Readable({
read: function () {
var data = source.shift() || null
console.log('buffer before push', this._readableState.buffer)
console.log('push', data)
this.push(data)
console.log('buffer after push', this._readableState.buffer)
console.log('--------------')
},
})
readable.on('data', function (data) {
console.log('consume', data)
})输出:
⌘ node example/highWaterMark.js
buffer before push []
push a
buffer after push [ <Buffer 61> ]
--------------
buffer before push [ <Buffer 61> ]
push b
buffer after push [ <Buffer 61>, <Buffer 62> ]
--------------
consume <Buffer 61>
buffer before push [ <Buffer 62> ]
push c
buffer after push [ <Buffer 62>, <Buffer 63> ]
--------------
consume <Buffer 62>
buffer before push [ <Buffer 63> ]
push null
buffer after push [ <Buffer 63> ]
--------------
consume <Buffer 63>
在监听data事件时,发生以下事情:
- 将回调放入事件队列中,与正常的事件监听无异
- 调用
read(0),进而引起_read的调用。 实际效果等同于state.buffer.push('a') - 调用
flow(),试图将缓存读空。 效果等同于while (read()) read()。 - 调用
read(0)。 由于已调用过push(null),所以会直接调用endReadable来结束流。
其中的flow环节,就是源源不断产生数据的环节。
每次调用chunk = read()时,先检查是否需要从底层读点数据到缓存中来
(当本次读取后,剩余的数据量小于highWaterMark时,便需要),
如果需要,就调用_read(highWaterMark)。
然后从state.buffer中取出一定的数据chunk。
objectMode或flowing模式时即为第一个元素。
如果chunk不为null,便emit('data', chunk)。
于是事件回调被执行,数据被消耗。
var Stream = require('stream')
var source = ['a', 'b', 'c']
var readable = Stream.Readable({
read: function () {
var state = this._readableState
process.nextTick(function () {
var data = source.shift() || null
console.log('buffer before push', state.buffer)
console.log('push', data)
readable.push(data)
console.log('buffer after push', state.buffer)
console.log('- - - - - - - - - - - - - -')
})
},
})
readable.on('data', function (data) {
console.log('consume', data)
})输出:
⌘ node example/highWaterMark-async.js
buffer before push []
push a
consume <Buffer 61>
buffer after push []
--------------
buffer before push []
push b
consume <Buffer 62>
buffer after push []
--------------
buffer before push []
push c
consume <Buffer 63>
buffer after push []
--------------
buffer before push []
push null
buffer after push []
--------------
对于在_read中异步调用push而言,只要push前state.buffer为空,
便可确定当前的数据即是下一个要求的数据,所以会直接emit('data', data),
因而,也便不会再写入缓存。
当然,只是这个简单的例子如此而已。
在emit('data')后,会立即调用read(0),触发下一次的_read调用。
于是,数据便源源不断的产生,直到push(null)。
var writable = Writable({ highWaterMark: highWaterMark })
var state = writable._writableState前面解释了Readable中highWaterMark的作用:
控制底层读取的速度。
Writable中highWaterMark的作用也是控制速度:
当state.length大于highWaterMark时,write(data)会返回false,
上游可以判断这个返回值,停止往writable中写数据,
同时监听drain事件触发再继续写。
Writable的缓存实际是一个待写入数据队列,
state.length也就是这个队列的长度。
每次底层的写操作完成时,检查state.length,
如果为0,则有可能触发drain事件。
这个“有可能”,便是之前出现了state.length大于highWaterMark的情况,
外面还在等待drain事件。