Implementing Custom Streams
What Are Custom Streams?
In Node.js, streams are an efficient way to handle data, especially when dealing with large or unpredictable data volumes. Custom streams allow developers to create specialized data processing pipelines tailored to specific needs. Streams are divided into four basic types: Readable, Writable, Duplex, and Transform. By inheriting from these base classes and implementing specific methods, you can build fully customized stream implementations.
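All four base classes come from the built-in stream module, so every custom stream starts with an import along these lines:

// The four base classes are all exported by the built-in 'stream' module
const { Readable, Writable, Duplex, Transform } = require('stream');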
Implementing a Readable Stream
To create a custom readable stream, inherit from the stream.Readable class and implement the _read method. This method is called whenever the consumer requests data; data is fed into the stream with the push method, and pushing null signals the end of the stream.
const { Readable } = require('stream');

class RandomNumberStream extends Readable {
  constructor(options) {
    super(options);
    this.maxNumbers = 10;
    this.currentNumber = 0;
  }

  _read(size) {
    this.currentNumber += 1;
    const randomNumber = Math.random();
    if (this.currentNumber > this.maxNumbers) {
      this.push(null); // End the stream
    } else {
      const buffer = Buffer.from(`${randomNumber}\n`, 'utf8');
      this.push(buffer); // Push data
    }
  }
}

// Usage example
const randomStream = new RandomNumberStream();
randomStream.pipe(process.stdout);
This example creates a readable stream that generates 10 random numbers. The _read method is called each time more data is needed, until the maximum count is reached, and the pipe method directs the stream's output to standard output.
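Besides pipe, a readable stream can also be consumed with 'data' events or with for await...of, since readable streams are async iterables. A minimal sketch reusing the RandomNumberStream above:

// Consuming the same stream with async iteration instead of pipe
async function printNumbers() {
  const stream = new RandomNumberStream();
  for await (const chunk of stream) {
    process.stdout.write(chunk);
  }
}

printNumbers().catch(console.error);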
Implementing a Writable Stream
To create a custom writable stream, inherit from stream.Writable and implement the _write method. This method processes each incoming chunk, performs the necessary work, and then calls the callback to signal completion.
const { Writable } = require('stream');
const fs = require('fs');

class FileWriter extends Writable {
  constructor(filePath, options) {
    super(options);
    this.filePath = filePath;
  }

  _write(chunk, encoding, callback) {
    // Append each chunk to the target file; report any error via the callback
    fs.appendFile(this.filePath, chunk, (err) => {
      if (err) return callback(err);
      callback();
    });
  }

  _final(callback) {
    console.log(`All data has been written to ${this.filePath}`);
    callback();
  }
}

// Usage example
const writer = new FileWriter('./output.txt');
writer.write('First line of data\n');
writer.write('Second line of data\n');
writer.end('Final line of data');
This writable stream appends incoming data to the specified file. The _write method handles each chunk, and _final is called when the stream is ending. Handling the callback correctly is crucial for backpressure management.
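On the producer side, backpressure surfaces through the return value of write(): it returns false once the internal buffer passes highWaterMark, and the 'drain' event signals when it is safe to continue. A minimal sketch driving the FileWriter above (writeMany is just an illustrative helper):

// Respecting backpressure when writing many chunks
function writeMany(writer, lines) {
  let i = 0;
  function writeNext() {
    while (i < lines.length) {
      const ok = writer.write(lines[i++]);
      if (!ok) {
        // Internal buffer is full; resume once it drains
        writer.once('drain', writeNext);
        return;
      }
    }
    writer.end();
  }
  writeNext();
}

writeMany(new FileWriter('./output.txt'), ['a\n', 'b\n', 'c\n']);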
Building a Duplex Stream
A duplex stream implements both the readable and writable interfaces. To create one, inherit from stream.Duplex and implement both the _read and _write methods. This type of stream suits scenarios that require bidirectional communication.
const { Duplex } = require('stream');

class EchoStream extends Duplex {
  constructor(options) {
    super(options);
    this.dataBuffer = [];
  }

  _write(chunk, encoding, callback) {
    // Buffer written data so the readable side can hand it back out
    this.dataBuffer.push(chunk.toString());
    callback();
  }

  _read(size) {
    if (this.dataBuffer.length === 0) {
      // Simplification: in the usage below all writes happen before
      // reading starts, so an empty buffer means nothing is left
      this.push(null);
    } else {
      this.push(this.dataBuffer.shift());
    }
  }
}

// Usage example
const echo = new EchoStream();
echo.on('data', (chunk) => {
  console.log(`Received: ${chunk}`);
});
echo.write('Hello');
echo.write('World');
echo.end();
This duplex stream caches written data and returns it unchanged when read. Note that the read and write ends of a duplex stream are independent and not directly connected.
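To make that independence concrete, here is a small hypothetical sketch in which the two sides carry unrelated data: the writable side only counts incoming bytes, while the readable side emits its own status lines.

const { Duplex } = require('stream');

// Hypothetical example: the readable and writable sides are unrelated
class IndependentDuplex extends Duplex {
  constructor(options) {
    super(options);
    this.bytesSeen = 0;
    this.tick = 0;
  }

  _write(chunk, encoding, callback) {
    this.bytesSeen += chunk.length; // writable side: just count bytes
    callback();
  }

  _read(size) {
    // readable side: emit three status lines, then end
    this.tick += 1;
    if (this.tick > 3) {
      this.push(null);
    } else {
      this.push(`status ${this.tick}\n`);
    }
  }
}

const d = new IndependentDuplex();
d.pipe(process.stdout);   // consumes the readable side
d.end('ignored payload'); // feeds the writable side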
Creating a Transform Stream
A transform stream is a special kind of duplex stream that inherits from stream.Transform. It modifies or transforms the written data before emitting the result, and only the _transform method needs to be implemented.
const { Transform } = require('stream');

class UpperCaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    const upperString = chunk.toString().toUpperCase();
    this.push(upperString);
    callback();
  }
}

// Usage example
const upper = new UpperCaseTransform();
process.stdin.pipe(upper).pipe(process.stdout);
This transform stream converts all input text to uppercase. The _transform method receives each chunk, processes it, and emits the result via push. Transform streams are ideal for data format conversion, encryption and decryption, and similar scenarios.
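Node's built-in modules expose many transforms of exactly this kind, so custom transforms compose naturally with them. For example, the UpperCaseTransform above could be chained with a gzip transform from zlib (the output path here is just illustrative):

const { createGzip } = require('zlib');
const fs = require('fs');

// Uppercase the input, gzip it, then write the result to disk
process.stdin
  .pipe(new UpperCaseTransform())
  .pipe(createGzip())
  .pipe(fs.createWriteStream('./output.txt.gz'));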
Advanced Stream Control
Node.js streams provide fine-grained control mechanisms. The highWaterMark option sets the internal buffer threshold, which drives backpressure behavior, and custom streams can perform resource cleanup by overriding the _destroy method.
const { Writable } = require('stream');

// acquireResource, processResource and releaseResource are placeholders
// standing in for whatever external resource API is being wrapped.
class ResourceHandler extends Writable {
  constructor(options) {
    super({ ...options, highWaterMark: 1024 * 1024 }); // 1MB buffer
    this.resource = acquireResource();
  }

  _write(chunk, encoding, callback) {
    processResource(this.resource, chunk, (err) => {
      if (err) return callback(err);
      callback();
    });
  }

  _destroy(err, callback) {
    // Always release the resource, whether the stream ended cleanly or not
    releaseResource(this.resource, (releaseErr) => {
      callback(releaseErr || err);
    });
  }
}
This example demonstrates how to manage an external resource. highWaterMark is set to 1 MB, and _destroy ensures the resource is properly released. Careful error handling and resource cleanup are key to robust stream implementations.
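Calling destroy() explicitly, for instance after an external failure, also routes through _destroy, so the resource is released on that path too. A brief usage sketch (the error message is illustrative):

const handler = new ResourceHandler();

handler.on('error', (err) => {
  console.error('Resource stream failed:', err.message);
});

handler.write('some data');
// Tearing the stream down explicitly also runs _destroy
handler.destroy(new Error('upstream connection lost'));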
Object Mode Streams
By default, streams handle Buffers or strings, but setting objectMode: true lets them carry arbitrary JavaScript objects. This is useful for processing complex data structures.
const { Readable } = require('stream');

class ObjectStream extends Readable {
  constructor(options) {
    super({ ...options, objectMode: true });
    this.objects = [
      { id: 1, name: 'Object 1' },
      { id: 2, name: 'Object 2' },
      { id: 3, name: 'Object 3' }
    ];
  }

  _read(size) {
    if (this.objects.length === 0) {
      this.push(null);
    } else {
      this.push(this.objects.shift());
    }
  }
}

// Usage example
const objStream = new ObjectStream();
objStream.on('data', (obj) => {
  console.log(`Received object: ${JSON.stringify(obj)}`);
});
Object mode streams can push entire objects without serialization. Note that object mode changes how highWaterMark is counted: it measures the number of buffered objects rather than bytes.
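In object mode the default highWaterMark is 16 objects, and it can be tuned like any other option; for example, a transform that buffers at most 4 objects before signalling backpressure:

const { Transform } = require('stream');

// Buffer at most 4 objects before backpressure kicks in
const objectTransform = new Transform({
  objectMode: true,
  highWaterMark: 4,
  transform(obj, encoding, callback) {
    callback(null, { ...obj, seen: true });
  }
});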
Error Handling and Debugging
Robust stream implementations require comprehensive error handling. Errors can be caught with event listeners, and the _destroy method can be implemented for cleanup.
const { Writable } = require('stream');

class SafeStream extends Writable {
  constructor(options) {
    super(options);
    this.on('error', (err) => {
      console.error('Stream error:', err);
      this._cleanup();
    });
  }

  _write(chunk, encoding, callback) {
    try {
      riskyOperation(chunk); // placeholder for work that may throw
      callback();
    } catch (err) {
      callback(err); // reporting the error destroys the stream
    }
  }

  _cleanup() {
    // Perform necessary cleanup; keep this idempotent, since it may run
    // from both the 'error' handler and _destroy
  }

  _destroy(err, callback) {
    this._cleanup();
    callback(err);
  }
}
This pattern ensures resources are released even when an error occurs. When chaining streams together, use stream.pipeline instead of pipe for better error handling: pipeline forwards errors to a single callback and destroys every stream in the chain.
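A sketch of the earlier uppercase example rewritten with pipeline, which reports a single error for the whole chain (the input and output paths are placeholders):

const { pipeline } = require('stream');
const fs = require('fs');

pipeline(
  fs.createReadStream('./input.txt'),
  new UpperCaseTransform(),
  fs.createWriteStream('./output.txt'),
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);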
Performance Optimization Tips
Efficient custom streams should consider memory usage and throughput. Techniques such as reusing buffers from a pool, avoiding unnecessary copies, and setting highWaterMark appropriately can improve performance.
const { Readable } = require('stream');
// 'bufferpool' stands in for whatever pooling library (or hand-rolled
// pool) you use; fillBuffer and shouldContinue are placeholders as well.
const bufferPool = require('bufferpool');

class HighPerfStream extends Readable {
  constructor(options) {
    super(options);
    this.bufferSize = 1024 * 64; // 64KB
  }

  _read(size) {
    bufferPool.alloc(this.bufferSize, (err, buffer) => {
      if (err) return this.destroy(err);
      // Fill the buffer with the next batch of data
      fillBuffer(buffer);
      if (shouldContinue()) {
        this.push(buffer);
      } else {
        bufferPool.free(buffer);
        this.push(null);
      }
    });
  }
}
This example shows how to reuse memory with a buffer pool. For workloads consisting of many small chunks, consider implementing the _writev method to handle batched writes, and monitor the 'drain' event to time writes efficiently.
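A minimal sketch of _writev, using a hypothetical BatchFileWriter: when several chunks are queued, Node hands them to _writev as an array of { chunk, encoding } entries, which can then be flushed in one operation.

const { Writable } = require('stream');
const fs = require('fs');

class BatchFileWriter extends Writable {
  constructor(filePath, options) {
    super(options);
    this.filePath = filePath;
  }

  _write(chunk, encoding, callback) {
    fs.appendFile(this.filePath, chunk, callback);
  }

  // Called instead of _write when multiple chunks are buffered;
  // entries are Buffers by default, so they can be concatenated directly
  _writev(chunks, callback) {
    const combined = Buffer.concat(chunks.map((entry) => entry.chunk));
    fs.appendFile(this.filePath, combined, callback);
  }
}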
Real-World Application Examples
Custom streams have wide-ranging applications in real-world projects. For example, implementing a CSV parser:
const { Transform } = require('stream');

class CSVParser extends Transform {
  constructor(options) {
    super({ ...options, objectMode: true });
    this.remainder = '';
    this.headers = null;
  }

  _transform(chunk, encoding, callback) {
    const data = this.remainder + chunk.toString();
    const lines = data.split('\n');
    this.remainder = lines.pop(); // Save the incomplete last line

    if (!this.headers && lines.length > 0) {
      this.headers = lines.shift().split(',');
    }

    for (const line of lines) {
      const values = line.split(',');
      const obj = {};
      this.headers.forEach((header, i) => {
        obj[header.trim()] = values[i] ? values[i].trim() : null;
      });
      this.push(obj);
    }
    callback();
  }

  _flush(callback) {
    // Emit the final row if the input did not end with a newline
    if (this.remainder && this.headers) {
      const values = this.remainder.split(',');
      const obj = {};
      this.headers.forEach((header, i) => {
        obj[header.trim()] = values[i] ? values[i].trim() : null;
      });
      this.push(obj);
    }
    callback();
  }
}
This CSV parser converts a text stream into an object stream, handling header-to-field mapping and line splitting automatically. The _flush method ensures any remaining data is emitted when the input ends. This pattern is well suited to processing large data files.
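Wiring the parser up, assuming a hypothetical ./data.csv input file, could look like this:

const fs = require('fs');
const { pipeline, Writable } = require('stream');

pipeline(
  fs.createReadStream('./data.csv'),
  new CSVParser(),
  new Writable({
    objectMode: true,
    write(row, encoding, callback) {
      console.log('Parsed row:', row);
      callback();
    }
  }),
  (err) => {
    if (err) console.error('CSV processing failed:', err);
    else console.log('Done parsing CSV');
  }
);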