Common Stream application scenarios
File Read/Write Operations
Streams are highly practical for file handling, especially with large files. Traditional fs.readFile
loads the entire file into memory at once, whereas using Streams allows for chunk-by-chunk processing.
const fs = require('fs');
const { pipeline } = require('stream');

// Read the source in 64 KB chunks instead of loading the whole file into memory.
const readStream = fs.createReadStream('./large-file.txt', {
  highWaterMark: 64 * 1024,
});

// Destination for the copy.
const writeStream = fs.createWriteStream('./copy-file.txt');

// pipeline() forwards errors from either stream, cleans both streams up on
// failure, and only calls back once the whole copy is complete. Listening for
// 'end' on the read stream is not enough: it fires when the source has been
// fully read, which can be before the last chunks are written to disk.
pipeline(readStream, writeStream, (err) => {
  if (err) {
    console.error('File copy failed:', err);
    return;
  }
  console.log('File copy completed');
});
HTTP Requests and Responses
Node.js's HTTP module is built on Streams: the request is a readable stream and the response is a writable stream, which makes it well suited to handling large request and response bodies:
const http = require('http');
const fs = require('fs');

http.createServer((req, res) => {
  // Stream the response: the file is sent chunk by chunk instead of being
  // read into memory first.
  const fileStream = fs.createReadStream('./large-data.json');

  // Without an 'error' listener a missing or unreadable file would raise an
  // uncaught exception and take the whole server down.
  fileStream.on('error', (err) => {
    console.error('Failed to stream response file:', err);
    if (res.headersSent) {
      // Part of the body is already on the wire; all we can do is abort.
      res.destroy();
      return;
    }
    res.writeHead(500, { 'Content-Type': 'text/plain' });
    res.end('Internal Server Error');
  });

  // Release the file descriptor if the client disconnects mid-transfer;
  // pipe() alone does not destroy the source when the destination closes.
  res.on('close', () => fileStream.destroy());
  fileStream.pipe(res);

  // Stream the request body.
  // Decode at the stream level so a multi-byte character that is split across
  // two chunks is not corrupted by converting each chunk to a string on its own.
  req.setEncoding('utf8');
  let body = '';
  req.on('data', (chunk) => {
    body += chunk;
  });
  req.on('end', () => {
    console.log('Request body received');
  });
  req.on('error', (err) => {
    console.error('Failed to read request body:', err);
  });
}).listen(3000);
Data Transformation Processing
By connecting multiple transform streams via pipelines, complex data processing workflows can be achieved:
const { Transform } = require('stream');
// Transform stream that upper-cases every chunk written to it
const upperCaseTransform = new Transform({
  transform(chunk, encoding, callback) {
    const upperCased = chunk.toString().toUpperCase();
    // Passing the result to the callback pushes it downstream and marks the
    // chunk as processed in a single step.
    callback(null, upperCased);
  },
});
// Wire the transform between standard input and standard output:
// whatever is typed comes back upper-cased.
const upperCasedInput = process.stdin.pipe(upperCaseTransform);
upperCasedInput.pipe(process.stdout);
Database Operations
When reading large result sets from a database, Streams let you process records one at a time instead of loading them all into memory:
const { MongoClient } = require('mongodb');
/**
 * Logs the _id of every document in the `largeCollection` collection of the
 * `test` database, reading the documents through a cursor stream.
 *
 * @returns {Promise<void>} Resolves after the last document has been
 *   processed and the connection has been closed; rejects if connecting or
 *   reading fails.
 */
async function streamLargeData() {
  const client = await MongoClient.connect('mongodb://localhost:27017');
  try {
    const collection = client.db('test').collection('largeCollection');
    // Readable streams are async-iterable. Iterating with for-await keeps
    // memory bounded and turns a stream 'error' into a rejected promise
    // instead of an unhandled event.
    for await (const doc of collection.find().stream()) {
      console.log('Processing document:', doc._id);
    }
  } finally {
    // Close the connection on both success and failure.
    await client.close();
  }
}
Real-time Log Processing
Streams are ideal for handling continuously generated log data:
const fs = require('fs');
const readline = require('readline');

const logStream = fs.createReadStream('./app.log');

const rl = readline.createInterface({
  input: logStream,
  crlfDelay: Infinity, // Always treat "\r\n" as a single line break
});

// Without an 'error' listener a missing or unreadable log file would surface
// as an uncaught exception and terminate the process.
logStream.on('error', (err) => {
  console.error('Failed to read log file:', err);
  rl.close();
});

// Report every line that contains the ERROR marker.
rl.on('line', (line) => {
  if (line.includes('ERROR')) {
    console.log('Found error log:', line);
  }
});
Video Stream Processing
Streams are essential for handling large media files like videos:
const http = require('http');
const fs = require('fs');

const VIDEO_PATH = './sample.mp4';
const CHUNK_SIZE = 10 ** 6; // Serve at most 1 MB per range request
// A single byte range: "bytes=START-", "bytes=START-END" or "bytes=-SUFFIX"
const RANGE_PATTERN = /^bytes=(\d*)-(\d*)$/;

/**
 * Resolves a Range request header against the size of the file.
 *
 * Stripping every non-digit from the header only works for "bytes=START-";
 * "bytes=100-200" would be read as offset 100200. The two numbers are
 * therefore parsed separately.
 *
 * @param {string} rangeHeader - Raw value of the Range header.
 * @param {number} size - Total file size in bytes.
 * @returns {{start: number, end: number}|null} Inclusive byte offsets covering
 *   at most CHUNK_SIZE bytes, or null if the header is malformed, uses an
 *   unsupported form (e.g. multiple ranges) or cannot be satisfied.
 */
function resolveRange(rangeHeader, size) {
  const match = RANGE_PATTERN.exec(rangeHeader.trim());
  if (!match) {
    return null;
  }
  const [, rawStart, rawEnd] = match;
  if (rawStart === '' && rawEnd === '') {
    return null;
  }

  let start;
  let end;
  if (rawStart === '') {
    // Suffix form: the last N bytes of the file.
    const suffixLength = Number(rawEnd);
    if (suffixLength === 0) {
      return null;
    }
    start = Math.max(size - suffixLength, 0);
    end = size - 1;
  } else {
    start = Number(rawStart);
    end = rawEnd === '' ? size - 1 : Math.min(Number(rawEnd), size - 1);
  }

  // Also covers a start offset at or beyond the end of the file and empty files.
  if (start > end) {
    return null;
  }
  return { start, end: Math.min(end, start + CHUNK_SIZE - 1) };
}

http.createServer((req, res) => {
  const range = req.headers.range;
  if (!range) {
    res.writeHead(400);
    return res.end();
  }

  // stat asynchronously: statSync would block every other request and throw
  // inside the handler if the file is missing.
  fs.stat(VIDEO_PATH, (statErr, stats) => {
    if (statErr) {
      res.writeHead(statErr.code === 'ENOENT' ? 404 : 500);
      return res.end();
    }

    const videoSize = stats.size;
    const byteRange = resolveRange(range, videoSize);
    if (!byteRange) {
      res.writeHead(416, { 'Content-Range': `bytes */${videoSize}` });
      return res.end();
    }
    const { start, end } = byteRange;

    // Set response headers
    res.writeHead(206, {
      'Content-Range': `bytes ${start}-${end}/${videoSize}`,
      'Accept-Ranges': 'bytes',
      'Content-Length': end - start + 1,
      'Content-Type': 'video/mp4'
    });

    // Stream only the requested slice of the file
    const videoStream = fs.createReadStream(VIDEO_PATH, { start, end });
    videoStream.on('error', (err) => {
      console.error('Failed to stream video:', err);
      // Headers are already sent, so the only option left is to abort.
      res.destroy();
    });
    // Players frequently abort range requests; release the file descriptor.
    res.on('close', () => videoStream.destroy());
    videoStream.pipe(res);
  });
}).listen(3000);
Compression and Decompression
Streams can efficiently handle compression/decompression tasks:
const fs = require('fs');
const zlib = require('zlib');
const { pipeline } = require('stream');

// Compress a file
pipeline(
  fs.createReadStream('./large-file.txt'),
  zlib.createGzip(),
  fs.createWriteStream('./large-file.txt.gz'),
  (compressErr) => {
    if (compressErr) {
      console.error('Compression failed:', compressErr);
      return;
    }

    // Decompress a file
    // Started only after the archive has been written completely; reading the
    // .gz while it is still being produced would fail or yield truncated data.
    pipeline(
      fs.createReadStream('./large-file.txt.gz'),
      zlib.createGunzip(),
      fs.createWriteStream('./large-file-decompressed.txt'),
      (decompressErr) => {
        if (decompressErr) {
          console.error('Decompression failed:', decompressErr);
        }
      }
    );
  }
);
CSV Data Processing
When processing large CSV files, Streams enable line-by-line handling:
const csv = require('csv-parser');
const fs = require('fs');

// Parsed rows come out of the parser one at a time as the file is read.
const rowStream = fs.createReadStream('large-data.csv').pipe(csv());

// Called once per parsed row
const handleRow = (row) => {
  console.log('Processing row:', row);
};

// Called after the last row has been delivered
const handleEnd = () => {
  console.log('CSV file processing completed');
};

rowStream.on('data', handleRow);
rowStream.on('end', handleEnd);
Merging Multiple Data Sources
Streams can prevent memory issues when merging multiple data sources:
const { PassThrough } = require('stream');
const fs = require('fs');
/**
 * Merges any number of readable streams into a single stream. Chunks are
 * forwarded in the order they arrive, so the output of different sources may
 * interleave.
 *
 * @param {...import('stream').Readable} streams - Sources to merge.
 * @returns {PassThrough} Stream that ends after every source has ended and is
 *   destroyed with the error if any source fails.
 */
const mergeStreams = (...streams) => {
  const passThrough = new PassThrough();

  // With nothing to merge no source will ever emit 'end', so finish right
  // away instead of leaving the consumer waiting forever.
  if (streams.length === 0) {
    passThrough.end();
    return passThrough;
  }

  let remaining = streams.length;
  const onSourceEnd = () => {
    remaining -= 1;
    if (remaining === 0) {
      passThrough.end();
    }
  };

  for (const stream of streams) {
    stream.on('end', onSourceEnd);
    // pipe() does not forward errors; surface them on the merged stream so the
    // consumer is not left with an output that never ends.
    stream.on('error', (err) => passThrough.destroy(err));
    // end: false keeps the output open until the last source has finished.
    stream.pipe(passThrough, { end: false });
  }

  return passThrough;
};
// Example usage: combine two files into merged.txt
const [stream1, stream2] = ['file1.txt', 'file2.txt'].map((file) =>
  fs.createReadStream(file)
);
const merged = mergeStreams(stream1, stream2);
const mergedFile = fs.createWriteStream('merged.txt');
merged.pipe(mergedFile);
Real-time Communication
Real-time communication protocols like WebSocket run on top of stream-based connections, and their messages can be handled as a continuous flow of data:
const WebSocket = require('ws');

const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', (ws) => {
  // Log every message the client sends
  ws.on('message', (message) => {
    console.log('Received message:', message);
  });

  // Push the current timestamp to the client once per second. The timer
  // cancels itself on the first tick after the socket is no longer open.
  const timer = setInterval(() => {
    if (ws.readyState !== WebSocket.OPEN) {
      clearInterval(timer);
      return;
    }
    ws.send(JSON.stringify({ time: Date.now() }));
  }, 1000);
});
Image Processing
Streams allow chunk-by-chunk processing of large image files:
const sharp = require('sharp');
const fs = require('fs');

// Resize to 800x600 and re-encode as JPEG at quality 90
const resizer = sharp()
  .resize(800, 600)
  .jpeg({ quality: 90 });

// Stream-based image processing: file -> resizer -> file
const source = fs.createReadStream('input.jpg');
const resized = source.pipe(resizer);
resized.pipe(fs.createWriteStream('output.jpg'));
Data Encryption and Decryption
Stream-based encryption processes data chunk by chunk, so even large files can be encrypted and decrypted with low memory usage:
const crypto = require('crypto');
const fs = require('fs');
const { pipeline } = require('stream');

const ALGORITHM = 'aes-256-cbc';
const IV_LENGTH = 16; // AES block size in bytes

// crypto.createCipher()/createDecipher() are deprecated and no longer
// available in current Node.js releases: they derive the key without a salt
// and never use a random IV. Derive the key explicitly and pass a fresh IV to
// createCipheriv()/createDecipheriv() instead.
// Example only: load the passphrase and salt from secure configuration.
const key = crypto.scryptSync('secret-key', 'example-salt', 32);

/**
 * Encrypts a file, writing the random IV followed by the ciphertext.
 *
 * @param {string} source - Path of the plaintext file.
 * @param {string} target - Path of the encrypted file to create.
 * @param {(err?: Error|null) => void} callback - Called when encryption has
 *   finished or failed.
 */
function encryptFile(source, target, callback) {
  const iv = crypto.randomBytes(IV_LENGTH);
  const output = fs.createWriteStream(target);
  // The IV is not secret, but it is required for decryption, so it is stored
  // in front of the ciphertext.
  output.write(iv);
  pipeline(
    fs.createReadStream(source),
    crypto.createCipheriv(ALGORITHM, key, iv),
    output,
    callback
  );
}

/**
 * Decrypts a file produced by encryptFile().
 *
 * @param {string} source - Path of the encrypted file (IV + ciphertext).
 * @param {string} target - Path of the decrypted file to create.
 * @param {(err?: Error|null) => void} callback - Called when decryption has
 *   finished or failed.
 */
function decryptFile(source, target, callback) {
  // Read the IV prefix first, then stream the rest through the decipher.
  const ivChunks = [];
  const ivReader = fs.createReadStream(source, { end: IV_LENGTH - 1 });
  ivReader.on('data', (chunk) => ivChunks.push(chunk));
  ivReader.on('error', callback);
  ivReader.on('end', () => {
    const iv = Buffer.concat(ivChunks);
    if (iv.length !== IV_LENGTH) {
      callback(new Error('Encrypted file is too short to contain an IV'));
      return;
    }
    pipeline(
      fs.createReadStream(source, { start: IV_LENGTH }),
      crypto.createDecipheriv(ALGORITHM, key, iv),
      fs.createWriteStream(target),
      callback
    );
  });
}

// Encrypt a file
encryptFile('secret-data.txt', 'encrypted.dat', (encryptErr) => {
  if (encryptErr) {
    console.error('Encryption failed:', encryptErr);
    return;
  }

  // Decrypt a file
  // Started only after encryption has finished, so the encrypted file is
  // complete before it is read back.
  decryptFile('encrypted.dat', 'decrypted.txt', (decryptErr) => {
    if (decryptErr) {
      console.error('Decryption failed:', decryptErr);
    }
  });
});
Custom Protocol Implementation
Streams provide an excellent abstraction for implementing custom network protocols:
const net = require('net');
const { Transform } = require('stream');
// Custom protocol parser
// Size of the length prefix that precedes every message, in bytes
const HEADER_LENGTH = 4;

/**
 * Transform stream that decodes length-prefixed messages. Each message on the
 * wire is a 4-byte big-endian unsigned length followed by that many payload
 * bytes. Every complete payload is emitted as a string on the readable
 * (object-mode) side; incomplete data is buffered until the rest arrives.
 */
class ProtocolParser extends Transform {
  /**
   * @param {object} [options]
   * @param {number} [options.maxMessageLength=Infinity] - Largest payload, in
   *   bytes, that a length prefix may announce. The prefix comes from the
   *   peer, so set a limit when parsing untrusted input; otherwise a single
   *   header can make the parser buffer up to 4 GB.
   */
  constructor({ maxMessageLength = Infinity } = {}) {
    super({ readableObjectMode: true });
    this.buffer = Buffer.alloc(0);
    this.maxMessageLength = maxMessageLength;
  }

  /**
   * Appends the chunk to the pending data and emits every complete message.
   *
   * @param {Buffer} chunk - Raw bytes received.
   * @param {string} encoding - Unused; chunks arrive as Buffers.
   * @param {(err?: Error) => void} callback - Called once the chunk has been
   *   consumed, or with a RangeError if a message exceeds maxMessageLength.
   */
  _transform(chunk, encoding, callback) {
    this.buffer = Buffer.concat([this.buffer, chunk]);

    while (this.buffer.length >= HEADER_LENGTH) {
      const length = this.buffer.readUInt32BE(0);
      if (length > this.maxMessageLength) {
        callback(
          new RangeError(
            `Message length ${length} exceeds the limit of ${this.maxMessageLength} bytes`
          )
        );
        return;
      }

      const frameEnd = HEADER_LENGTH + length;
      if (this.buffer.length < frameEnd) {
        // Payload not complete yet; wait for more data.
        break;
      }

      // subarray() replaces the deprecated Buffer#slice(); both return a view
      // on the same memory.
      this.push(this.buffer.subarray(HEADER_LENGTH, frameEnd).toString());
      this.buffer = this.buffer.subarray(frameEnd);
    }

    callback();
  }
}
// Using the custom protocol
const server = net.createServer((socket) => {
  const parser = new ProtocolParser();

  parser.on('data', (message) => {
    console.log('Received message:', message);
  });

  // A socket emits 'error' on events such as a connection reset; without a
  // listener that would be an uncaught exception and stop the whole server.
  // pipe() does not forward errors, so each side needs its own listener.
  socket.on('error', (err) => {
    console.error('Socket error:', err.message);
    parser.destroy();
  });
  parser.on('error', (err) => {
    console.error('Protocol error:', err.message);
    socket.destroy();
  });

  socket.pipe(parser);
});
server.listen(3000);
本站部分内容来自互联网,一切版权均归源网站或源作者所有。
如果侵犯了你的权益请来信告知我们删除。邮箱:cc@cccx.cn
上一篇:Stream的错误处理
下一篇:fs模块的核心API