
Common Stream application scenarios

Author: Chuan Chen · Category: Node.js

File Read/Write Operations

Streams are highly practical for file handling, especially with large files. Traditional fs.readFile loads the entire file into memory at once, whereas using Streams allows for chunk-by-chunk processing.

const fs = require('fs');

// Read a large file
const readStream = fs.createReadStream('./large-file.txt', {
  highWaterMark: 64 * 1024 // Read 64KB at a time
});

// Write to a new file
const writeStream = fs.createWriteStream('./copy-file.txt');

readStream.pipe(writeStream);

// 'finish' fires once all data has been flushed to the destination file
writeStream.on('finish', () => {
  console.log('File copy completed');
});
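
The pipe-based copy above does not handle errors on either stream. A minimal sketch using stream.pipeline, which forwards an error from any stage to a single callback and cleans up both streams (the file paths are the same placeholders as above):

const fs = require('fs');
const { pipeline } = require('stream');

pipeline(
  fs.createReadStream('./large-file.txt'),
  fs.createWriteStream('./copy-file.txt'),
  (err) => {
    if (err) {
      console.error('Copy failed:', err);
    } else {
      console.log('File copy completed');
    }
  }
);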

HTTP Requests and Responses

Node.js's HTTP module is built on Streams: the incoming request is a Readable stream and the response is a Writable stream, which makes it well suited to large request and response bodies:

const http = require('http');
const fs = require('fs');

http.createServer((req, res) => {
  // Stream the response
  const fileStream = fs.createReadStream('./large-data.json');
  fileStream.pipe(res);
  
  // Consume the request body chunk by chunk (buffered into a string here only for demonstration)
  let body = '';
  req.on('data', (chunk) => {
    body += chunk;
  });
  
  req.on('end', () => {
    console.log('Request body received');
  });
}).listen(3000);
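
When a large upload should not be buffered in memory at all, the request stream can be piped straight to its destination. A minimal sketch that writes an incoming upload to disk (the './upload.dat' path and port 3001 are placeholders):

const http = require('http');
const fs = require('fs');

http.createServer((req, res) => {
  // Pipe the request body directly to a file without buffering it in memory
  const fileStream = fs.createWriteStream('./upload.dat');
  req.pipe(fileStream);

  fileStream.on('finish', () => {
    res.end('Upload saved');
  });
}).listen(3001);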

Data Transformation Processing

Connecting multiple transform streams into a pipeline makes it possible to build complex data-processing workflows:

const { Transform } = require('stream');

// Create a transform stream
const upperCaseTransform = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

// Use the transform stream
process.stdin
  .pipe(upperCaseTransform)
  .pipe(process.stdout);
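
The example above uses a single transform; several can be chained. A minimal sketch that chains two stages (upper-casing plus a hypothetical chunk-numbering transform) with stream.pipeline:

const { Transform, pipeline } = require('stream');

const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  }
});

// Hypothetical second stage: prefix each chunk with a running counter
let count = 0;
const numberChunks = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, `${++count}: ${chunk}`);
  }
});

pipeline(process.stdin, upperCase, numberChunks, process.stdout, (err) => {
  if (err) console.error('Pipeline failed:', err);
});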

Database Operations

When reading a large number of database records, streaming the results avoids loading the entire result set into memory:

const { MongoClient } = require('mongodb');

async function streamLargeData() {
  const client = await MongoClient.connect('mongodb://localhost:27017');
  const collection = client.db('test').collection('largeCollection');
  
  const stream = collection.find().stream();
  
  stream.on('data', (doc) => {
    console.log('Processing document:', doc._id);
  });
  
  stream.on('end', () => {
    client.close();
  });
}
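
With recent MongoDB drivers the cursor itself is also an async iterable, which gives backpressure for free when each document needs asynchronous work. A sketch assuming the same connection string and collection names as above:

const { MongoClient } = require('mongodb');

async function iterateLargeData() {
  const client = await MongoClient.connect('mongodb://localhost:27017');
  const cursor = client.db('test').collection('largeCollection').find();

  // for await pulls the next document only after the previous one is handled
  for await (const doc of cursor) {
    console.log('Processing document:', doc._id);
  }

  await client.close();
}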

Real-time Log Processing

Streams work well for log data that is too large to load at once, for example scanning a large log file line by line:

const fs = require('fs');
const readline = require('readline');

const logStream = fs.createReadStream('./app.log');
const rl = readline.createInterface({
  input: logStream,
  crlfDelay: Infinity
});

rl.on('line', (line) => {
  if (line.includes('ERROR')) {
    console.log('Found error log:', line);
  }
});
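
To persist the matches instead of just printing them, the filtered lines can be written to another stream. A minimal sketch, assuming an output path of './errors.log':

const fs = require('fs');
const readline = require('readline');

const rl = readline.createInterface({
  input: fs.createReadStream('./app.log'),
  crlfDelay: Infinity
});

const errorLog = fs.createWriteStream('./errors.log');

rl.on('line', (line) => {
  if (line.includes('ERROR')) {
    errorLog.write(line + '\n');
  }
});

rl.on('close', () => {
  errorLog.end();
});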

Video Stream Processing

Streams are essential for handling large media files like videos:

const http = require('http');
const fs = require('fs');

http.createServer((req, res) => {
  const range = req.headers.range;
  if (!range) {
    res.writeHead(400);
    return res.end();
  }
  
  const videoPath = './sample.mp4';
  const videoSize = fs.statSync(videoPath).size;
  
  // Parse the Range header; this simple parsing assumes an open-ended
  // range such as "bytes=1000-" (no explicit end position)
  const CHUNK_SIZE = 10 ** 6; // 1MB
  const start = Number(range.replace(/\D/g, ''));
  const end = Math.min(start + CHUNK_SIZE, videoSize - 1);
  
  // Set response headers
  res.writeHead(206, {
    'Content-Range': `bytes ${start}-${end}/${videoSize}`,
    'Accept-Ranges': 'bytes',
    'Content-Length': end - start + 1,
    'Content-Type': 'video/mp4'
  });
  
  // Create video stream
  const videoStream = fs.createReadStream(videoPath, { start, end });
  videoStream.pipe(res);
}).listen(3000);

Compression and Decompression

Streams can efficiently handle compression/decompression tasks:

const fs = require('fs');
const zlib = require('zlib');

// Compress a file
fs.createReadStream('./large-file.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('./large-file.txt.gz'));

// Decompress a file (run after the compression above has finished)
fs.createReadStream('./large-file.txt.gz')
  .pipe(zlib.createGunzip())
  .pipe(fs.createWriteStream('./large-file-decompressed.txt'));
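
Compression streams also compose with the HTTP scenarios above. A minimal sketch that serves a file gzip-compressed on the fly (the file path and port 3002 are placeholders); a production server should first check the client's Accept-Encoding header:

const http = require('http');
const fs = require('fs');
const zlib = require('zlib');

http.createServer((req, res) => {
  res.writeHead(200, { 'Content-Encoding': 'gzip' });
  fs.createReadStream('./large-file.txt')
    .pipe(zlib.createGzip())
    .pipe(res);
}).listen(3002);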

CSV Data Processing

When processing large CSV files, Streams enable row-by-row handling:

const csv = require('csv-parser');
const fs = require('fs');

fs.createReadStream('large-data.csv')
  .pipe(csv())
  .on('data', (row) => {
    console.log('Processing row:', row);
  })
  .on('end', () => {
    console.log('CSV file processing completed');
  });
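
If each row triggers asynchronous work (for example a database write), the stream should be paused so rows do not pile up in memory. A sketch assuming a hypothetical async saveRow function:

const csv = require('csv-parser');
const fs = require('fs');

const stream = fs.createReadStream('large-data.csv').pipe(csv());

stream.on('data', async (row) => {
  stream.pause();       // stop reading while the row is being handled
  await saveRow(row);   // hypothetical async operation per row
  stream.resume();      // continue with the next row
});

stream.on('end', () => {
  console.log('CSV file processing completed');
});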

Merging Multiple Data Sources

Streams can prevent memory issues when merging multiple data sources:

const { PassThrough } = require('stream');
const fs = require('fs');

// Create a merge stream; chunks from the sources are interleaved in arrival order
const mergeStreams = (...streams) => {
  const passThrough = new PassThrough();
  
  let ended = 0;
  const checkEnd = () => {
    if (++ended === streams.length) {
      passThrough.end();
    }
  };
  
  streams.forEach(stream => {
    stream.pipe(passThrough, { end: false });
    stream.on('end', checkEnd);
  });
  
  return passThrough;
};

// Example usage
const stream1 = fs.createReadStream('file1.txt');
const stream2 = fs.createReadStream('file2.txt');
const merged = mergeStreams(stream1, stream2);

merged.pipe(fs.createWriteStream('merged.txt'));
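
If the sources must appear one after another rather than interleaved, they can be consumed sequentially with async iteration. A minimal sketch of that variant (the concatStreams name and file paths are placeholders):

const fs = require('fs');

async function concatStreams(output, ...streams) {
  for (const stream of streams) {
    for await (const chunk of stream) {
      // Respect backpressure: wait for 'drain' when the output buffer is full
      if (!output.write(chunk)) {
        await new Promise((resolve) => output.once('drain', resolve));
      }
    }
  }
  output.end();
}

concatStreams(
  fs.createWriteStream('concatenated.txt'),
  fs.createReadStream('file1.txt'),
  fs.createReadStream('file2.txt')
);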

Real-time Communication

Real-time communication channels such as WebSocket connections also map naturally onto stream-style handling; the ws library is built on top of Node.js sockets:

const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', (ws) => {
  // Receive client message stream
  ws.on('message', (message) => {
    console.log('Received message:', message);
  });
  
  // Create a periodic data stream
  const interval = setInterval(() => {
    if (ws.readyState === WebSocket.OPEN) {
      ws.send(JSON.stringify({ time: Date.now() }));
    } else {
      clearInterval(interval);
    }
  }, 1000);
});
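
The ws library (v7+) can also expose a connection as a Node.js Duplex stream via createWebSocketStream, which lets it be piped like any other stream. A minimal sketch that streams a file to each connecting client (the './large-data.json' path and port 8081 are placeholders):

const WebSocket = require('ws');
const fs = require('fs');

const wss = new WebSocket.Server({ port: 8081 });

wss.on('connection', (ws) => {
  // Wrap the WebSocket in a Duplex stream and pipe a file into it
  const duplex = WebSocket.createWebSocketStream(ws, { encoding: 'utf8' });
  fs.createReadStream('./large-data.json').pipe(duplex);
});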

Image Processing

Streams allow chunk-by-chunk processing of large image files:

const sharp = require('sharp');
const fs = require('fs');

// Stream-based image processing
fs.createReadStream('input.jpg')
  .pipe(sharp()
    .resize(800, 600)
    .jpeg({ quality: 90 })
  )
  .pipe(fs.createWriteStream('output.jpg'));
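
Because a sharp instance is a Duplex stream, it also slots directly into an HTTP handler. A minimal sketch of an on-the-fly thumbnail endpoint (the input path and port 3003 are placeholders):

const http = require('http');
const fs = require('fs');
const sharp = require('sharp');

http.createServer((req, res) => {
  res.writeHead(200, { 'Content-Type': 'image/jpeg' });
  fs.createReadStream('input.jpg')
    .pipe(sharp().resize(200, 200).jpeg())
    .pipe(res);
}).listen(3003);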

Data Encryption and Decryption

Stream-based encryption and decryption keep memory usage low even for large files:

const crypto = require('crypto');
const fs = require('fs');

// createCipher/createDecipher are deprecated; derive a key and use an explicit IV instead
const key = crypto.scryptSync('secret-key', 'salt', 32);
const iv = Buffer.alloc(16, 0); // fixed IV for demonstration only; use a random IV in practice

// Encryption stream
const cipher = crypto.createCipheriv('aes-256-cbc', key, iv);
// Decryption stream
const decipher = crypto.createDecipheriv('aes-256-cbc', key, iv);

// Encrypt a file
fs.createReadStream('secret-data.txt')
  .pipe(cipher)
  .pipe(fs.createWriteStream('encrypted.dat'));

// Decrypt a file (run after the encryption above has finished)
fs.createReadStream('encrypted.dat')
  .pipe(decipher)
  .pipe(fs.createWriteStream('decrypted.txt'));

Custom Protocol Implementation

Streams provide an excellent abstraction for implementing custom network protocols:

const net = require('net');
const { Transform } = require('stream');

// Custom protocol parser: each frame is a 4-byte big-endian length prefix followed by the payload
class ProtocolParser extends Transform {
  constructor() {
    super({ readableObjectMode: true });
    this.buffer = Buffer.alloc(0);
  }
  
  _transform(chunk, encoding, callback) {
    this.buffer = Buffer.concat([this.buffer, chunk]);
    
    while (this.buffer.length >= 4) {
      const length = this.buffer.readUInt32BE(0);
      if (this.buffer.length >= 4 + length) {
        const message = this.buffer.slice(4, 4 + length);
        this.push(message.toString());
        this.buffer = this.buffer.slice(4 + length);
      } else {
        break;
      }
    }
    callback();
  }
}

// Using the custom protocol
const server = net.createServer((socket) => {
  socket
    .pipe(new ProtocolParser())
    .on('data', (message) => {
      console.log('Received message:', message);
    });
});

server.listen(3000);
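
A matching client has to frame its messages the same way, with a 4-byte big-endian length prefix. A minimal sketch of such a client (the sendMessage helper is hypothetical):

const net = require('net');

// Hypothetical helper: prefix the payload with its length and write the frame
function sendMessage(socket, text) {
  const payload = Buffer.from(text);
  const header = Buffer.alloc(4);
  header.writeUInt32BE(payload.length, 0);
  socket.write(Buffer.concat([header, payload]));
}

const client = net.createConnection({ port: 3000 }, () => {
  sendMessage(client, 'hello over the custom protocol');
  client.end();
});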

