1.什么是Node中的Stream?
在Node.js中,Stream(流)是一种处理流数据的抽象接口。它是Node.js提供的一种处理I/O操作的机制。流提供了一种可读、可写、可读写的抽象,而Node中的许多对象都实现了这个接口,例如http请求、处理善文和处理登录等。与字节流一样,Stream可以是可写的、可读的或者既可读也可写的。
const stream = require('stream');
const rs = new stream.Readable();
rs.push('beijin');
rs.push('shanghai');
rs.push('shenzhen');
rs.push(null);
rs.pipe(process.stdout);
这段代码中,我们通过stream模块创建了一个可读流Readable,并使用了push()方法向流中添加了3个字符串,最后用了null来告诉可读流已经全部写入完毕。然后将可读流传给可写流(stdout),最终所有数据都输出到控制台。
2.流的分类
2.1 可读流 Readable
Node中,Readable是实现了该接口的数据源。它可以从文件、网络等地方读取数据,然后以非阻塞的方式提供给消费者处理。Readable流的模式是:从数据源读取数据——> push数据到流中——> 等待消费者consume(使用数据)——> 再继续读取——> 再push数据到流中——> 再等待消费者consume。
const fs = require('fs');
const rs = fs.createReadStream('stream.txt');
rs.on('data', function (chunk) {
console.log(chunk);
});
rs.on('end', function () {
console.log('数据读取完毕');
});
这段代码中,我们使用fs模块的createReadStream方法创建可读流,并通过on data事件监听每次接收数据的回调函数。在读取完毕后,使用on end事件监听结束的回调函数。
2.2 可写流 Writable
Writable流是一种从程序中写入数据到某个目的地(文件、网络等等)的抽象接口。
const fs = require('fs');
const ws = fs.createWriteStream('output.txt');
ws.write('Hello, ');
ws.write('world!');
ws.end();
这段代码中,我们使用fs模块的createWriteStream方法创建可写流,并通过write方法向可写流中写入数据。通过end方法代表写入数据已经全部完成。
2.3 双工流Duplex
Duplex即双工流,既可以读又可以写。可以看做可读流与可写流的结合,数据可以从中读取也可以写入其中。
const stream = require('stream');
const duplexStream = new stream.Duplex();
duplexStream.push('hello,');
duplexStream.push('world!');
duplexStream.push(null);
duplexStream.pipe(process.stdout);
process.stdin.pipe(duplexStream);
这段代码中,我们通过stream模块创建了一个双工流duplexStream,并使用了push()方法向流中添加了两个字符串,最后用了null来告诉流已经全部写入完毕。然后用pipe将读入的标准输入传给这个双工流中,最终数据被输送到了控制台输出。
3.Stream的工作原理
Stream的最重要的应用场景在于处理大量的数据,例如:500M 的数据需要往数据库里头写入,如果你在同步环境下尝试写入,它会占用很长时间,甚至导致内存溢出。因此,Stream是按照固定大小分成块的方式处理这些数据。也就是分批次、逐块读取、传输、写入数据。这样可以降低内存的占用,提高效率。
Stream是基于事件机制实现的,当数据准备好了,Stream会主动触发一个数据事件,将数据传递给监听器,这样就可以对数据流进行处理。
Stream的读写数据模式也分为两种模式:push模式和pull模式。
push模式是指数据源、流和去向之间的流动由产生数据的对象控制,流的消费者在通过调用事件处理函数读取数据时,才会触发数据源来产生数据输出。
const stream = require('stream');
const rs = new stream.Readable({highWaterMark: 50});
let count = 0;
rs._read = function () {
rs.push(count.toString() + ' ');
if (count++ === 9) {
rs.push(null);
}
};
rs.pipe(process.stdout);
这段代码中,我们使用了push方式向Readable流中添加了10个数字字符串,每个数字字符串使用空格隔开,使用push(null)表示输入数据流已经全部生成。最后通过pipe方法输出到标准输出中。
pull模式正好相反,是消费者通过主动事件调用来拉取数据的模式。一般消费者通过一次次调用read()方法来拉取数据。
const stream = require('stream');
const rs = new stream.Readable({highWaterMark: 50});
let count = 0;
rs._read = function (size) {
rs.push(count.toString());
if (count++ === 9) {
rs.push(null);
}
};
function read() {
let buffer;
while ((buffer = rs.read(2)) != null) {
console.log(`读取数据:${buffer}`);
}
}
rs.on('readable', read);
在这段代码中,我们也是用了push模式向流中推入10个数字字符串,并且在转化时,使用了pull模式一次次通过读方法read来拖取数据,每次拖2个字符,直到拖取完毕。
4.Stream的事件
从数据源到目的地,经过大量流程和处理过程,可能会出现很多错误和异常情况,Stream的事件是为了提供这样的数据处理情况。
4.1. 可读流的事件
data:当有数据可读时触发
end:当没有更多的数据可读时触发
error:在接收数据过程中发生错误时触发
readable:当新的数据块可以从流中读取时触发
close:当流或底层资源已经关闭时触发
4.2. 可写流的事件
drain:当写入流返回“数据已经被消耗“之后触发
error:在写入数据过程中发生错误时触发
finish:当所有数据都已被写入流时触发
pipe:当可读流被管道连接到可写流时触发
unpipe:当可读流没有管道结尾时触发
close:当流或底层资源已经关闭时触发
4.3. 双工流Duplex的事件
Duplex继承了Readable和Writable的事件,所以Duplex流有更多的事件,具体如下
readable:当有数据可读时触发
data:当有可用数据时触发
end:当没有更多的数据可读时触发
error:在接收数据过程中发生错误时触发
drain:当写入流返回“数据已经被消耗”之后触发
finish:当所有数据都已被写入流时触发
pipe:当可读流被管道连接到可写流时触发
unpipe:当可读流没有管道结尾时触发
close:当流或底层资源已经关闭时触发
5.使用场景
Stream的优点在于能够高效处理大量数据,所以在实际开发中,Stream被广泛应用于以下场景:
文件读取和写入:Stream可以从磁盘中高效的读取文件并写入到管道中。
网络:经常使用Stream来处理基于网络的连接
文件压缩和加密:使用Stream从输入数据中读取数据,对其进行加密或压缩,然后将其写入到输出数据中
多媒体处理:视频或音频的编码和解码就可以使用Stream的格式,例如在Node中,FFmpeg处理音频或视频时,使用ChildProcess对象通过spawn方法来启用一个Spawned子进程运行命令,并使用两个Duplex流来连接父进程和子进程:
const ffmpeg = require('child_process').spawn('ffmpeg', [
'-i', 'video.avi',
'-^', 'video.mmp4'
]);
const fs = require('fs');
const videoData = fs.createReadStream('video.avi');
const processStdin = process.stdin;
videoData.pipe(ffmpeg.stdin);
ffmpeg.stdout.pipe(process.stdout);
6.总结
Node.js中的Stream是一种数据的读写接口,通过它能够实现高效的读写大容量数据,特别适用于一些数据读写比较耗时的场景。整个Stream的流程是被事件机制所驱动的,读写模式主要有push和pull模式,具体使用看项目和需求的不同而定,需要注意其事件的监听和使用场景的真正需要。