Implementation of custom streams

Author: Chuan Chen · Category: Node.js

What are Custom Streams

In Node.js, streams are an efficient way to handle data, especially when dealing with large or unpredictable data volumes. Custom streams allow developers to create specialized data processing pipelines tailored to specific needs. Streams are divided into four basic types: Readable, Writable, Duplex, and Transform. By inheriting from these base classes and implementing specific methods, you can build fully customized stream implementations.
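As a quick orientation, the following sketch lists each base class and the method(s) a subclass is expected to implement; the rest of this article walks through each one in turn.

const { Readable, Writable, Duplex, Transform } = require('stream');

// Readable  -> implement _read()
// Writable  -> implement _write() (optionally _writev() and _final())
// Duplex    -> implement both _read() and _write()
// Transform -> implement _transform() (optionally _flush())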

Implementing a Readable Stream

To create a custom readable stream, you need to inherit from the stream.Readable class and implement the _read method. This method is called when data is requested by the consumer, and data is fed into the stream using the push method. When no more data is available, push null to signal the end of the stream.

const { Readable } = require('stream');

class RandomNumberStream extends Readable {
  constructor(options) {
    super(options);
    this.maxNumbers = 10;
    this.currentNumber = 0;
  }

  _read(size) {
    this.currentNumber += 1;
    const randomNumber = Math.random();
    
    if (this.currentNumber > this.maxNumbers) {
      this.push(null); // End the stream
    } else {
      const buffer = Buffer.from(`${randomNumber}\n`, 'utf8');
      this.push(buffer); // Push data
    }
  }
}

// Usage example
const randomStream = new RandomNumberStream();
randomStream.pipe(process.stdout);

This example creates a readable stream that generates 10 random numbers. The _read method is called each time data is needed until the maximum count is reached. The pipe method directs the stream data to standard output.
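Readable streams are also async iterable in modern Node versions, so the same stream can be consumed without pipe. A minimal sketch using the RandomNumberStream above:

// Consume the readable stream with for await...of instead of pipe
async function consume() {
  const stream = new RandomNumberStream();
  for await (const chunk of stream) {
    process.stdout.write(chunk);
  }
}

consume().catch(console.error);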

Implementing a Writable Stream

To create a custom writable stream, inherit from stream.Writable and implement the _write method. This method processes the incoming data chunks, performs the necessary operations, and then calls the callback function to signal completion.

const { Writable } = require('stream');
const fs = require('fs');

class FileWriter extends Writable {
  constructor(filePath, options) {
    super(options);
    this.filePath = filePath;
  }

  _write(chunk, encoding, callback) {
    // Append each chunk to the target file and report completion (or failure)
    // through the callback so the stream can manage backpressure
    fs.appendFile(this.filePath, chunk, (err) => {
      if (err) return callback(err);
      callback();
    });
  }

  _final(callback) {
    console.log(`All data has been written to ${this.filePath}`);
    callback();
  }
}

// Usage example
const writer = new FileWriter('./output.txt');
writer.write('First line of data\n');
writer.write('Second line of data\n');
writer.end('Final line of data');

This writable stream appends received data to a specified file. The _write method handles each data chunk, and _final is called when the stream ends. Properly handling the callback is crucial for backpressure management.
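On the producer side, backpressure means checking the return value of write(). A minimal sketch (reusing the FileWriter above and a hypothetical writeMany helper) that stops writing when write() returns false and resumes on the 'drain' event:

function writeMany(writer, lines) {
  let i = 0;
  function writeNext() {
    while (i < lines.length) {
      const ok = writer.write(lines[i++]);
      if (!ok) {
        // The internal buffer is full; wait for it to drain before continuing
        writer.once('drain', writeNext);
        return;
      }
    }
    writer.end();
  }
  writeNext();
}

writeMany(new FileWriter('./output.txt'), ['a\n', 'b\n', 'c\n']);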

Building a Duplex Stream

A duplex stream implements both readable and writable interfaces. To create one, inherit from stream.Duplex and implement both _read and _write methods. This type of stream is suitable for scenarios requiring bidirectional communication.

const { Duplex } = require('stream');

class EchoStream extends Duplex {
  constructor(options) {
    super(options);
    this.dataBuffer = [];
  }

  _write(chunk, encoding, callback) {
    // Buffer written data so the readable side can emit it later
    this.dataBuffer.push(chunk.toString());
    callback();
  }

  _read(size) {
    // Drain the buffer; do not end the stream here, since more writes may still arrive
    while (this.dataBuffer.length > 0) {
      if (!this.push(this.dataBuffer.shift())) return;
    }
  }

  _final(callback) {
    // The writable side has ended: flush any leftovers and end the readable side
    while (this.dataBuffer.length > 0) {
      this.push(this.dataBuffer.shift());
    }
    this.push(null);
    callback();
  }
}

// Usage example
const echo = new EchoStream();
echo.on('data', (chunk) => {
  console.log(`Received: ${chunk}`);
});
echo.write('Hello');
echo.write('World');
echo.end();

This duplex stream buffers written data and echoes it back on the readable side. In general, the readable and writable halves of a duplex stream are independent; this example connects them explicitly through an internal buffer, and the readable side is only ended once the writable side has finished.

Creating a Transform Stream

A transform stream is a special type of duplex stream that inherits from stream.Transform. It modifies or transforms the written data before outputting the result. Only the _transform method must be implemented; an optional _flush method can emit any remaining data when the input ends.

const { Transform } = require('stream');

class UpperCaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    const upperString = chunk.toString().toUpperCase();
    this.push(upperString);
    callback();
  }
}

// Usage example
const upper = new UpperCaseTransform();
process.stdin.pipe(upper).pipe(process.stdout);

This transform stream converts all input text to uppercase. The _transform method receives data chunks, processes them, and outputs the result via push. Transform streams are ideal for data format conversion, encryption/decryption, and similar scenarios.
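Because built-in transforms expose the same interface, custom ones compose freely with them. A minimal sketch (assuming a hypothetical ./input.txt file) that chains the UpperCaseTransform above with Node's built-in gzip transform:

const zlib = require('zlib');
const fs = require('fs');

// Read input.txt, upper-case it, gzip-compress it, and write input.txt.gz
fs.createReadStream('./input.txt')
  .pipe(new UpperCaseTransform())
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('./input.txt.gz'));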

Advanced Stream Control

Node.js streams provide fine-grained control mechanisms. The highWaterMark option sets the internal buffer threshold that drives backpressure, and custom streams can release external resources by overriding the _destroy method.

const { Writable } = require('stream');

// acquireResource, processResource and releaseResource below are placeholders
// for whatever external resource the stream manages (file handles, sockets,
// database connections, and so on).
class ResourceHandler extends Writable {
  constructor(options) {
    super({ ...options, highWaterMark: 1024 * 1024 }); // 1MB buffer
    this.resource = acquireResource();
  }

  _write(chunk, encoding, callback) {
    processResource(this.resource, chunk, (err) => {
      if (err) return callback(err);
      callback();
    });
  }

  _destroy(err, callback) {
    releaseResource(this.resource, (releaseErr) => {
      callback(releaseErr || err);
    });
  }
}

This example demonstrates how to manage external resources. highWaterMark is set to 1MB, and _destroy ensures resources are properly released. Proper error handling and resource cleanup are key to robust stream implementations.

Object Mode Streams

By default, streams handle Buffers or strings, but setting objectMode: true allows them to handle arbitrary JavaScript objects. This is useful for processing complex data structures.

const { Readable } = require('stream');

class ObjectStream extends Readable {
  constructor(options) {
    super({ ...options, objectMode: true });
    this.objects = [
      { id: 1, name: 'Object 1' },
      { id: 2, name: 'Object 2' },
      { id: 3, name: 'Object 3' }
    ];
  }

  _read(size) {
    if (this.objects.length === 0) {
      this.push(null);
    } else {
      this.push(this.objects.shift());
    }
  }
}

// Usage example
const objStream = new ObjectStream();
objStream.on('data', (obj) => {
  console.log(`Received object: ${JSON.stringify(obj)}`);
});

Object mode streams can push entire objects without serialization. Note that object mode affects how highWaterMark is counted, as it measures the number of objects rather than bytes.
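A minimal sketch of that difference: with objectMode and a small highWaterMark, write() reports a full buffer after only a couple of queued objects, regardless of their byte size.

const { Writable } = require('stream');

// In object mode, highWaterMark counts objects rather than bytes:
// with highWaterMark: 2, write() starts returning false once two
// objects are queued or in flight, signalling backpressure.
const slowSink = new Writable({
  objectMode: true,
  highWaterMark: 2,
  write(obj, encoding, callback) {
    // Simulate a slow consumer
    setTimeout(() => {
      console.log('Consumed:', obj);
      callback();
    }, 100);
  }
});

console.log(slowSink.write({ id: 1 })); // typically true: buffer not yet full
console.log(slowSink.write({ id: 2 })); // typically false: highWaterMark reached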

Error Handling and Debugging

Robust stream implementations require comprehensive error handling. Errors can be caught using event listeners, and the _destroy method can be implemented for cleanup.

const { Writable } = require('stream');

class SafeStream extends Writable {
  constructor(options) {
    super(options);
    this.on('error', (err) => {
      console.error('Stream error:', err);
      this._cleanup();
    });
  }

  _write(chunk, encoding, callback) {
    try {
      // riskyOperation is a placeholder for any processing step that may throw
      riskyOperation(chunk);
      callback();
    } catch (err) {
      callback(err);
    }
  }

  _cleanup() {
    // Perform necessary cleanup
  }

  _destroy(err, callback) {
    this._cleanup();
    callback(err);
  }
}

This pattern ensures resources are properly released even if an error occurs. When debugging streams, use stream.pipeline instead of pipe for better error handling.
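A minimal sketch of that approach, using the promise-based pipeline from 'stream/promises' (available in recent Node versions) together with the UpperCaseTransform defined earlier; ./input.txt and ./output.txt are hypothetical paths:

const { pipeline } = require('stream/promises');
const fs = require('fs');

async function run() {
  try {
    // pipeline forwards errors from any stage and destroys every stream
    // on failure, which .pipe() does not do on its own
    await pipeline(
      fs.createReadStream('./input.txt'),
      new UpperCaseTransform(),
      fs.createWriteStream('./output.txt')
    );
    console.log('Pipeline completed');
  } catch (err) {
    console.error('Pipeline failed:', err);
  }
}

run();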

Performance Optimization Tips

Efficient custom streams should consider memory usage and throughput. Techniques like using buffer pools, avoiding unnecessary copying, and setting highWaterMark appropriately can improve performance.

const { Readable } = require('stream');
// 'bufferpool' stands in for any buffer-pooling module; fillBuffer and
// shouldContinue below are placeholders for the underlying data source.
const bufferPool = require('bufferpool');

class HighPerfStream extends Readable {
  constructor(options) {
    super(options);
    this.bufferSize = 1024 * 64; // 64KB
  }

  _read(size) {
    bufferPool.alloc(this.bufferSize, (err, buffer) => {
      if (err) return this.destroy(err);
      
      // Fill the buffer
      fillBuffer(buffer);
      
      if (shouldContinue()) {
        this.push(buffer);
      } else {
        bufferPool.free(buffer);
        this.push(null);
      }
    });
  }
}

This example shows how to reuse memory with a buffer pool. For high-frequency small data chunks, consider implementing the _writev method to handle batch writes. Monitoring the stream's 'drain' event can optimize write timing.
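A minimal sketch of a _writev implementation (a hypothetical BatchedFileWriter, not part of the example above) that merges all buffered chunks into a single file append:

const { Writable } = require('stream');
const fs = require('fs');

class BatchedFileWriter extends Writable {
  constructor(filePath, options) {
    super(options);
    this.filePath = filePath;
  }

  _write(chunk, encoding, callback) {
    // Fallback used when only a single chunk is buffered
    fs.appendFile(this.filePath, chunk, callback);
  }

  _writev(chunks, callback) {
    // chunks is an array of { chunk, encoding } objects; combining them
    // turns many small appends into one larger write
    const combined = Buffer.concat(chunks.map((c) => Buffer.from(c.chunk)));
    fs.appendFile(this.filePath, combined, callback);
  }
}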

Real-World Application Examples

Custom streams have wide-ranging applications in real-world projects. For example, implementing a CSV parser:

const { Transform } = require('stream');

class CSVParser extends Transform {
  constructor(options) {
    super({ ...options, objectMode: true });
    this.remainder = '';
    this.headers = null;
  }

  _transform(chunk, encoding, callback) {
    const data = this.remainder + chunk.toString();
    const lines = data.split('\n');
    
    this.remainder = lines.pop(); // Save incomplete last line
    
    if (!this.headers && lines.length > 0) {
      this.headers = lines.shift().split(',');
    }
    
    for (const line of lines) {
      const values = line.split(',');
      const obj = {};
      this.headers.forEach((header, i) => {
        obj[header.trim()] = values[i] ? values[i].trim() : null;
      });
      this.push(obj);
    }
    
    callback();
  }

  _flush(callback) {
    if (this.remainder && this.headers) {
      const values = this.remainder.split(',');
      const obj = {};
      this.headers.forEach((header, i) => {
        obj[header.trim()] = values[i] ? values[i].trim() : null;
      });
      this.push(obj);
    }
    callback();
  }
}

This CSV parser converts a text stream into an object stream, handling field mapping and automatic data splitting. The _flush method ensures any remaining data is processed. This pattern is ideal for processing large data files.
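A typical way to use the parser is to pipe a file read stream through it and consume rows as plain objects (assuming a hypothetical ./data.csv file):

const fs = require('fs');

fs.createReadStream('./data.csv')
  .pipe(new CSVParser())
  .on('data', (row) => {
    console.log('Parsed row:', row);
  })
  .on('end', () => {
    console.log('Finished parsing');
  });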

