
The basic concept of Stream

Author: Chuan Chen · Views: 17,077 · Category: Node.js

Basic Concepts of Streams

Streams are abstract interfaces in Node.js for handling streaming data, allowing data to be processed in chunks without loading it entirely into memory. This mechanism is particularly suitable for scenarios involving large files, network communication, or real-time data. The Stream module in Node.js provides four basic types: Readable, Writable, Duplex, and Transform.
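For quick reference, all four base classes are exported by the built-in stream module:

// The four base classes live in the built-in stream module
const { Readable, Writable, Duplex, Transform } = require('stream');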

Core Advantages of Streams

Traditional file processing requires reading the entire file into memory, whereas streams can split data into smaller chunks for gradual processing. For example, when processing a 10GB log file:

const fs = require('fs');

// Traditional approach (risk of memory explosion)
fs.readFile('huge.log', (err, data) => {
  // The entire file content is stored in the `data` variable
});

// Stream processing (memory-friendly)
const stream = fs.createReadStream('huge.log');
stream.on('data', (chunk) => {
  // Only a small chunk of data is processed at a time
});

The core features of streams include backpressure control, automatic memory management, and pipeline connectivity. When the consumer cannot keep up with the producer, the stream signals the source to slow down or pause, a mechanism known as backpressure. pipe() and pipeline() handle this automatically, while manual writes surface it through write()'s return value and the 'drain' event, as shown in the sketch below.
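A minimal sketch of handling backpressure by hand (the file names are placeholders): write() returns false when the destination's buffer is full, and the 'drain' event signals when it is safe to resume.

const fs = require('fs');

const source = fs.createReadStream('huge.log');
const dest = fs.createWriteStream('copy.log');

source.on('data', (chunk) => {
  // write() returns false once the destination's internal buffer is full
  if (!dest.write(chunk)) {
    source.pause(); // Stop reading until the buffer drains
    dest.once('drain', () => source.resume()); // Safe to continue writing
  }
});

source.on('end', () => dest.end());

This is exactly the bookkeeping that pipe() and pipeline() perform automatically.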

Detailed Explanation of Readable Streams

Readable streams represent data sources, such as file read streams or HTTP request bodies. There are two reading modes:

// Flowing mode (automatic pushing)
readableStream.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes`);
});

// Paused mode (manual pulling)
readableStream.on('readable', () => {
  let chunk;
  while ((chunk = readableStream.read()) !== null) {
    console.log(`Read ${chunk.length} bytes`);
  }
});

Example of a custom Readable stream:

const { Readable } = require('stream');

class CounterStream extends Readable {
  constructor(limit) {
    super();
    this.limit = limit;
    this.count = 0;
  }

  _read() {
    if (this.count++ < this.limit) {
      this.push(this.count.toString());
    } else {
      this.push(null); // End the stream
    }
  }
}

const counter = new CounterStream(5);
counter.pipe(process.stdout); // Output: 12345

Detailed Explanation of Writable Streams

Writable streams represent data destinations, such as file write streams or HTTP responses. Key methods include write() and end():

const { Writable } = require('stream');

class LoggerStream extends Writable {
  _write(chunk, encoding, callback) {
    console.log(`LOG: ${chunk.toString()}`);
    callback(); // Notify completion of processing
  }
}

const logger = new LoggerStream();
process.stdin.pipe(logger); // Log console input

Error handling in practical applications:

const fs = require('fs');
const writeStream = fs.createWriteStream('output.txt');

writeStream.on('error', (err) => {
  console.error('Write failed:', err);
});

writeStream.write('First piece of data\n');
writeStream.end('Final data');

Duplex and Transform Streams

Duplex streams implement both Readable and Writable interfaces, with TCP sockets being a typical example (a minimal custom Duplex sketch follows the Transform example below). Transform streams are special Duplex streams used for data transformation:

const { Transform } = require('stream');

class UpperCaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
}

const upperCase = new UpperCaseTransform();
process.stdin.pipe(upperCase).pipe(process.stdout);
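As for Duplex streams themselves, here is a minimal custom sketch (the class and values are illustrative, not from the original example); note that the readable and writable sides are independent:

const { Duplex } = require('stream');

class SimpleDuplex extends Duplex {
  constructor() {
    super();
    this.counter = 0;
  }

  // Readable side: emit three chunks, then end
  _read() {
    this.counter += 1;
    if (this.counter > 3) {
      this.push(null);
    } else {
      this.push(`out-${this.counter}\n`);
    }
  }

  // Writable side: simply log whatever is written in
  _write(chunk, encoding, callback) {
    console.log(`received: ${chunk.toString().trim()}`);
    callback();
  }
}

const duplex = new SimpleDuplex();
duplex.pipe(process.stdout); // Consume the readable side
duplex.write('hello');       // Feed the writable side independently
duplex.end();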

A more complex Transform example is an encryption stream built with the crypto module:

const fs = require('fs');
const crypto = require('crypto');
const { pipeline } = require('stream');

// aes-256-cbc needs a 32-byte key and a 16-byte IV, shared by encryption and decryption
const key = crypto.randomBytes(32);
const iv = crypto.randomBytes(16);

const encryptStream = crypto.createCipheriv('aes-256-cbc', key, iv);
const decryptStream = crypto.createDecipheriv('aes-256-cbc', key, iv);

pipeline(
  fs.createReadStream('secret.data'),
  encryptStream,
  fs.createWriteStream('encrypted.data'),
  (err) => {
    if (err) console.error('Encryption failed', err);
  }
);
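As a hedged follow-up sketch, the same key and iv can decrypt the file back; run this only after the encryption pipeline above has finished (for example, from its callback):

pipeline(
  fs.createReadStream('encrypted.data'),
  decryptStream,
  fs.createWriteStream('decrypted.data'),
  (err) => {
    if (err) console.error('Decryption failed', err);
  }
);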

Advanced Usage of Streams

Node.js provides powerful mechanisms for composing streams into pipelines. The traditional approach is the pipe() method:

sourceStream
  .pipe(transformStream1)
  .pipe(transformStream2)
  .pipe(destinationStream);

The promise-based pipeline from stream/promises, shown here with ES module syntax and top-level await:

// Top-level await requires an ES module (.mjs or "type": "module" in package.json)
import fs from 'fs';
import zlib from 'zlib';
import { pipeline } from 'stream/promises';

await pipeline(
  fs.createReadStream('input.txt'),
  zlib.createGzip(),
  fs.createWriteStream('output.gz')
);

Merging multiple streams:

const { PassThrough } = require('stream');
const mergeStreams = (...streams) => {
  const passThrough = new PassThrough();
  let remaining = streams.length;
  streams.forEach(stream => {
    stream.pipe(passThrough, { end: false });
    stream.on('end', () => {
      if (--remaining === 0) passThrough.end(); // End once every source has finished
    });
  });
  return passThrough;
};
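A hypothetical usage sketch (file names are assumptions): interleave two log files into a single output stream.

const fs = require('fs');

mergeStreams(
  fs.createReadStream('a.log'),
  fs.createReadStream('b.log')
).pipe(fs.createWriteStream('merged.log'));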

Performance Optimization for Streams

Adjust highWaterMark to control buffer size:

const fs = require('fs');

// fs read streams default to a 64KB highWaterMark (most other streams use 16KB); raised to 1MB here
const bigStream = fs.createReadStream('bigfile', {
  highWaterMark: 1024 * 1024
});

Stream processing in object mode:

const { Readable } = require('stream');

const objectStream = new Readable({
  objectMode: true,
  read() {
    this.push({ timestamp: Date.now() });
    // Can push any JavaScript object
  }
});
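Because the readable above produces objects indefinitely, a consumer has to stop it explicitly; a small sketch:

let received = 0;
objectStream.on('data', (obj) => {
  console.log('tick at', obj.timestamp);
  if (++received === 3) objectStream.destroy(); // Stop after three objects
});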

Practical Application Scenarios

Streaming responses in HTTP servers:

const http = require('http');
const fs = require('fs');

http.createServer((req, res) => {
  const videoStream = fs.createReadStream('movie.mp4');
  res.setHeader('Content-Type', 'video/mp4');
  videoStream.pipe(res); // Avoid buffering the entire video file
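  // Hedged addition: handle read errors so a missing or unreadable file
  // does not crash the server or leave the response hanging
  videoStream.on('error', () => {
    res.statusCode = 500;
    res.end('Failed to read video file');
  });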
}).listen(3000);

Streaming database query results:

const { MongoClient } = require('mongodb');

async function streamLargeCollection() {
  const client = await MongoClient.connect(uri); // uri: your MongoDB connection string
  const cursor = client.db().collection('huge').find();
  
  cursor.stream().on('data', (doc) => {
    // Process documents one by one
  }).on('end', () => {
    client.close();
  });
}

Error Handling Patterns for Streams

Robust stream processing should include comprehensive error handling:

const fs = require('fs');
const { Transform } = require('stream');
const { pipeline } = require('stream/promises');

const handleStream = async () => {
  try {
    const readStream = fs.createReadStream('input.txt');
    const writeStream = fs.createWriteStream('output.txt');

    readStream.on('error', cleanup);
    writeStream.on('error', cleanup);

    await pipeline(
      readStream,
      new Transform({ /* supply a transform(chunk, encoding, callback) implementation here */ }),
      writeStream
    );
  } catch (err) {
    console.error('Pipeline processing failed:', err);
  }
};

function cleanup(err) {
  // Close all resources
}

Streams and Async Iterators

Modern Node.js supports async iterators for stream operations:

const fs = require('fs');

async function processStream() {
  const readable = fs.createReadStream('data.txt', { encoding: 'utf8' });

  for await (const chunk of readable) {
    console.log('Processing chunk:', chunk.length);
  }
}

Custom iterable stream:

const { Readable } = require('stream');

class IterableStream extends Readable {
  constructor(iterator) {
    super({ objectMode: true });
    this.iterator = iterator;
  }

  async _read() {
    try {
      const { value, done } = await this.iterator.next();
      if (done) this.push(null);
      else this.push(value);
    } catch (err) {
      this.destroy(err);
    }
  }
}
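A hypothetical usage sketch: wrap an async generator (any object implementing the async iterator protocol works) in the stream defined above.

async function* numbers() {
  yield 1;
  yield 2;
  yield 3;
}

const iterableStream = new IterableStream(numbers());
iterableStream.on('data', (value) => console.log('value:', value));
iterableStream.on('end', () => console.log('done'));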

