
The Four Basic Stream Types

Author: Chuan Chen · Category: Node.js

Basic Concepts of Streams

Streams in Node.js are abstract interfaces for handling data reading and writing, particularly suitable for processing large amounts of data or continuous data from external sources. Streams break data into smaller chunks for processing, rather than loading all data into memory at once. This approach significantly improves memory efficiency and performance, especially when dealing with large files or network communication.

The core advantage of streams lies in their non-blocking nature: data can be processed as soon as it becomes available, without waiting for the entire resource to load. The stream module provides the base classes from which all stream types are built; you rarely need to use these classes directly, but understanding how they work is helpful.
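To make the memory difference concrete, here is a minimal sketch contrasting a buffered read with a streamed read (the file path is illustrative):

const fs = require('fs');

// Buffered: the entire file is loaded into memory before the callback runs
fs.readFile('./large-file.txt', (err, data) => {
  if (err) throw err;
  console.log(`buffered ${data.length} bytes at once`);
});

// Streamed: the file arrives in small chunks, so memory usage stays flat
let total = 0;
fs.createReadStream('./large-file.txt')
  .on('data', (chunk) => { total += chunk.length; })
  .on('end', () => console.log(`streamed ${total} bytes chunk by chunk`));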

Readable Stream

A readable stream is an abstraction over a data source from which data can be consumed. Typical examples include file reads, HTTP requests, and process standard input. Readable streams operate in one of two modes: flowing mode and paused mode.

const fs = require('fs');

// Flowing mode: attaching a 'data' listener switches the stream into flowing mode
const flowing = fs.createReadStream('./large-file.txt');
flowing.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
});

// Paused mode: consume data explicitly with read(); use a separate stream here,
// because adding a 'readable' listener takes precedence and stops the 'data' flow
const paused = fs.createReadStream('./large-file.txt');
paused.on('readable', () => {
  let chunk;
  while ((chunk = paused.read()) !== null) {
    console.log(`Read ${chunk.length} bytes of data`);
  }
});

Important events for readable streams include the following (a wiring sketch follows the list):

  • 'data' - Triggered when a chunk of data is available to read
  • 'end' - Triggered when no more data is available to read
  • 'error' - Triggered when an error occurs, for example when the underlying resource cannot produce data
  • 'close' - Triggered when the stream and its underlying resources are closed
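As a minimal sketch of how these events are typically wired together (the file path is illustrative):

const fs = require('fs');
const stream = fs.createReadStream('./large-file.txt');

stream.on('data', (chunk) => console.log(`chunk: ${chunk.length} bytes`));
stream.on('end', () => console.log('no more data'));
stream.on('error', (err) => console.error('read failed:', err));
stream.on('close', () => console.log('underlying file descriptor closed'));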

Writable Stream

A writable stream represents a destination for data, commonly seen in file writing, HTTP responses, and process standard output. Writable streams provide methods for writing data to a destination and a mechanism for handling backpressure.

const fs = require('fs');
const writable = fs.createWriteStream('./output.txt');

writable.write('First line\n');
writable.write('Second line\n');
writable.end('Final line\n');

writable.on('finish', () => {
  console.log('All writes completed');
});

writable.on('error', (err) => {
  console.error('Write error:', err);
});

Key methods for writable streams (a cork()/uncork() sketch follows the list):

  • write() - Writes data to the stream; returns false when the internal buffer is full
  • end() - Signals that no more data will be written
  • cork()/uncork() - Buffers writes in memory, then flushes them together
  • destroy() - Destroys the stream
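Here is a minimal sketch of batching writes with cork() (the output file name is illustrative). The Node.js documentation recommends deferring uncork() with process.nextTick() so that all writes queued in the current tick are flushed together:

const fs = require('fs');
const ws = fs.createWriteStream('./batched.txt');

ws.cork();                 // subsequent writes are held in memory
ws.write('part 1\n');
ws.write('part 2\n');
process.nextTick(() => ws.uncork()); // flush both buffered writes at once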

Duplex Stream

A duplex stream implements both readable and writable interfaces, allowing simultaneous read and write operations. Typical examples include TCP sockets and WebSocket connections.

const { Duplex } = require('stream');

class MyDuplex extends Duplex {
  constructor(options) {
    super(options);
    this.data = []; // shared buffer between the writable and readable sides
  }
  
  _write(chunk, encoding, callback) {
    this.data.push(chunk); // store written chunks for later reading
    callback();
  }
  
  _read(size) {
    if (this.data.length) {
      this.push(this.data.shift());
    } else {
      // push(null) ends the readable side, so this toy example only works
      // because all writes happen before reading starts
      this.push(null);
    }
  }
}

const duplex = new MyDuplex();
duplex.write('Hello');
duplex.write('World');
duplex.end();

duplex.on('data', (chunk) => {
  console.log(chunk.toString());
});

Characteristics of duplex streams (a TCP sketch follows the list):

  • Independent read and write buffers
  • Read and write operations do not interfere with each other
  • Commonly used for bidirectional communication scenarios
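Since TCP sockets are themselves duplex streams, a minimal echo server makes both interfaces visible on a single object (the port number is an arbitrary choice):

const net = require('net');

// Each incoming connection is a Duplex stream: readable for bytes
// arriving from the client, writable for bytes sent back
const server = net.createServer((socket) => {
  socket.pipe(socket); // echo every received chunk back to the sender
});

server.listen(3000);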

Transform Stream

A transform stream is a special type of duplex stream used to modify or transform data between writing and reading. They are commonly used for data compression, encryption, and format conversion.

const { Transform } = require('stream');
const { createGzip } = require('zlib');
const fs = require('fs');

// Custom transform stream example
class UpperCaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
}

const upper = new UpperCaseTransform();
process.stdin.pipe(upper).pipe(process.stdout);

// Built-in transform stream example
const gzip = createGzip();
fs.createReadStream('input.txt')
  .pipe(gzip)
  .pipe(fs.createWriteStream('input.txt.gz'));

Typical applications of transform streams (an object-mode JSON sketch follows the list):

  • Data compression/decompression (zlib)
  • Encryption/decryption (crypto)
  • Data format conversion (JSON parsing)
  • Data filtering and modification
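As a sketch of the JSON use case, assuming newline-delimited JSON input, a transform stream can emit parsed objects by enabling object mode on its readable side:

const { Transform } = require('stream');

// Parses newline-delimited JSON; readableObjectMode lets push() emit objects
class NdjsonParse extends Transform {
  constructor() {
    super({ readableObjectMode: true });
    this.buffer = '';
  }

  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop(); // keep any trailing partial line for later
    for (const line of lines) {
      if (!line.trim()) continue;
      try {
        this.push(JSON.parse(line));
      } catch (err) {
        return callback(err); // surface malformed JSON as a stream error
      }
    }
    callback();
  }
}

process.stdin.pipe(new NdjsonParse()).on('data', (obj) => console.log(obj));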

Advanced Stream Applications

Streams in Node.js can be composed into powerful data processing pipelines. The pipeline() function connects multiple streams safely, forwarding errors and cleaning up resources automatically.

const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

pipeline(
  fs.createReadStream('input.csv'),
  zlib.createGzip(),
  fs.createWriteStream('output.csv.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);

Advanced stream patterns include the following (an object-mode sketch follows the list):

  • Object mode: Handles JavaScript objects instead of Buffers/strings
  • Custom streams: Implements streams with specific behaviors through inheritance
  • Error handling: Centralized management of errors in pipelines
  • Backpressure management: Handles speed mismatches between data producers and consumers
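Object mode in particular changes what flows through a stream; here is a minimal sketch (the records are made up for illustration):

const { Readable } = require('stream');

// Readable.from() builds an object-mode stream from any iterable,
// so each chunk is a JavaScript object rather than a Buffer
const records = Readable.from([
  { id: 1, name: 'alpha' },
  { id: 2, name: 'beta' },
]);

records.on('data', (record) => console.log(record.id, record.name));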

Performance Optimization and Best Practices

Proper use of streams can significantly improve Node.js application performance. Here are some key optimization points:

  1. Set appropriate buffer sizes:
const readable = fs.createReadStream('file.txt', {
  highWaterMark: 64 * 1024 // 64KB
});
  2. Error handling:
stream.on('error', (err) => {
  // Handle errors and recover or close the stream
});
  3. Avoid memory leaks:
// Destroy streams that are no longer needed promptly
stream.destroy();
  4. Use the promise-based API (stream/promises):
const { pipeline } = require('stream/promises');

async function processData() {
  try {
    await pipeline(
      sourceStream,
      transformStream,
      destStream
    );
  } catch (err) {
    console.error(err);
  }
}
  5. Handle backpressure:
const writable = fs.createWriteStream('output.txt');
const readable = fs.createReadStream('input.txt');

readable.on('data', (chunk) => {
  if (!writable.write(chunk)) {
    readable.pause();
    writable.once('drain', () => readable.resume());
  }
});
