nodejs的Stream
Node.js 中处理流式数据的抽象接口。 stream 模块提供了用于实现流接口的对象。
Node.js stream 类型
从程序角度而言流是有方向的数据,以程序为第一视角,按照流动方向可以分为三种流
- 设备流向程序:readable
- 程序流向设备:writable
- 双向流动:duplex、transform
NodeJS 关于流的操作被封装到了 Stream 模块,这个模块也被多个核心模块所引用。按照 Unix 的哲学:一切皆文件
- 普通文件(txt、jpg、mp4)
- 设备文件(stdin、stdout)
- 网络文件(http、net)
在 Node.js 中对文件的处理多数使用流来完成
例子
假设写程序某个功能需要读取某个配置文件config.json
,这时候简单分析一下
-
数据:
config.json
的内容 - 方向:设备(物理磁盘文件) -> NodeJS 程序
应该使用 readable 流来做此事(后面章节会具体介绍 API)
const fs = require('fs'); const FILEPATH = '...'; const rs = fs.createReadStream(FILEPATH);
通过fs
模块提供的createReadStream()
方法创建了一个可读的流,
这时候_config.json_
的内容从设备流向程序。示例并没有直接使用Stream
模块,
因为fs
内部已经引用了Stream
模块,并做了封装。
读取到数据后可以对数据进行处理,比如需要写到某个路径 DEST ,这时候需要一个 writable 的流,让数据从程序流向设备
const ws = fs.createWriteStream(DEST);
两种流都有了,也就是两个数据加工器,那么如何通过类似 Unix 的管道符|
来连接流呢
?在 Node.js 中管道符号就是pipe()
方法
const fs = require('fs'); const FILEPATH = 'DEST_PATH_IN_YOUR_DISK'; const rs = fs.createReadStream(FILEPATH); const ws = fs.createWriteStream(DEST); rs.pipe(ws);
这样利用流实现了简单的文件复制功能,关于pipe()
方法的实现原理后面会提到,
但有个值得注意地方:数据必须是从上游 pipe 到下游,也就是从一个 readable
流 pipe 到 writable 流
加工一下数据
上面提到了 readable 和 writable 的流称之为加工器,其实并不太恰当, 因为过程中并没有加工什么,只是读取数据,然后存储数据
如果有个需求,把本地一个package.json
文件中的所有字母都改为小写,
并保存到同目录下的_package-lower.json_
文件下
这时候就需要用到双向的流了,假定有一个专门处理字符转小写的流 lowercase , 那么代码写出来大概是这样的
const fs = require('fs'); const rs = fs.createReadStream('./package.json'); const ws = fs.createWriteStream('./package-lower.json'); rs.pipe(lowercase).pipe(ws);
到这里就和清楚为什么称pipe()
连接的流为加工器了,根据上面说的,
必须从一个 readable 流 pipe 到 writable 流:
- rs -> lowercase:lowercase 在下游,所以 lower 需要是个 writable 流
- lowercase -> ws:相对而言,lowercase 又在上游,所以 lower 需要是个 readable 流
因此能够满足需求的 lowercase 必须是双向的流,具体使用 duplex 还是 transform 后面章节会提到。当然如果需求还有额外一些处理动作,比如字母还需要转成 ASCII 码, 假定有一个双工流 ascii,那么代码简单改写
rs.pipe(lowercase).pipe(acsii).pipe(ws);
这样就完成了package.json
文件字母转小写后处理成 acsii 码的需求
为什么应该使用 stream
有个用户 Web 在线看视频的场景,假定我们通过 HTTP 请求返回给用户电影内容,那么代码可能写成这样
const http = require('http'); const fs = require('fs'); http.createServer((req, res) => { fs.readFile(moviePath, (err, data) => { res.end(data); }); }).listen(8080);
这样的代码又两个明显的问题
- 电影文件需要读完之后才能返回给客户,
- 等待时间超长电影文件需要一次放入内存中,内存吃不消
使用 stream 可以把电影文件一点点的放入内存中,然后一点点的返回给客户
(利用了 HTTP 协议的Transfer-Encoding: chunked
分段传输特性),用户体验得到优化
,同时对内存的开销明显下降
const http = require('http'); const fs = require('fs'); http.createServer((req, res) => { fs.createReadStream(moviePath).pipe(res); }).listen(8080);
除了上述好处,stream 还让代码优雅了很多,功能逻辑独立,拓展也比较简单。 比如需要对视频内容压缩,我们可以引入一个专门做此事的流, 这个流不用关心其它部分做了什么,只要是接入管道中就可以了
const http = require('http'); const fs = require('fs'); const oppressor = require(oppressor); http.createServer((req, res) => { fs.createReadStream(moviePath) .pipe(oppressor) .pipe(res); }).listen(8080);
可以看出来使用流后代码逻辑变得相对独立,可维护性也会有一定的改善