The Basic Concept of Streams
Basic Concepts of Streams
Streams are abstract interfaces in Node.js for handling streaming data, allowing data to be processed in chunks without loading it entirely into memory. This mechanism is particularly suitable for scenarios involving large files, network communication, or real-time data. The Stream module in Node.js provides four basic types: Readable, Writable, Duplex, and Transform.
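All four types come from the built-in stream module; a quick sketch of what each represents:
const { Readable, Writable, Duplex, Transform } = require('stream');

// Readable  - a data source (e.g. fs.createReadStream, an HTTP request body)
// Writable  - a data destination (e.g. fs.createWriteStream, an HTTP response)
// Duplex    - readable and writable at the same time (e.g. a net.Socket)
// Transform - a Duplex that rewrites data as it passes through (e.g. zlib.createGzip)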
Core Advantages of Streams
Traditional file processing requires reading the entire file into memory, whereas streams can split data into smaller chunks for gradual processing. For example, when processing a 10GB log file:
// Traditional approach (risk of memory explosion)
const fs = require('fs');
fs.readFile('huge.log', (err, data) => {
  // The entire file content is stored in the `data` variable
});

// Stream processing (memory-friendly)
const stream = fs.createReadStream('huge.log');
stream.on('data', (chunk) => {
  // Only a small chunk of data is processed at a time
});
The core features of streams include backpressure control, automatic memory management, and pipeline connectivity. When data processing cannot keep up with data production, the stream automatically pauses the source data production—a mechanism known as backpressure.
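A sketch of what backpressure looks like when handled by hand (the file names are placeholders): write() returns false once the destination's internal buffer is full, and the 'drain' event signals that writing can resume.
const fs = require('fs');

const source = fs.createReadStream('huge.log');
const dest = fs.createWriteStream('copy.log');

source.on('data', (chunk) => {
  if (!dest.write(chunk)) {                     // destination buffer is full
    source.pause();                             // stop producing
    dest.once('drain', () => source.resume());  // resume once it empties
  }
});
source.on('end', () => dest.end());
pipe() and pipeline() apply exactly this pause/resume logic automatically, which is why they are preferred in practice.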
Detailed Explanation of Readable Streams
Readable streams represent data sources, such as file read streams or HTTP request bodies. There are two reading modes:
// Flowing mode (automatic pushing)
readableStream.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes`);
});

// Paused mode (manual pulling)
readableStream.on('readable', () => {
  let chunk;
  while ((chunk = readableStream.read()) !== null) {
    console.log(`Read ${chunk.length} bytes`);
  }
});
Example of a custom Readable stream:
const { Readable } = require('stream');

class CounterStream extends Readable {
  constructor(limit) {
    super();
    this.limit = limit;
    this.count = 0;
  }
  _read() {
    if (this.count++ < this.limit) {
      this.push(this.count.toString());
    } else {
      this.push(null); // End the stream
    }
  }
}

const counter = new CounterStream(5);
counter.pipe(process.stdout); // Output: 12345
Detailed Explanation of Writable Streams
Writable streams represent data destinations, such as file write streams or HTTP responses. Key methods include write() and end():
const { Writable } = require('stream');

class LoggerStream extends Writable {
  _write(chunk, encoding, callback) {
    console.log(`LOG: ${chunk.toString()}`);
    callback(); // Notify completion of processing
  }
}

const logger = new LoggerStream();
process.stdin.pipe(logger); // Log console input
Error handling in practical applications:
const fs = require('fs');
const writeStream = fs.createWriteStream('output.txt');

writeStream.on('error', (err) => {
  console.error('Write failed:', err);
});

writeStream.write('First piece of data\n');
writeStream.end('Final data');
Duplex and Transform Streams
Duplex streams implement both the Readable and Writable interfaces, with TCP sockets being a typical example (a minimal custom Duplex sketch follows the Transform example below). Transform streams are special Duplex streams used for data transformation:
const { Transform } = require('stream');

class UpperCaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
}

const upperCase = new UpperCaseTransform();
process.stdin.pipe(upperCase).pipe(process.stdout);
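As promised above, a minimal custom Duplex sketch (an echo stream with illustrative names) that implements both _write() and _read():
const { Duplex } = require('stream');

class EchoDuplex extends Duplex {
  _write(chunk, encoding, callback) {
    this.push(chunk); // Forward everything written straight to the readable side
    callback();
  }
  _read() {
    // Data is pushed from _write(), so there is nothing to do here
  }
  _final(callback) {
    this.push(null); // End the readable side when the writable side ends
    callback();
  }
}

const echo = new EchoDuplex();
echo.on('data', (chunk) => console.log(`echo: ${chunk.toString()}`));
echo.write('hello');
echo.end();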
A more complex example of an encryption stream:
const fs = require('fs');
const crypto = require('crypto');
const { pipeline } = require('stream');

// key must be 32 bytes and iv 16 bytes for aes-256-cbc (provided elsewhere)
const encryptStream = crypto.createCipheriv('aes-256-cbc', key, iv);
const decryptStream = crypto.createDecipheriv('aes-256-cbc', key, iv); // for the reverse direction

pipeline(
  fs.createReadStream('secret.data'),
  encryptStream,
  fs.createWriteStream('encrypted.data'),
  (err) => {
    if (err) console.error('Encryption failed', err);
  }
);
Advanced Usage of Streams
Node.js provides powerful pipeline mechanisms. The traditional pipe() method:
sourceStream
  .pipe(transformStream1)
  .pipe(transformStream2)
  .pipe(destinationStream);
The promise-based pipeline from stream/promises, shown with ES module syntax:
import { pipeline } from 'stream/promises';
import fs from 'fs';
import zlib from 'zlib';

await pipeline(
  fs.createReadStream('input.txt'),
  zlib.createGzip(),
  fs.createWriteStream('output.gz')
);
Merging multiple streams:
const { PassThrough } = require('stream');

const mergeStreams = (...streams) => {
  const passThrough = new PassThrough();
  let pending = streams.length;
  streams.forEach(stream => {
    stream.pipe(passThrough, { end: false }); // keep the merged stream open while other sources still emit
    stream.on('end', () => {
      if (--pending === 0) passThrough.end(); // close once every source has ended
    });
  });
  return passThrough;
};
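A usage sketch (the log file names are placeholders): the merged stream can be consumed like any other Readable.
const fs = require('fs');

const merged = mergeStreams(
  fs.createReadStream('a.log'),
  fs.createReadStream('b.log')
);
merged.pipe(process.stdout); // chunks from both files, interleaved in arrival order
The { end: false } option prevents any single source from closing the shared PassThrough; the helper ends it only after all sources have finished.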
Performance Optimization for Streams
Adjust highWaterMark to control buffer size:
// fs read streams default to a 64KB highWaterMark (generic streams use 16KB); raised here to 1MB
const bigStream = fs.createReadStream('bigfile', {
  highWaterMark: 1024 * 1024
});
Stream processing in object mode:
const { Readable } = require('stream');

const objectStream = new Readable({
  objectMode: true,
  read() {
    this.push({ timestamp: Date.now() });
    // Can push any JavaScript object; note this example never pushes null, so it never ends
  }
});
Practical Application Scenarios
Streaming responses in HTTP servers:
const http = require('http');
const fs = require('fs');

http.createServer((req, res) => {
  const videoStream = fs.createReadStream('movie.mp4');
  res.setHeader('Content-Type', 'video/mp4');
  videoStream.pipe(res); // Avoid buffering the entire video file
}).listen(3000);
Streaming database query results:
const { MongoClient } = require('mongodb');

async function streamLargeCollection() {
  const client = await MongoClient.connect(uri); // uri: connection string provided elsewhere
  const cursor = client.db().collection('huge').find();
  cursor.stream().on('data', (doc) => {
    // Process documents one by one
  }).on('end', () => {
    client.close();
  });
}
Error Handling Patterns for Streams
Robust stream processing should include comprehensive error handling:
const fs = require('fs');
const { Transform } = require('stream');
const { pipeline } = require('stream/promises');

const handleStream = async () => {
  try {
    const readStream = fs.createReadStream('input.txt');
    const writeStream = fs.createWriteStream('output.txt');
    readStream.on('error', cleanup);
    writeStream.on('error', cleanup);
    await pipeline(
      readStream,
      new Transform({/*...*/}),
      writeStream
    );
  } catch (err) {
    console.error('Pipeline processing failed:', err);
  }
};

function cleanup(err) {
  // Close all resources (pipeline already destroys the streams it manages on error)
}
Streams and Async Iterators
Modern Node.js supports async iterators for stream operations:
const fs = require('fs');

async function processStream() {
  const readable = fs.createReadStream('data.txt', { encoding: 'utf8' });
  for await (const chunk of readable) {
    console.log('Processing chunk:', chunk.length);
  }
}
Custom iterable stream:
const { Readable } = require('stream');

class IterableStream extends Readable {
  constructor(iterator) {
    super({ objectMode: true });
    this.iterator = iterator;
  }
  async _read() {
    try {
      const { value, done } = await this.iterator.next();
      if (done) this.push(null);
      else this.push(value);
    } catch (err) {
      this.destroy(err);
    }
  }
}
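A usage sketch that feeds the stream from an async generator (the generator is illustrative); in modern Node.js the built-in Readable.from() covers the same use case:
async function* generateNumbers() {
  for (let i = 1; i <= 3; i++) {
    yield { value: i };
  }
}

const iterable = new IterableStream(generateNumbers());
iterable.on('data', (obj) => console.log(obj)); // { value: 1 } ... { value: 3 }

// The built-in equivalent:
// Readable.from(generateNumbers()).on('data', console.log);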