Error handling in Stream
Error Handling in Streams
In Node.js, Streams are the core abstraction for handling streaming data, whether it's file I/O, network communication, or data transformation. Since Streams process data in chunks and asynchronously, error handling becomes particularly important and complex.
Basic Mechanism of Stream Error Events
All Streams are instances of EventEmitter, and errors propagate through the `'error'` event. An unhandled `'error'` event crashes the process, which is Node.js's default behavior:
const fs = require('fs');
const readStream = fs.createReadStream('./nonexistent-file.txt');
// No 'error' listener is attached here, so a failed read surfaces as an uncaught exception
const printChunk = (chunk) => {
  console.log(chunk);
};
readStream.on('data', printChunk);
The correct approach is to always listen for the `'error'` event:
// Report the failure instead of leaving the 'error' event unhandled
const reportReadError = (err) => {
  console.error('Error reading file:', err.message);
};
readStream.on('error', reportReadError);
Error Handling in Readable Streams
Readable streams emit the `'error'` event in the following scenarios:
- File does not exist (ENOENT)
- Insufficient permissions (EACCES)
- Connection interruption during reading
const zlib = require('zlib');
// Build the three stages first, give each its own 'error' listener, then connect them.
const gzip = zlib.createGzip();
const source = fs.createReadStream('./large-file.log');
const destination = fs.createWriteStream('./output.gz');

source.on('error', (err) => {
  console.error('Source file error:', err);
});
gzip.on('error', (err) => {
  console.error('Compression error:', err);
});
destination.on('error', (err) => {
  console.error('Write error:', err);
});

source.pipe(gzip).pipe(destination);
Error Handling in Writable Streams
Error handling in writable streams requires special attention to backpressure:
const writable = fs.createWriteStream('./output.txt');

// Stream-level failures arrive through the 'error' event
const onWritableError = (err) => {
  console.error('Write failed:', err);
  // May need to clean up partially written files
};
writable.on('error', onWritableError);

// The per-write callback is handed the error for that specific write as well
const onWriteDone = (err) => {
  if (!err) return;
  console.error('Write callback error:', err);
};
writable.write('some data', onWriteDone);
Error Propagation in Pipeline Chains
When using `pipe()`, errors do not automatically propagate:
// pipe() does not forward errors from one stage to the next
src.pipe(transform).pipe(dest);
// So every stage needs its own 'error' listener
for (const stage of [src, transform, dest]) {
  stage.on('error', handleError);
}
Node.js 10+ introduced `stream.pipeline` to address this issue:
const { pipeline } = require('stream');
// Single completion callback: receives the first error from any stage, or nothing on success.
const reportPipelineOutcome = (err) => {
  if (!err) {
    console.log('Pipeline operation succeeded');
    return;
  }
  console.error('Pipeline operation failed:', err);
};

pipeline(
  fs.createReadStream('./input.txt'),
  zlib.createGzip(),
  fs.createWriteStream('./output.gz'),
  reportPipelineOutcome
);
Error Handling in Transform Streams
Transform streams need to handle both read and write errors:
const { Transform } = require('stream');
/**
 * Transform stream that runs every chunk through `processChunk` and reports
 * failures through the stream callback instead of throwing.
 */
class MyTransform extends Transform {
  /**
   * Processes one chunk.
   *
   * @param {*} chunk - Chunk handed over by the stream machinery.
   * @param {string} encoding - Encoding of `chunk` when it is a string.
   * @param {Function} callback - Must be invoked exactly once: `callback(err)` or `callback(null, result)`.
   */
  _transform(chunk, encoding, callback) {
    let result;
    try {
      // Only the processing step is guarded. Keeping `callback` outside the try
      // guarantees that an exception thrown by the callback itself is not caught
      // here and answered with a second `callback(err)` call.
      result = processChunk(chunk);
    } catch (err) {
      // Synchronous errors can be passed via callback
      callback(err);
      return;
    }
    callback(null, result);
  }

  /**
   * Runs once after the last chunk; forwards the result (or error) of
   * `someAsyncOperation` to the stream.
   *
   * @param {Function} callback - Invoked with `(err)` on failure or `(null, data)` on success.
   */
  _flush(callback) {
    someAsyncOperation((err, data) => {
      if (err) return callback(err);
      callback(null, data);
    });
  }
}
Error Recovery Strategies
Depending on the scenario, different recovery mechanisms may be needed:
- Retry Logic:
/**
 * Creates a readable stream for `filename` that retries failed reads.
 *
 * The caller receives ONE stable stream. Each attempt opens a fresh
 * `fs.ReadStream` and pipes it into that stream, so retries are actually
 * visible to the consumer (previously the retry streams were created but
 * never handed to anyone, and 'giveup' fired on a stream the caller did
 * not hold).
 *
 * @param {string} filename - Path of the file to read.
 * @param {number} [retries=3] - Number of retries after the first failed attempt.
 * @returns {import('stream').PassThrough} Stream carrying the file contents.
 *   Emits 'giveup' with the last error once all retries are used up, then closes.
 */
function createRetryableReadStream(filename, retries = 3) {
  const { PassThrough } = require('stream');

  const output = new PassThrough();
  let attempts = 0;
  let bytesForwarded = 0;
  let activeSource = null;
  let retryTimer = null;

  function createStream() {
    retryTimer = null;
    // Resume after the bytes already delivered so a failure in the middle of
    // the file does not send the beginning twice. `start` is only passed when
    // needed, leaving the very first attempt identical to a plain read.
    const options = bytesForwarded > 0 ? { start: bytesForwarded } : undefined;
    const source = fs.createReadStream(filename, options);
    activeSource = source;

    source.on('data', (chunk) => {
      bytesForwarded += chunk.length;
    });
    source.on('error', (err) => {
      source.unpipe(output);
      if (output.destroyed) return; // consumer already went away
      if (attempts++ < retries) {
        // Linear backoff: 100 ms, 200 ms, 300 ms, ...
        retryTimer = setTimeout(createStream, 100 * attempts);
      } else {
        output.emit('giveup', err);
        // Close the stream so consumers are not left waiting forever
        output.destroy();
      }
    });
    source.pipe(output);
  }

  // If the consumer finishes or abandons the stream, stop retrying and release the file
  output.once('close', () => {
    clearTimeout(retryTimer);
    if (activeSource) activeSource.destroy();
  });

  createStream();
  return output;
}
- Fallback Data Source:
const primary = fs.createReadStream('./primary-data.json');
primary.on('error', (err) => {
  console.error('Primary data source failed, using fallback:', err);
  primary.unpipe();

  // Open the fallback only when it is needed. Creating it up front opens the
  // file immediately: it holds a descriptor that may never be used, and a
  // missing fallback file would emit an unhandled 'error' and crash the
  // process even while the primary source works.
  const fallback = fs.createReadStream('./fallback-data.json');
  fallback.on('error', (fallbackErr) => {
    console.error('Fallback data source failed as well:', fallbackErr);
    // Both sources are gone: fail the consumer instead of leaving it waiting
    consumer.destroy(fallbackErr);
  });
  // NOTE: the fallback is read from its beginning; anything the consumer
  // already received from the primary is not rolled back.
  fallback.pipe(consumer);
});
Advanced Error Handling Patterns
For complex scenarios, consider the following patterns:
- Error Aggregation:
/**
 * Writable that never fails an individual write: errors reported by
 * `someOperation` are collected and surfaced together when the stream finishes.
 */
class ErrorAggregator extends Writable {
  constructor(options) {
    super(options);
    this.errors = [];
  }

  /** Records a failure for this chunk (if any) and always moves on to the next chunk. */
  _write(chunk, encoding, callback) {
    someOperation(chunk, (err) => {
      if (err) {
        this.errors.push(err);
      }
      // Continue processing next chunk
      callback();
    });
  }

  /** Fails the stream with one AggregateError when at least one write failed. */
  _final(callback) {
    if (this.errors.length === 0) {
      callback();
      return;
    }
    callback(new AggregateError(this.errors));
  }
}
- Stateful Error Handling:
/**
 * Creates a Transform that remembers its first processing failure.
 *
 * Until `transform(chunk)` throws, chunks are transformed normally. The first
 * exception is stored, announced through an 'error' event, and every later
 * chunk is dropped without being processed.
 *
 * @returns {Transform} The stateful transform stream.
 */
function createStatefulStream() {
  let errorState = null;
  return new Transform({
    transform(chunk, encoding, callback) {
      if (errorState) {
        // Skip processing or execute recovery logic
        return callback();
      }
      let output;
      try {
        // Normal processing. Only this call is guarded: keeping `callback`
        // outside the try means an exception thrown by the callback is not
        // mistaken for a processing failure (which would latch `errorState`
        // and invoke the callback a second time).
        output = transform(chunk);
      } catch (err) {
        errorState = err;
        this.emit('error', err);
        return callback();
      }
      callback(null, output);
    }
  });
}
Interoperability with Promises
Modern Node.js supports async/await with Streams:
/**
 * Consumes `stream` chunk by chunk with async iteration, awaiting
 * `processChunk` for each one. Any failure is logged and not rethrown.
 */
async function processStream(stream) {
  const drain = async () => {
    for await (const piece of stream) {
      await processChunk(piece);
    }
  };

  try {
    await drain();
  } catch (failure) {
    console.error('Async iteration error:', failure);
    // May need to clean up resources
  }
}
Performance Considerations
Error handling affects Stream performance in several ways:
- Excessive `try/catch` blocks reduce throughput
- Error recovery logic may increase memory usage
- Complex error propagation paths add CPU overhead
// Inefficient error checking: every listener carries its own try/catch
transform.on('data', (chunk) => {
  try {
    // `processChunk` is the application's handler. It must not be named
    // `process`: that identifier is the Node.js global process object.
    processChunk(chunk);
  } catch (err) {
    // destroy(err) emits 'error' AND tears the stream down; a hand-made
    // emit('error') would leave the stream running in a half-failed state.
    transform.destroy(err);
  }
});
// More efficient approach: one central 'error' listener plus one shared guard
transform.on('error', (err) => {
  // Centralized handling
});
// An exception thrown inside a 'data' listener is NOT routed to the 'error'
// listener; it escapes as an uncaught exception. The guard below is written
// once and forwards such failures to the central handler via destroy(err).
const forwardErrors = (handler) => (chunk) => {
  try {
    handler(chunk);
  } catch (err) {
    transform.destroy(err);
  }
};
transform.on('data', forwardErrors((chunk) => {
  // `processChunk` is the application's handler; `process` would be the Node.js global
  const result = processChunk(chunk); // Assume processChunk may throw
  // ...
}));
Debugging Tips
When debugging Stream errors, consider these tools:
- NODE_DEBUG Environment Variable:
NODE_DEBUG=stream node app.js
- Custom Debug Stream:
/**
 * Pass-through Transform for debugging: logs the size of every chunk and the
 * end of the stream, forwarding the data unchanged.
 */
class DebugStream extends Transform {
  _transform(chunk, encoding, callback) {
    const { length } = chunk;
    console.debug('Chunk:', length, 'bytes');
    // Handing the chunk to the callback forwards it downstream unchanged
    callback(null, chunk);
  }

  _flush(callback) {
    console.debug('Stream ended');
    callback();
  }
}
- Error Tracing:
stream.on('error', (err) => {
  // Stack of this handler: shows where the error was observed
  const observedAt = new Error().stack;
  console.error('Error location:', observedAt);
  // Stack carried by the error itself: shows where it was created
  console.error('Original error:', err.stack);
});
Common Pitfalls
- Error Handling Order Issues:
// Pitfall: pipe() returns its destination, so the handler in this chain is
// attached to `transform` only. Errors emitted by `stream` or `output` have
// no listener here.
stream
.pipe(transform)
.on('error', (err) => console.error(err)) // covers `transform` only
.pipe(output);
// Better approach
// One labelled listener per stage, so every source of errors is covered
const labelledStages = [
  ['Input error:', stream],
  ['Transform error:', transform],
  ['Output error:', output],
];
for (const [label, stage] of labelledStages) {
  stage.on('error', (err) => console.error(label, err));
}
- Uncleaned Resources:
const logOnly = (err) => {
  console.error(err);
  // Forgot to close file descriptors or other resources
};
stream.on('error', logOnly);
// Correct approach
let fd;
fs.open('file.txt', 'r', (err, fileDescriptor) => {
  if (err) {
    // Throwing inside this asynchronous callback would be an uncaught
    // exception; report the failure instead. Nothing was opened, so there
    // is nothing to clean up.
    console.error(err);
    return;
  }
  fd = fileDescriptor;

  // autoClose: false makes this code the sole owner of the descriptor.
  // With the default (true) the stream closes the descriptor itself on
  // error/end, and the manual fs.close() would be a second close of the
  // same descriptor number.
  const stream = fs.createReadStream(null, { fd: fileDescriptor, autoClose: false });

  let isClosed = false;
  const closeDescriptor = () => {
    if (isClosed) return;
    isClosed = true;
    fs.close(fileDescriptor, (closeErr) => {
      if (closeErr) console.error(closeErr);
    });
  };

  stream.on('error', (streamErr) => {
    closeDescriptor();
    console.error(streamErr);
  });
  // Release the descriptor on normal completion as well
  stream.on('end', closeDescriptor);
});
Testing Strategies
Testing Stream error handling requires special consideration:
const { PassThrough } = require('stream');
test('should handle transform errors', (done) => {
  const input = new PassThrough();
  const transform = new MyTransformStream();

  // The test passes once the transform reports the expected failure
  const verifyError = ({ message }) => {
    expect(message).toBe('Expected error');
    done();
  };
  transform.on('error', verifyError);

  input.pipe(transform);
  input.write('Data that triggers error');
});
Simulating error scenarios:
/**
 * Creates a Readable that fails with "Simulated error" as soon as it is read.
 *
 * @returns {Readable} A stream whose first read destroys it with an error.
 */
function createErroringStream() {
  const stream = new Readable({
    read() {
      // Errors raised while reading must go through destroy(err): it emits
      // 'error' and marks the stream destroyed. Manually emitting 'error'
      // from read() leaves the stream in an undefined state.
      this.destroy(new Error('Simulated error'));
    }
  });
  return stream;
}
Integration with Third-Party Libraries
When handling Streams from third-party libraries:
- Libraries may use custom errors:
// Keep a reference to the library: the handler needs `SomeLib.CustomError`,
// which would otherwise be an undefined name and throw a ReferenceError the
// first time an error arrives.
const SomeLib = require('some-lib');
const libStream = SomeLib.createStream();
libStream.on('error', (err) => {
  if (err instanceof SomeLib.CustomError) {
    // Special handling
  } else {
    // Generic handling
  }
});
- Error code checking:
const queryStream = mysqlConnection.query().stream();
queryStream.on('error', (err) => {
  // Only the interruption code gets dedicated treatment here
  if (err.code !== 'ER_QUERY_INTERRUPTED') return;
  // Handle query interruption
});
Browser Environment Differences
Stream APIs in browsers (e.g., Fetch's body) handle errors differently:
// Reads the response body to the end; any failure (request or read) lands in the catch.
(async () => {
  try {
    const response = await fetch('/api/stream');
    const reader = response.body.getReader();
    for (;;) {
      const { done, value } = await reader.read();
      if (done) break;
      // Process data
    }
  } catch (err) {
    console.error('Stream read failed:', err);
  }
})();
本站部分内容来自互联网,一切版权均归源网站或源作者所有。
如果侵犯了你的权益请来信告知我们删除。邮箱:cc@cccx.cn
上一篇:Stream的高性能应用
下一篇:常见Stream应用场景