44 lines
1.1 KiB
JavaScript
44 lines
1.1 KiB
JavaScript
var util = require('util'),
|
|
TransformStream = require('stream').Transform;
|
|
|
|
module.exports = function (options) {
|
|
return new JSONStream(options);
|
|
};
|
|
|
|
var JSONStream = module.exports.JSONStream = function (options) {
|
|
options = options || {};
|
|
TransformStream.call(this, options);
|
|
this._writableState.objectMode = false;
|
|
this._readableState.objectMode = true;
|
|
this._async = options.async || false;
|
|
};
|
|
util.inherits(JSONStream, TransformStream);
|
|
|
|
JSONStream.prototype._transform = function (data, encoding, callback) {
|
|
if (!Buffer.isBuffer(data)) data = new Buffer(data);
|
|
if (this._buffer) {
|
|
data = Buffer.concat([this._buffer, data]);
|
|
}
|
|
|
|
var ptr = 0, start = 0;
|
|
while (++ptr <= data.length) {
|
|
if (data[ptr] === 10 || ptr === data.length) {
|
|
var line;
|
|
try {
|
|
line = JSON.parse(data.slice(start, ptr));
|
|
}
|
|
catch (ex) { }
|
|
if (line) {
|
|
this.push(line);
|
|
line = null;
|
|
}
|
|
if (data[ptr] === 10) start = ++ptr;
|
|
}
|
|
}
|
|
|
|
this._buffer = data.slice(start);
|
|
return this._async
|
|
? void setImmediate(callback)
|
|
: void callback();
|
|
};
|