Publish/Subscribe pattern

Author: Chuan Chen | Views: 39179 | Category: Node.js

Core Concepts of the Publish/Subscribe Pattern

The Publish/Subscribe pattern (Pub/Sub) is a messaging paradigm where message senders (publishers) do not send messages directly to specific receivers (subscribers). Instead, messages are categorized and published to named channels. Subscribers subscribe to one or more channels and receive only the messages published there. This decouples publishers from subscribers: publishers need not know that subscribers exist, and subscribers need not care where messages come from.

In Node.js, the Publish/Subscribe pattern is commonly used for event handling, real-time communication, and microservices architecture. Typical implementations include:

  • In-process event buses (e.g., EventEmitter)
  • Distributed message queues (e.g., Redis Pub/Sub)
  • WebSocket-based real-time communication

EventEmitter Implementation in Node.js

Node.js's built-in events module provides a basic implementation of the Publish/Subscribe pattern. Here is a complete example:

const EventEmitter = require('events');

class OrderService extends EventEmitter {
  // Simulate order creation
  createOrder(orderData) {
    // Business logic...
    this.emit('order_created', orderData);
  }
}

const orderService = new OrderService();

// Subscribe to the order creation event
orderService.on('order_created', (order) => {
  console.log(`[Email Service] Order creation notification: ${order.id}`);
});

// Another subscriber
orderService.on('order_created', (order) => {
  console.log(`[Inventory Service] Deduct inventory: ${order.items}`);
});

// Trigger the event
orderService.createOrder({
  id: '1001',
  items: ['Product A', 'Product B'],
  total: 299.99
});

Characteristics of this implementation:

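  • Synchronous event triggering: emit() calls all listeners synchronously, in the order they were registered
  • Support for one-time listeners: once() registers a listener that is removed after its first call
  • Ability to get the listener count: listenerCount(eventName)
  • Special handling for error events: emitting 'error' with no listener registered throws the error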
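The characteristics listed above can be observed directly with a bare EventEmitter. The following is a minimal sketch; the event names ('ping', 'ready') and variable names are made up for illustration.

```javascript
const EventEmitter = require('events');

const emitter = new EventEmitter();
const log = [];

// emit() runs listeners synchronously, in registration order
emitter.on('ping', () => log.push('first listener'));
emitter.on('ping', () => log.push('second listener'));
log.push('before emit');
emitter.emit('ping');
log.push('after emit');

// once() registers a listener that is removed after its first call
let onceCalls = 0;
emitter.once('ready', () => { onceCalls += 1; });
emitter.emit('ready');
emitter.emit('ready'); // no listener left, so nothing happens

// listenerCount() reports how many listeners an event currently has
const pingListeners = emitter.listenerCount('ping');   // 2
const readyListeners = emitter.listenerCount('ready'); // 0

// Emitting 'error' with no 'error' listener throws the error
let thrown = null;
try {
  emitter.emit('error', new Error('boom'));
} catch (err) {
  thrown = err;
}

// With a listener attached, the error is delivered to it instead
let handled = null;
emitter.on('error', (err) => { handled = err; });
emitter.emit('error', new Error('handled boom'));

console.log(log, onceCalls, pingListeners, readyListeners);
console.log(thrown.message, handled.message);
```

Because emit() is synchronous, 'after emit' is logged only once both listeners have returned, so a slow listener delays the publisher.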

Redis Pub/Sub Implementation

For distributed systems, Redis's Pub/Sub functionality can be used:

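const { createClient } = require('redis');

// Written for node-redis v4+, where clients must connect explicitly
// and subscribe() takes the listener directly.
async function main() {
  const publisher = createClient();
  // A connection in subscriber mode cannot issue regular commands,
  // so subscribing uses its own connection.
  const subscriber = publisher.duplicate();
  await Promise.all([publisher.connect(), subscriber.connect()]);

  // Subscriber
  await subscriber.subscribe('notifications', (message, channel) => {
    console.log(`Received message from ${channel} channel: ${message}`);
  });

  // Publisher (runs only after the subscription is active)
  await publisher.publish('notifications', 'System will be upgraded tonight');
}

main().catch(console.error);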

Features of Redis Pub/Sub:

  • Cross-process/cross-server communication
  • Support for pattern matching in channels (PSUBSCRIBE)
  • Messages are not persisted: a subscriber that is offline when a message is published never receives it
  • Suitable for broadcast scenarios
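To make the pattern-matching feature above more concrete, here is a rough sketch of how glob-style patterns map onto channel names. Redis performs this matching on the server. The helpers globToRegExp and matchingPatterns are hypothetical, exist only for illustration, and cover just the `*` and `?` wildcards.

```javascript
// Hypothetical helper: converts a glob-style pattern into a RegExp.
// Only '*' (any run of characters) and '?' (exactly one character) are handled.
function globToRegExp(pattern) {
  // Escape regex metacharacters, leaving '*' and '?' for the next step
  const escaped = pattern.replace(/[.+^${}()|[\]\\]/g, '\\$&');
  const source = escaped.replace(/\*/g, '.*').replace(/\?/g, '.');
  return new RegExp(`^${source}$`, 's');
}

// Returns the subscribed patterns that a published channel name matches
function matchingPatterns(patterns, channel) {
  return patterns.filter((pattern) => globToRegExp(pattern).test(channel));
}

const patterns = ['news.*', 'user_?', 'orders'];

console.log(matchingPatterns(patterns, 'news.sports')); // [ 'news.*' ]
console.log(matchingPatterns(patterns, 'user_7'));      // [ 'user_?' ]
console.log(matchingPatterns(patterns, 'user_42'));     // []
```

A subscriber using a pattern such as `news.*` would therefore receive messages published to `news.sports`, `news.tech`, and so on, without subscribing to each channel by name.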

Advanced Pattern Implementation

A custom in-process implementation can add features such as unsubscribe handles and per-callback error isolation:

class AdvancedPubSub {
  constructor() {
    this.channels = new Map();
  }

  subscribe(channel, callback) {
    if (!this.channels.has(channel)) {
      this.channels.set(channel, new Set());
    }
    this.channels.get(channel).add(callback);
    return () => this.unsubscribe(channel, callback);
  }

  unsubscribe(channel, callback) {
    if (this.channels.has(channel)) {
      this.channels.get(channel).delete(callback);
    }
  }

  publish(channel, data) {
    if (this.channels.has(channel)) {
      this.channels.get(channel).forEach(callback => {
        try {
          callback(data);
        } catch (e) {
          console.error(`Callback execution error: ${e}`);
        }
      });
    }
  }
}

// Usage example
const pubsub = new AdvancedPubSub();
const unsubscribe = pubsub.subscribe('news', (data) => {
  console.log(`Received news: ${data.title}`);
});

pubsub.publish('news', { title: 'Node.js releases new version' });
unsubscribe(); // Unsubscribe
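Because subscribe() returns an unsubscribe function, further features can be layered on without touching the class. As one possible sketch, a one-time subscription could look like the following. The names createBus and subscribeOnce are hypothetical, and createBus is only a compact stand-in with the same subscribe/publish contract.

```javascript
// Minimal stand-in bus: subscribe() returns an unsubscribe function
function createBus() {
  const channels = new Map();
  return {
    subscribe(channel, callback) {
      if (!channels.has(channel)) channels.set(channel, new Set());
      channels.get(channel).add(callback);
      return () => channels.get(channel).delete(callback);
    },
    publish(channel, data) {
      (channels.get(channel) || []).forEach((cb) => cb(data));
    },
  };
}

// Hypothetical helper: deliver only the first message, then unsubscribe
function subscribeOnce(bus, channel, callback) {
  const unsubscribe = bus.subscribe(channel, (data) => {
    unsubscribe();
    callback(data);
  });
  return unsubscribe; // lets the caller cancel before any message arrives
}

const bus = createBus();
const received = [];
subscribeOnce(bus, 'news', (data) => received.push(data.title));

bus.publish('news', { title: 'first' });
bus.publish('news', { title: 'second' }); // ignored: already unsubscribed

console.log(received); // [ 'first' ]
```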

Practical Application Scenarios

  1. Microservices Communication: Services communicate via a message bus.
// Order service
eventBus.publish('order_created', {
  orderId: '123',
  userId: 'user456'
});

// User service
eventBus.subscribe('order_created', (data) => {
  updateUserOrderHistory(data.userId, data.orderId);
});
  2. Real-Time Notification System:
// WebSocket service
socket.on('connection', (client) => {
  const userId = getUserId(client);
  eventBus.subscribe(`user_${userId}`, (message) => {
    client.send(JSON.stringify(message));
  });
});

// Business logic
eventBus.publish(`user_123`, {
  type: 'MESSAGE',
  content: 'You have a new message'
});
  3. Plugin System Event Hooks:
// The hook classes come from the tapable package
const { AsyncSeriesHook, SyncHook } = require('tapable');

// Core system
class CoreSystem {
  constructor() {
    this.hooks = {
      beforeSave: new AsyncSeriesHook(['data']),
      afterSave: new SyncHook(['savedData'])
    };
  }
}

const coreSystem = new CoreSystem();

// Plugin
coreSystem.hooks.beforeSave.tap('validationPlugin', (data) => {
  if (!data.valid) throw new Error('Invalid data');
});
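The scenarios above use an `eventBus` object without defining it. One way to obtain such an object inside a single process is a thin adapter over EventEmitter. This is a sketch under that assumption: the EventBus class and the in-memory updateUserOrderHistory stand-in are hypothetical, and a real microservices setup would put a network transport behind the same interface.

```javascript
const EventEmitter = require('events');

// Hypothetical adapter exposing the publish/subscribe names used above
class EventBus {
  constructor() {
    this.emitter = new EventEmitter();
  }

  subscribe(channel, callback) {
    this.emitter.on(channel, callback);
    // Return an unsubscribe function so callers can clean up
    return () => this.emitter.off(channel, callback);
  }

  publish(channel, data) {
    this.emitter.emit(channel, data);
  }
}

const eventBus = new EventBus();

// Stand-in for the user service's updateUserOrderHistory()
const orderHistory = new Map();
function updateUserOrderHistory(userId, orderId) {
  const orders = orderHistory.get(userId) || [];
  orderHistory.set(userId, [...orders, orderId]);
}

// User service
eventBus.subscribe('order_created', (data) => {
  updateUserOrderHistory(data.userId, data.orderId);
});

// Order service
eventBus.publish('order_created', { orderId: '123', userId: 'user456' });

console.log(orderHistory.get('user456')); // [ '123' ]
```

One caveat of this adapter: since it delegates to EventEmitter, publishing to a channel named 'error' with no subscriber throws.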

Performance Considerations and Best Practices

  1. Avoid Memory Leaks:
// Bad example
class LeakyClass {
  constructor(eventEmitter) {
    this.handler = (data) => { /*...*/ };
    eventEmitter.on('event', this.handler);
  }
}

// Correct approach
class SafeClass {
  constructor(eventEmitter) {
    this._cleanup = () => eventEmitter.off('event', this.handler);
    this.handler = (data) => { /*...*/ };
    eventEmitter.on('event', this.handler);
  }
  
  destroy() {
    this._cleanup();
  }
}
  2. Error Handling Strategies:
// Global error handling
eventBus.subscribe('error', (err) => {
  monitoringService.report(err);
});

// Publish method with error propagation (a method of the Pub/Sub class).
// Only effective if publish() lets subscriber exceptions propagate.
publishWithErrorHandling(channel, data) {
  try {
    this.publish(channel, data);
  } catch (e) {
    this.publish('error', e);
  }
}
  3. Performance Optimization Techniques:
// Use Set instead of array for listeners
this.listeners = new Set();

// Batch publish optimization: group events by channel in a single pass
batchPublish(events) {
  const byChannel = new Map();
  for (const event of events) {
    if (!byChannel.has(event.channel)) byChannel.set(event.channel, []);
    byChannel.get(event.channel).push(event);
  }

  byChannel.forEach((relatedEvents, channel) => {
    this.publish(channel, relatedEvents);
  });
}
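Returning to the memory-leak point at the start of this list, listenerCount() gives a quick way to check whether listeners are piling up. The sketch below uses a hypothetical SafeSubscriber class, modelled on the cleanup approach above, to compare instances that are discarded with and without destroy().

```javascript
const EventEmitter = require('events');

// Hypothetical subscriber that can detach its own listener
class SafeSubscriber {
  constructor(eventEmitter) {
    this.eventEmitter = eventEmitter;
    this.handler = () => {};
    eventEmitter.on('event', this.handler);
  }

  destroy() {
    this.eventEmitter.off('event', this.handler);
  }
}

const emitter = new EventEmitter();

// Create and discard 5 instances without cleanup: listeners pile up
for (let i = 0; i < 5; i++) new SafeSubscriber(emitter);
const leaked = emitter.listenerCount('event'); // 5

// Create and destroy 5 more: the count does not grow
for (let i = 0; i < 5; i++) new SafeSubscriber(emitter).destroy();
const afterCleanup = emitter.listenerCount('event'); // still 5

console.log(leaked, afterCleanup);
```

The first five handlers stay reachable through the emitter even though nothing else references their instances. Node.js also prints a MaxListenersExceededWarning once a single event exceeds the limit returned by getMaxListeners() (10 by default), which is often the first visible sign of this kind of leak.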

Comparison with Other Patterns

  1. Difference from Observer Pattern:
// Observer pattern requires explicit knowledge of the observed target
class Subject {
  constructor() {
    this.observers = [];
  }
  
  addObserver(observer) {
    this.observers.push(observer);
  }
  
  notify(data) {
    this.observers.forEach(observer => observer.update(data));
  }
}

// Pub/Sub pattern is fully decoupled
eventBus.subscribe('event', callback);
eventBus.publish('event', data);
  2. Combination with Middleware Pattern:
// Pub/Sub system with middleware
publish(channel, data) {
  const middlewarePipeline = this.middlewares[channel] || [];
  
  const runner = (index) => {
    if (index >= middlewarePipeline.length) {
      return this._realPublish(channel, data);
    }
    
    middlewarePipeline[index](data, () => runner(index + 1));
  };
  
  runner(0);
}
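The middleware publish method above is a fragment of a larger class. A self-contained version might look like the following sketch. The class name MiddlewarePubSub and the use() registration method are assumptions added for illustration, and the middleware registry is a Map here rather than a plain object.

```javascript
class MiddlewarePubSub {
  constructor() {
    this.middlewares = new Map(); // channel -> array of (data, next) functions
    this.listeners = new Map();   // channel -> array of callbacks
  }

  // Hypothetical registration method for per-channel middleware
  use(channel, middleware) {
    if (!this.middlewares.has(channel)) this.middlewares.set(channel, []);
    this.middlewares.get(channel).push(middleware);
  }

  subscribe(channel, callback) {
    if (!this.listeners.has(channel)) this.listeners.set(channel, []);
    this.listeners.get(channel).push(callback);
  }

  publish(channel, data) {
    const pipeline = this.middlewares.get(channel) || [];
    const runner = (index) => {
      if (index >= pipeline.length) {
        return this._realPublish(channel, data);
      }
      // A middleware that never calls next() drops the message
      pipeline[index](data, () => runner(index + 1));
    };
    runner(0);
  }

  _realPublish(channel, data) {
    (this.listeners.get(channel) || []).forEach((cb) => cb(data));
  }
}

const bus = new MiddlewarePubSub();
const delivered = [];

// Middleware 1: mark each message as checked (mutates the shared object)
bus.use('chat', (data, next) => { data.checked = true; next(); });
// Middleware 2: drop empty messages by not calling next()
bus.use('chat', (data, next) => { if (data.text) next(); });

bus.subscribe('chat', (data) => delivered.push(data));
bus.publish('chat', { text: 'hello' });
bus.publish('chat', { text: '' });

console.log(delivered); // [ { text: 'hello', checked: true } ]
```

Each middleware decides whether delivery continues, which makes this a natural place for validation, logging, or filtering before subscribers see a message.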

Variants in Modern JavaScript

  1. Pub/Sub in React Context:
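import { createContext, useRef } from 'react';

const EventContext = createContext(null);

function EventProvider({ children }) {
  const listeners = useRef({});

  // Keep one stable API object so consumers do not re-render on every provider render
  const api = useRef({
    subscribe(event, callback) {
      listeners.current[event] = listeners.current[event] || [];
      listeners.current[event].push(callback);
      // Return an unsubscribe function for use in effect cleanup
      return () => {
        listeners.current[event] = listeners.current[event].filter(cb => cb !== callback);
      };
    },
    publish(event, data) {
      (listeners.current[event] || []).forEach(cb => cb(data));
    }
  });

  return (
    <EventContext.Provider value={api.current}>
      {children}
    </EventContext.Provider>
  );
}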
  2. Implementation with RxJS:
const { Subject } = require('rxjs');

const subject = new Subject();

// Subscribe
const subscription = subject.subscribe({
  next: (v) => console.log(`Received: ${v}`)
});

// Publish
subject.next('Message 1');
subject.next('Message 2');

// Unsubscribe
subscription.unsubscribe();

Message Protocol Extensions

  1. Message Format Supporting Metadata:
{
  "id": "msg_123456",
  "timestamp": 1620000000,
  "channel": "user_updates",
  "payload": {
    "userId": "usr_789",
    "action": "profile_update"
  },
  "metadata": {
    "source": "auth_service",
    "priority": "high"
  }
}
  2. Publish with Acknowledgment Mechanism:
publishWithAck(channel, message, timeout = 5000) {
  return new Promise((resolve, reject) => {
    const ackChannel = `${channel}.ack.${message.id}`;

    const cleanup = () => {
      this.unsubscribe(ackChannel, handler);
      clearTimeout(timer);
    };

    const handler = (ack) => {
      cleanup();
      resolve(ack);
    };

    // Start the timer before publishing so that a synchronous ack can clear it
    const timer = setTimeout(() => {
      cleanup();
      reject(new Error('Acknowledgment timeout'));
    }, timeout);

    this.subscribe(ackChannel, handler);
    this.publish(channel, message);
  });
}
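The acknowledgment flow only completes if the subscriber replies on the ack channel, a step the method above leaves implicit. The sketch below shows both sides in one process. The AckPubSub class is a hypothetical minimal bus, and the ack payload shape ({ id, status }) is an assumption rather than a fixed protocol.

```javascript
// Hypothetical minimal in-process bus with an ack-aware publish method
class AckPubSub {
  constructor() {
    this.channels = new Map();
  }

  subscribe(channel, callback) {
    if (!this.channels.has(channel)) this.channels.set(channel, new Set());
    this.channels.get(channel).add(callback);
  }

  unsubscribe(channel, callback) {
    const subscribers = this.channels.get(channel);
    if (!subscribers) return;
    subscribers.delete(callback);
    // Drop empty channels so one-off ack channels do not accumulate
    if (subscribers.size === 0) this.channels.delete(channel);
  }

  publish(channel, data) {
    const subscribers = this.channels.get(channel);
    if (subscribers) [...subscribers].forEach((cb) => cb(data));
  }

  publishWithAck(channel, message, timeout = 5000) {
    return new Promise((resolve, reject) => {
      const ackChannel = `${channel}.ack.${message.id}`;
      const handler = (ack) => { cleanup(); resolve(ack); };
      const timer = setTimeout(() => {
        cleanup();
        reject(new Error('Acknowledgment timeout'));
      }, timeout);
      const cleanup = () => {
        this.unsubscribe(ackChannel, handler);
        clearTimeout(timer);
      };
      this.subscribe(ackChannel, handler);
      this.publish(channel, message);
    });
  }
}

const bus = new AckPubSub();

// Subscriber: process the message, then confirm on its ack channel
bus.subscribe('user_updates', (message) => {
  bus.publish(`user_updates.ack.${message.id}`, { id: message.id, status: 'processed' });
});

const acked = bus.publishWithAck('user_updates', {
  id: 'msg_123456',
  payload: { userId: 'usr_789', action: 'profile_update' },
});

// Nobody listens on this channel, so the promise rejects after 50 ms
const timedOut = bus.publishWithAck('unwatched', { id: 'msg_2' }, 50)
  .catch((err) => err.message);

acked.then((ack) => console.log(ack));             // { id: 'msg_123456', status: 'processed' }
timedOut.then((message) => console.log(message));  // Acknowledgment timeout
```

Including the message id in the ack channel name keeps concurrent publishes from resolving each other's promises.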

Testing Strategies

  1. Unit Test Example:
describe('Pub/Sub System', () => {
  let pubsub;
  
  beforeEach(() => {
    // PubSub: any class exposing subscribe()/publish(), e.g. AdvancedPubSub above
    pubsub = new PubSub();
  });
  
  it('should receive messages from subscribed channels', () => {
    const mockCallback = jest.fn();
    pubsub.subscribe('test', mockCallback);
    
    pubsub.publish('test', 'message');
    
    expect(mockCallback).toHaveBeenCalledWith('message');
  });
  
  it('should not receive messages from unsubscribed channels', () => {
    const mockCallback = jest.fn();
    pubsub.subscribe('test1', mockCallback);
    
    pubsub.publish('test2', 'message');
    
    expect(mockCallback).not.toHaveBeenCalled();
  });
});
  2. Performance Testing Plan:
const { performance } = require('perf_hooks');

function runBenchmark() {
  const pubsub = new PubSub();
  const start = performance.now();
  const count = 100000;
  
  // Register many listeners
  for (let i = 0; i < count; i++) {
    pubsub.subscribe(`channel_${i % 10}`, () => {});
  }
  
  // Trigger events
  const publishStart = performance.now();
  for (let i = 0; i < count; i++) {
    pubsub.publish(`channel_${i % 10}`, { data: i });
  }
  
  console.log(`Registration time: ${publishStart - start}ms`);
  console.log(`Publish time: ${performance.now() - publishStart}ms`);
}

Browser Environment Implementation

  1. CustomEvent-Based Implementation:
class BrowserPubSub {
  constructor() {
    this.target = new EventTarget();
  }
  
  subscribe(event, callback) {
    const handler = (e) => callback(e.detail);
    this.target.addEventListener(event, handler);
    return () => this.target.removeEventListener(event, handler);
  }
  
  publish(event, data) {
    this.target.dispatchEvent(new CustomEvent(event, { detail: data }));
  }
}

// Usage example
const pubsub = new BrowserPubSub();
const unsubscribe = pubsub.subscribe('click', (data) => {
  console.log('Click event:', data);
});

document.body.addEventListener('click', () => {
  pubsub.publish('click', { x: 10, y: 20 });
});
  2. Cross-Tab Communication:
// Main page
const channel = new BroadcastChannel('app_events');
channel.postMessage({
  type: 'DATA_UPDATE',
  payload: { /*...*/ }
});

// Other tabs
const channel = new BroadcastChannel('app_events');
channel.onmessage = (event) => {
  if (event.data.type === 'DATA_UPDATE') {
    updateUI(event.data.payload);
  }
};
