NodeJS中的Stream – 浅入篇

Nodejs cyanprobe 8年前 (2016-10-15) 4276次浏览 已收录 3个评论

前言:

什么?砖升本?不行,戒毒后只想学习代码。跟随Geemo狗的节奏,学习下他的stream深入篇,好的直接略过博文看最后的参考资料 :grin: ,美团前端的文章,那么我就跟随着来上那么一发。
美团原文: http://fe.meituan.com/stream-internals.html

流的应用:

先来一发,我写了个复制java学习资料的栗子。代码如下。(知乎上有参考),clearLine,cursorTo等方法已经从process.stdout移到了Readline那里,具体可以看极客的API或者英文API。

 "use strict"
const readline = require('readline');
const fs = require('fs');
const filePath = 'snis716.avi';
const reFilePath = 'javlibrary.avi';
let passedLength = 0;
let lastSize = 0;
let readStream = fs.createReadStream(filePath);
let writeStream = fs.createWriteStream(reFilePath);
//返回stat数组
let stat=fs.statSync(filePath);
const totalSize=stat.size;
console.log(stat);
readStream.on('data',chunk =>{
 passedLength+= chunk.length;
 if(writeStream.write(chunk) === false ){
 readStream.pause();
 }
})
readStream.on('end', ()=> {
 writeStream.end();
})
//drain事件:可以向流中写入更多数据
writeStream.on('drain', ()=> {
 readStream.resume();//继续触发data
})
console.info(`正在复制${filePath}到${reFilePath}`);
console.time('copyTime');
setTimeout(function show() {
let present = Math.ceil((passedLength/totalSize)*100);
let size = Math.ceil((passedLength/1048576));
let currentSize = size - lastSize;
 lastSize = size;
//清除本行
 readline.clearLine(process.stdout, 0)
//光标移向本行初始位置
 readline.cursorTo(process.stdout, 0, null)
process.stdout.write(`正在复制,写入速率${currentSize*2}MB/s,已写入${size}MB`);
if (passedLength < totalSize) {
 setTimeout(show, 500);
 } else {
 console.log('\n');
 console.timeEnd('copyTime');
}},500)

 
stream
这段代码就复制了G级文件,当然也可以用pipe 实现自动控制。
fs.createReadStream(‘snis716.avi’).pipe(fs.createWriteStream(‘javlibrary.avi’));

Stream 潜入

先上API  http://nodeapi.ucdok.com/#/api/stream.html

var Stream = require(‘stream’);
var Readable = Stream.Readable;//可读流
var Writable = Stream.Writable;//可写流
var Duplex = Stream.Duplex;//可读可写流
var Transform = Stream.Transform;//因果关系的双工流

Readable:

readable
下游请求数据,会调用_read()方法,这东西一方面从数据源获取数据,一方面Push并触发data事件。

Read实现:

stream-read

 //Readable局部源码
var doRead = state.needReadable;
 // if we currently have less than the highWaterMark, then also read some
 if (state.length === 0 || state.length - n < state.highWaterMark) {
 doRead = true;
 }
 if (state.ended || state.reading) {
 doRead = false;
 debug('reading or ended', doRead);
 }
 if (doRead) {
 debug('do read');
 state.reading = true;
 state.sync = true;
 if (state.length === 0)
 state.needReadable = true;
 // call internal read method
 this._read(state.highWaterMark);
 state.sync = false;
 }
 // 如果_read是同步的那么reading为false
 // 从缓存获取的实际数据量
 if (doRead && !state.reading){
    n = howMuchToRead(nOrig, state);
    state.sync = false
 }

state.highWaterMark  缓存阈值
state.length   当前缓存数据量
state.flowing  流动模式
sync   push方法的同/异步
state.needReadable 缓存是否充足,不充足从底层来数据
reading 上一次从底层获取数据是否结束

后记

今天到这了,记录下,后面的内容看Geemo和美团了,懒得码字了,还是这是别人理解出来的东西,接下来就要自己看源码,结合下别人的理解,去“看懂”源码了。有时候了解个原理过程很简单,但是能设计并应用上去却很难,剩下的看源码了,我这就不重复搬弄了。
 
 


CyanProbe , 版权所有丨如未注明 , 均为原创丨本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:NodeJS中的Stream – 浅入篇
喜欢 (0)
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
(3)个小伙伴在吐槽
  1. 来看看,学习学习!!
    欧娜2016-10-17 08:20 回复
  2. java图书馆管理员,你好
    2016-10-20 13:19 回复