
The backpressure issue of streams

Author: Chuan Chen · Views: 43968 · Category: Node.js

Streams in Node.js are the core mechanism for handling large amounts of data, but when the data production speed exceeds the consumption speed, backpressure issues arise. If not addressed, this can lead to memory spikes or process crashes.

Principles of Backpressure Generation

When a Readable stream produces data faster than a Writable stream can consume it, unprocessed chunks accumulate in the internal buffer. Byte streams default to a highWaterMark of 16KB (fs.createReadStream uses 64KB). Once the buffer reaches this limit, write() starts returning false:

const fs = require('fs');

// Quickly read a large file
const readStream = fs.createReadStream('huge.mov');
const writeStream = fs.createWriteStream('copy.mov');

// When the write speed cannot keep up with the read speed
readStream.on('data', (chunk) => {
  const canContinue = writeStream.write(chunk);
  if (!canContinue) {
    readStream.pause();  // Key point: manual pause required
  }
});

writeStream.on('drain', () => {
  readStream.resume();  // Key point: manual resume required
});

Automatic Backpressure Handling

The pipe() method has backpressure handling built in, pausing and resuming the source automatically:

// Best practice: Use pipe with automatic backpressure control
fs.createReadStream('input.mp4')
  .pipe(fs.createWriteStream('output.mp4'))
  .on('error', console.error);

However, some scenarios require finer control:

const { PassThrough } = require('stream');

let bytesWritten = 0;
const monitorStream = new PassThrough();

monitorStream.on('data', (chunk) => {
  bytesWritten += chunk.length;
  console.log(`Bytes written: ${bytesWritten}`);
});

readStream
  .pipe(monitorStream)
  .pipe(writeStream);

Implementing Backpressure in Custom Streams

When creating custom Transform streams, backpressure must be explicitly handled:

const { Transform } = require('stream');

class ThrottleStream extends Transform {
  constructor(ms) {
    super();
    this.delay = ms;
  }

  _transform(chunk, encoding, callback) {
    this.push(chunk);
    // Simulate processing delay with active throttling
    setTimeout(callback, this.delay);
  }
}

// Usage example
const throttle = new ThrottleStream(100);
fastReadStream
  .pipe(throttle)
  .pipe(slowWriteStream);

Special Handling for High-Concurrency Scenarios

When an HTTP server handles multiple upload requests simultaneously:

const http = require('http');
const server = http.createServer((req, res) => {
  // Individual control for each request
  let uploaded = 0;
  req.on('data', (chunk) => {
    uploaded += chunk.length;
    if (uploaded > 100 * 1024 * 1024) {
      req.pause(); // Pause receiving if exceeding 100MB
      processAndClearBuffer(() => {
        req.resume();
      });
    }
  });
});

function processAndClearBuffer(done) {
  // Simulate asynchronous processing
  setTimeout(done, 1000);
}

Performance Monitoring and Debugging

Diagnose backpressure issues through event listeners:

const stream = fs.createReadStream('data.bin');

stream.on('data', (chunk) => {
  // Prefer the public readableLength over the internal _readableState.length
  console.log('Buffered bytes:', stream.readableLength);
});

stream.on('end', () => {
  console.log('Peak memory usage:', process.memoryUsage().rss);
});

// Or use third-party modules
const probe = require('stream-meter')();
stream.pipe(probe).pipe(writeStream);

Differences in Handling Various Stream Types

Object Mode Streams

const objectStream = new Transform({
  objectMode: true,
  transform(obj, _, cb) {
    this.push(processObject(obj));
    // In object mode, highWaterMark counts objects (default 16), not bytes
    if (this.readableLength > 100) {
      setImmediate(cb); // yield before accepting the next object
    } else {
      cb();
    }
  }
});

Duplex Streams

const { Duplex } = require('stream');
class EchoStream extends Duplex {
  _write(chunk, _, callback) {
    this.push(chunk);  // Read while writing
    callback();
  }

  _read() {
    // Implement read logic
  }
}

Common Pitfalls and Solutions

  1. Incorrect Example: Ignoring drain Events
// Anti-pattern: may cause unbounded memory growth
readStream.on('data', (chunk) => {
  writeStream.write(chunk); // Return value is never checked
});
  2. Correct Approach:
function copyStream(source, target) {
  return new Promise((resolve, reject) => {
    source.on('data', onData);
    source.on('end', () => target.end()); // Without this, 'finish' never fires
    target.on('drain', onDrain);
    target.on('finish', resolve);
    target.on('error', reject);

    let isPaused = false;
    function onData(chunk) {
      const canContinue = target.write(chunk);
      if (!canContinue && !isPaused) {
        isPaused = true;
        source.pause();
      }
    }

    function onDrain() {
      if (isPaused) {
        isPaused = false;
        source.resume();
      }
    }
  });
}

Advanced Use Cases

Dynamic Rate Control

class DynamicThrottle extends Transform {
  constructor() {
    super({ highWaterMark: 1 }); // Strict buffer control
    this.speed = 1;
  }

  _transform(chunk, _, cb) {
    this.push(chunk);
    // Adjust dynamically based on system load
    const delay = 1000 / this.speed;
    setTimeout(cb, delay);
  }

  adjustSpeed(newSpeed) {
    this.speed = Math.max(0.1, newSpeed); // Allow slowdowns below 1, never zero
  }
}

// Usage example: sample CPU time each second and adapt
const throttle = new DynamicThrottle();
let lastUsage = process.cpuUsage();
setInterval(() => {
  const delta = process.cpuUsage(lastUsage); // microseconds since last sample
  lastUsage = process.cpuUsage();
  const busy = (delta.user + delta.system) / 1e6; // CPU seconds used this second
  throttle.adjustSpeed(busy > 0.5 ? 0.5 : 2);
}, 1000);

Multi-Stage Pipeline Control

const util = require('util');
const stream = require('stream');
const pipeline = util.promisify(stream.pipeline);

async function complexProcessing(input, output) {
  try {
    await pipeline(
      input,
      createDecryptStream(),
      createDecompressStream(),
      createValidationStream(),
      output
    );
  } catch (err) {
    console.error('Pipeline failed:', err);
  }
}
