Show More
Commit Description:
merge
Commit Description:
merge
References:
File last commit:
Show/Diff file:
Action:
node_modules/through/index.js
| 108 lines
| 2.6 KiB
| application/javascript
| JavascriptLexer
|
var Stream = require('stream')

// through
//
// A stream that does nothing but re-emit its input.
// Useful for aggregating a series of changing, but not ending, streams
// into one stream.

// The factory is both the module's export and reachable as `.through`,
// so `require('through')` and `require('through').through` both work.
exports = module.exports = through
through.through = through
// Create a readable, writable stream.
/**
 * Build a stream that is both readable and writable on top of the base
 * `Stream` emitter. Chunks handed to `write()` are passed to the `write`
 * callback, which decides what (if anything) to `queue()` for emission as
 * 'data' events.
 *
 * @param {Function} [write] Called as `write.call(stream, data)` for every
 *   `stream.write(data)`. Defaults to re-queueing the chunk unchanged.
 * @param {Function} [end] Called as `end.call(stream)` when `stream.end()`
 *   is first invoked. Defaults to queueing `null`, which emits 'end'.
 * @param {Object} [opts] Only `opts.autoDestroy` is read; pass
 *   `{ autoDestroy: false }` to stop the stream from destroying itself once
 *   both its readable and writable sides are finished.
 * @returns {Stream} The stream, with `write`, `queue`/`push`, `end`,
 *   `destroy`, `pause` and `resume` attached. `queue`, `end`, `destroy`,
 *   `pause` and `resume` always return the stream so calls can be chained.
 */
function through (write, end, opts) {
  write = write || function (data) { this.queue(data) }
  end = end || function () { this.queue(null) }

  // `ended`  : end() has been called (or the stream was destroyed).
  // `_ended` : the null end-marker has been queued; later queue() calls are ignored.
  var ended = false, destroyed = false, buffer = [], _ended = false
  var stream = new Stream()
  stream.readable = stream.writable = true
  stream.paused = false

  // autoDestroy is on unless the caller explicitly passes `false`.
  stream.autoDestroy = !(opts && opts.autoDestroy === false)

  // Hand the chunk to the user's write callback; returns false while paused.
  stream.write = function (data) {
    write.call(this, data)
    return !stream.paused
  }

  // Emit buffered chunks in order until the buffer is empty or a 'data'
  // listener pauses the stream. The null marker is emitted as 'end'.
  function drain() {
    while(buffer.length && !stream.paused) {
      var data = buffer.shift()
      if(null === data)
        return stream.emit('end')
      else
        stream.emit('data', data)
    }
  }

  // Queue a chunk for emission. Queueing `null` marks the end of the
  // readable side; anything queued after that is dropped.
  stream.queue = stream.push = function (data) {
    if(_ended) return stream
    if(data === null) _ended = true
    buffer.push(data)
    drain()
    return stream
  }

  //this will be registered as the first 'end' listener
  //must call destroy next tick, to make sure we're after any
  //stream piped from here.
  //this is only a problem if end is not emitted synchronously.
  //a nicer way to do this is to make sure this is the last listener for 'end'

  stream.on('end', function () {
    stream.readable = false
    if(!stream.writable && stream.autoDestroy)
      process.nextTick(function () {
        stream.destroy()
      })
  })

  // Close the writable side and run the user's end callback. If the readable
  // side has already finished by then, destroy immediately.
  function _end () {
    stream.writable = false
    end.call(stream)
    if(!stream.readable && stream.autoDestroy)
      stream.destroy()
  }

  // End the writable side, optionally writing one final chunk first.
  // Repeated calls are no-ops that still return the stream.
  stream.end = function (data) {
    if(ended) return stream
    ended = true
    if(arguments.length) stream.write(data)
    _end() // will emit or queue
    return stream
  }

  // Discard any buffered chunks, mark both sides closed and emit 'close'.
  // Repeated calls are no-ops that still return the stream.
  stream.destroy = function () {
    if(destroyed) return stream
    destroyed = true
    ended = true
    buffer.length = 0
    stream.writable = stream.readable = false
    stream.emit('close')
    return stream
  }

  // Stop emitting; chunks queued while paused stay in the buffer.
  stream.pause = function () {
    if(stream.paused) return stream
    stream.paused = true
    return stream
  }

  // Resume emitting: flush the buffer, then emit 'drain' unless a listener
  // paused the stream again during the flush.
  stream.resume = function () {
    if(stream.paused) {
      stream.paused = false
      stream.emit('resume')
    }
    drain()
    //may have become paused again,
    //as drain emits 'data'.
    if(!stream.paused)
      stream.emit('drain')
    return stream
  }
  return stream
}