深入浅析Node中的Stream「流」

1.什么是Node中的Stream?

在Node.js中,Stream(流)是一种处理流数据的抽象接口。它是Node.js提供的一种处理I/O操作的机制。流提供了一种可读、可写、可读写的抽象,而Node中的许多对象都实现了这个接口,例如http请求、处理善文和处理登录等。与字节流一样,Stream可以是可写的、可读的或者既可读也可写的。

const stream = require('stream');

const rs = new stream.Readable();

rs.push('beijin');

rs.push('shanghai');

rs.push('shenzhen');

rs.push(null);

rs.pipe(process.stdout);

这段代码中,我们通过stream模块创建了一个可读流Readable,并使用了push()方法向流中添加了3个字符串,最后用了null来告诉可读流已经全部写入完毕。然后将可读流传给可写流(stdout),最终所有数据都输出到控制台。

2.流的分类

2.1 可读流 Readable

Node中,Readable是实现了该接口的数据源。它可以从文件、网络等地方读取数据,然后以非阻塞的方式提供给消费者处理。Readable流的模式是:从数据源读取数据——> push数据到流中——> 等待消费者consume(使用数据)——> 再继续读取——> 再push数据到流中——> 再等待消费者consume。

const fs = require('fs');

const rs = fs.createReadStream('stream.txt');

rs.on('data', function (chunk) {

console.log(chunk);

});

rs.on('end', function () {

console.log('数据读取完毕');

});

这段代码中,我们使用fs模块的createReadStream方法创建可读流,并通过on data事件监听每次接收数据的回调函数。在读取完毕后,使用on end事件监听结束的回调函数。

2.2 可写流 Writable

Writable流是一种从程序中写入数据到某个目的地(文件、网络等等)的抽象接口。

const fs = require('fs');

const ws = fs.createWriteStream('output.txt');

ws.write('Hello, ');

ws.write('world!');

ws.end();

这段代码中,我们使用fs模块的createWriteStream方法创建可写流,并通过write方法向可写流中写入数据。通过end方法代表写入数据已经全部完成。

2.3 双工流Duplex

Duplex即双工流,既可以读又可以写。可以看做可读流与可写流的结合,数据可以从中读取也可以写入其中。

const stream = require('stream');

const duplexStream = new stream.Duplex();

duplexStream.push('hello,');

duplexStream.push('world!');

duplexStream.push(null);

duplexStream.pipe(process.stdout);

process.stdin.pipe(duplexStream);

这段代码中,我们通过stream模块创建了一个双工流duplexStream,并使用了push()方法向流中添加了两个字符串,最后用了null来告诉流已经全部写入完毕。然后用pipe将读入的标准输入传给这个双工流中,最终数据被输送到了控制台输出。

3.Stream的工作原理

Stream的最重要的应用场景在于处理大量的数据,例如:500M 的数据需要往数据库里头写入,如果你在同步环境下尝试写入,它会占用很长时间,甚至导致内存溢出。因此,Stream是按照固定大小分成块的方式处理这些数据。也就是分批次、逐块读取、传输、写入数据。这样可以降低内存的占用,提高效率。

Stream是基于事件机制实现的,当数据准备好了,Stream会主动触发一个数据事件,将数据传递给监听器,这样就可以对数据流进行处理。

Stream的读写数据模式也分为两种模式:push模式和pull模式。

push模式是指数据源、流和去向之间的流动由产生数据的对象控制,流的消费者在通过调用事件处理函数读取数据时,才会触发数据源来产生数据输出。

const stream = require('stream');

const rs = new stream.Readable({highWaterMark: 50});

let count = 0;

rs._read = function () {

rs.push(count.toString() + ' ');

if (count++ === 9) {

rs.push(null);

}

};

rs.pipe(process.stdout);

这段代码中,我们使用了push方式向Readable流中添加了10个数字字符串,每个数字字符串使用空格隔开,使用push(null)表示输入数据流已经全部生成。最后通过pipe方法输出到标准输出中。

pull模式正好相反,是消费者通过主动事件调用来拉取数据的模式。一般消费者通过一次次调用read()方法来拉取数据。

const stream = require('stream');

const rs = new stream.Readable({highWaterMark: 50});

let count = 0;

rs._read = function (size) {

rs.push(count.toString());

if (count++ === 9) {

rs.push(null);

}

};

function read() {

let buffer;

while ((buffer = rs.read(2)) != null) {

console.log(`读取数据:${buffer}`);

}

}

rs.on('readable', read);

在这段代码中,我们也是用了push模式向流中推入10个数字字符串,并且在转化时,使用了pull模式一次次通过读方法read来拖取数据,每次拖2个字符,直到拖取完毕。

4.Stream的事件

从数据源到目的地,经过大量流程和处理过程,可能会出现很多错误和异常情况,Stream的事件是为了提供这样的数据处理情况。

4.1. 可读流的事件

data:当有数据可读时触发

end:当没有更多的数据可读时触发

error:在接收数据过程中发生错误时触发

readable:当新的数据块可以从流中读取时触发

close:当流或底层资源已经关闭时触发

4.2. 可写流的事件

drain:当写入流返回“数据已经被消耗“之后触发

error:在写入数据过程中发生错误时触发

finish:当所有数据都已被写入流时触发

pipe:当可读流被管道连接到可写流时触发

unpipe:当可读流没有管道结尾时触发

close:当流或底层资源已经关闭时触发

4.3. 双工流Duplex的事件

Duplex继承了Readable和Writable的事件,所以Duplex流有更多的事件,具体如下

readable:当有数据可读时触发

data:当有可用数据时触发

end:当没有更多的数据可读时触发

error:在接收数据过程中发生错误时触发

drain:当写入流返回“数据已经被消耗”之后触发

finish:当所有数据都已被写入流时触发

pipe:当可读流被管道连接到可写流时触发

unpipe:当可读流没有管道结尾时触发

close:当流或底层资源已经关闭时触发

5.使用场景

Stream的优点在于能够高效处理大量数据,所以在实际开发中,Stream被广泛应用于以下场景:

文件读取和写入:Stream可以从磁盘中高效的读取文件并写入到管道中。

网络:经常使用Stream来处理基于网络的连接

文件压缩和加密:使用Stream从输入数据中读取数据,对其进行加密或压缩,然后将其写入到输出数据中

多媒体处理:视频或音频的编码和解码就可以使用Stream的格式,例如在Node中,FFmpeg处理音频或视频时,使用ChildProcess对象通过spawn方法来启用一个Spawned子进程运行命令,并使用两个Duplex流来连接父进程和子进程:

const ffmpeg = require('child_process').spawn('ffmpeg', [

'-i', 'video.avi',

'-^', 'video.mmp4'

]);

const fs = require('fs');

const videoData = fs.createReadStream('video.avi');

const processStdin = process.stdin;

videoData.pipe(ffmpeg.stdin);

ffmpeg.stdout.pipe(process.stdout);

6.总结

Node.js中的Stream是一种数据的读写接口,通过它能够实现高效的读写大容量数据,特别适用于一些数据读写比较耗时的场景。整个Stream的流程是被事件机制所驱动的,读写模式主要有push和pull模式,具体使用看项目和需求的不同而定,需要注意其事件的监听和使用场景的真正需要。