Backpressure Issues in Node.js Streams
Streams in Node.js are the core mechanism for handling large amounts of data, but when the data production speed exceeds the consumption speed, backpressure issues arise. If not addressed, this can lead to memory spikes or process crashes.
Principles of Backpressure Generation
When a Readable stream pushes data faster than a Writable stream can process it, unprocessed data accumulates in the buffer. Generic byte streams in Node.js have a default highWaterMark of 16KB (16384 bytes); fs streams default to 64KB, and object-mode streams count 16 objects instead of bytes. When this limit is exceeded:
const fs = require('fs');

// Quickly read a large file
const readStream = fs.createReadStream('huge.mov');
const writeStream = fs.createWriteStream('copy.mov');

// When the write speed cannot keep up with the read speed,
// write() returns false once the destination's buffer is full.
readStream.on('data', (chunk) => {
  if (!writeStream.write(chunk)) {
    // Key point: manual pause required — stop producing until drained
    readStream.pause();
  }
});

// 'drain' fires once the destination buffer has been flushed out.
writeStream.on('drain', () => {
  readStream.resume(); // Key point: manual resume required
});
Automatic Backpressure Handling
The built-in `pipe()` method handles backpressure automatically (note, however, that it does not forward errors between streams):
// Best practice: Use pipe with automatic backpressure control
// Best practice: use pipe() for automatic backpressure control.
// CAVEAT: pipe() does NOT forward errors down the chain, so each
// stream needs its own 'error' handler (the original only covered the
// writable side). stream.pipeline() handles both concerns at once.
fs.createReadStream('input.mp4')
  .on('error', console.error) // read-side failures are not forwarded by pipe()
  .pipe(fs.createWriteStream('output.mp4'))
  .on('error', console.error);
However, some scenarios require finer control:
// FIX: PassThrough was used without ever being imported.
// A PassThrough forwards data unchanged, so it can sit inside a pipe
// chain purely to observe traffic without affecting backpressure.
const { PassThrough } = require('stream');

let bytesWritten = 0;
const monitorStream = new PassThrough();

monitorStream.on('data', (chunk) => {
  bytesWritten += chunk.length;
  console.log(`Bytes written: ${bytesWritten} bytes`);
});

readStream
  .pipe(monitorStream)
  .pipe(writeStream);
Implementing Backpressure in Custom Streams
When creating custom Transform streams, backpressure must be explicitly handled:
const { Transform } = require('stream');
/**
 * A Transform stream that forwards chunks unchanged but delays the
 * completion callback, creating deliberate backpressure upstream.
 */
class ThrottleStream extends Transform {
  /**
   * @param {number} [ms=0] - Minimum delay (milliseconds) between chunks.
   * @param {object} [options] - Standard Transform options
   *   (highWaterMark, objectMode, …) forwarded to the base class.
   */
  constructor(ms = 0, options = {}) {
    super(options); // generalized: options are now passed through
    this.delay = ms;
  }

  _transform(chunk, encoding, callback) {
    this.push(chunk);
    // Delaying the callback is what creates backpressure: the writable
    // side will not accept new chunks until this one is "processed".
    setTimeout(callback, this.delay);
  }
}
// Usage example: cap throughput to one chunk per 100 ms
const throttle = new ThrottleStream(100);
fastReadStream.pipe(throttle).pipe(slowWriteStream);
Special Handling for High-Concurrency Scenarios
When an HTTP server handles multiple upload requests simultaneously:
const http = require('http');
// Upload server with per-request flow control.
// FIXES vs. original: (1) a response is now always sent — previously
// `res` was never used, so every client hung forever; (2) `uploaded`
// is reset after each processed batch — previously, once past 100MB,
// EVERY subsequent chunk re-triggered pause + a new 1s timer.
const server = http.createServer((req, res) => {
  // Individual control for each request
  let uploaded = 0;

  req.on('data', (chunk) => {
    uploaded += chunk.length;
    if (uploaded > 100 * 1024 * 1024) {
      req.pause(); // Pause receiving if exceeding 100MB
      processAndClearBuffer(() => {
        uploaded = 0; // threshold applies per processed batch
        req.resume();
      });
    }
  });

  // Always answer the request, or the client will wait indefinitely.
  req.on('end', () => {
    res.end('upload received');
  });

  req.on('error', () => {
    res.statusCode = 500;
    res.end();
  });
});
// Simulate asynchronous processing that frees the buffered data,
// then notifies the caller through the supplied callback (~1s later).
function processAndClearBuffer(done) {
  setTimeout(() => {
    done();
  }, 1000);
}
Performance Monitoring and Debugging
Diagnose backpressure issues through event listeners:
const stream = fs.createReadStream('data.bin');

stream.on('data', (chunk) => {
  // FIX: use the public readableLength accessor instead of poking at
  // the private _readableState object (an internal API that can change
  // between Node.js versions).
  console.log('Buffer size:', stream.readableLength);
});

stream.on('end', () => {
  console.log('Peak memory usage:', process.memoryUsage().rss);
});

// Or use third-party modules
const probe = require('stream-meter')();
stream.pipe(probe).pipe(writeStream);
Differences in Handling Various Stream Types
Object Mode Streams
// Object-mode transform with explicit backlog handling.
// BUG FIX: the original dropped `obj` entirely whenever the backlog
// exceeded 100 (it deferred the callback but never pushed the object),
// silently losing data. Every object is now processed and forwarded.
const objectStream = new Transform({
  objectMode: true, // highWaterMark counts objects here, not bytes
  transform(obj, _, cb) {
    // Always forward the processed object — dropping it loses data.
    this.push(processObject(obj));
    if (this.writableLength > 100) { // public API, not _writableState
      // Backlog is high: defer the callback one tick so the event
      // loop can breathe before the next object is accepted.
      process.nextTick(cb);
    } else {
      cb();
    }
  }
});
Duplex Streams
const { Duplex } = require('stream');
/**
 * Duplex stream that echoes everything written to its readable side.
 * FIXES vs. original: (1) the return value of push() is now honored,
 * so readable-side backpressure propagates to writers; (2) the
 * readable side is ended (push(null)) when writing finishes —
 * previously consumers never received 'end'.
 */
class EchoStream extends Duplex {
  _write(chunk, _, callback) {
    // push() returns false when the readable buffer is full; hold the
    // write callback until the consumer asks for more data in _read().
    if (this.push(chunk)) {
      callback();
    } else {
      this._pendingWrite = callback;
    }
  }

  _read() {
    // The consumer wants data again — release a deferred write, if any.
    const pending = this._pendingWrite;
    if (pending) {
      this._pendingWrite = null;
      pending();
    }
  }

  _final(callback) {
    // End the readable side once the writable side finishes.
    this.push(null);
    callback();
  }
}
Common Pitfalls and Solutions
- Incorrect Example: Ignoring drain Events
// Anti-pattern: may cause unbounded buffering ("memory leak") — the
// return value of write() is ignored, so the source is never paused
// and every excess chunk accumulates in the writable's internal buffer.
readStream.on('data', (chunk) => {
  writeStream.write(chunk); // No return value check
});
- Correct Approach:
/**
 * Copy source → target with manual backpressure handling.
 * Resolves once the target has flushed everything; rejects on an error
 * from EITHER stream.
 *
 * BUG FIXES vs. original: nothing ever called target.end(), so 'finish'
 * never fired and the promise never settled; source errors were also
 * unhandled, leaving the promise hanging.
 *
 * @param {stream.Readable} source
 * @param {stream.Writable} target
 * @returns {Promise<void>}
 */
function copyStream(source, target) {
  return new Promise((resolve, reject) => {
    let isPaused = false;

    function onData(chunk) {
      const canContinue = target.write(chunk);
      if (!canContinue && !isPaused) {
        isPaused = true;
        source.pause();
      }
    }

    function onDrain() {
      if (isPaused) {
        isPaused = false;
        source.resume();
      }
    }

    source.on('data', onData);
    source.on('error', reject); // FIX: source failures no longer hang the promise
    // FIX: finish the target when the source ends, so 'finish' can fire.
    source.on('end', () => target.end());
    target.on('drain', onDrain);
    target.on('finish', resolve);
    target.on('error', reject);
  });
}
Advanced Use Cases
Dynamic Rate Control
/**
 * A throttling Transform whose rate can be adjusted at runtime.
 */
class DynamicThrottle extends Transform {
  constructor() {
    super({ highWaterMark: 1 }); // Strict buffer control
    this.speed = 1; // chunks per second
  }

  _transform(chunk, _, cb) {
    this.push(chunk);
    // Delay completion according to the current rate.
    const delay = 1000 / this.speed;
    setTimeout(cb, delay);
  }

  /**
   * Set the target rate in chunks/second. Fractional values are valid
   * (e.g. 0.5 = one chunk every two seconds).
   *
   * BUG FIX: the original clamped with Math.max(1, newSpeed), silently
   * discarding fractional speeds — so the adjustSpeed(0.5) call in the
   * usage example could never actually slow the stream down. The clamp
   * is now a small positive floor that only guards against zero or
   * negative rates (division by zero in _transform).
   *
   * @param {number} newSpeed - Desired rate in chunks/second.
   */
  adjustSpeed(newSpeed) {
    this.speed = Math.max(0.1, newSpeed);
  }
}
// Usage example
const throttle = new DynamicThrottle();

// BUG FIX: process.cpuUsage().user is CUMULATIVE microseconds since
// process start, so the original `load > 50` comparison was true after
// ~50µs of total CPU time and stayed true forever. The usage must be
// diffed against the previous sample to measure recent load.
let lastCpuSample = process.cpuUsage();
setInterval(() => {
  const { user } = process.cpuUsage(lastCpuSample); // µs spent this interval
  lastCpuSample = process.cpuUsage();
  const busyRatio = user / 1e6; // fraction of one core over the 1s window
  throttle.adjustSpeed(busyRatio > 0.5 ? 0.5 : 2);
}, 1000);
Multi-Stage Pipeline Control
// Promisified pipeline: wires up backpressure and error propagation
// across every stage, and destroys the whole chain on the first failure.
const pipeline = util.promisify(stream.pipeline);

/**
 * Run the decrypt → decompress → validate chain over `input`,
 * logging (but not rethrowing) any pipeline failure.
 */
async function complexProcessing(input) {
  try {
    const stages = [
      input,
      createDecryptStream(),
      createDecompressStream(),
      createValidationStream(),
      output,
    ];
    await pipeline(...stages);
  } catch (error) {
    console.error('Pipeline failed:', error);
  }
}
本站部分内容来自互联网,一切版权均归源网站或源作者所有。
如果侵犯了你的权益请来信告知我们删除。邮箱:cc@cccx.cn
上一篇:管道机制(pipe)
下一篇:自定义流的实现