阿里云主机折上折
  • 微信号
Current Site:Index > Implementation methods of asynchronous flow control

Implementation methods of asynchronous flow control

Author:Chuan Chen 阅读数:64184人阅读 分类: Node.js

Callback Functions

Callback functions are the most basic form of asynchronous flow control. In Koa2, while not recommended for directly handling asynchronous logic, understanding their principles is essential for mastering more advanced control methods. Callback functions encapsulate subsequent logic as functions and pass them as parameters to asynchronous operations.

function fetchData(callback) {
  setTimeout(() => {
    callback(null, 'Data fetched');
  }, 1000);
}

fetchData((err, data) => {
  if (err) console.error(err);
  else console.log(data);
});

This pattern can easily lead to "callback hell," where multiple asynchronous operations require sequential execution, making the code difficult to maintain:

operation1((err, result1) => {
  if (err) return handleError(err);
  operation2(result1, (err, result2) => {
    if (err) return handleError(err);
    operation3(result2, (err, result3) => {
      // More nesting...
    });
  });
});

Promises

Promises, introduced in ES6, are an asynchronous solution. While Koa2 middleware does not directly return Promises, they can be used in business logic. Promises solve the nesting problem through chaining.

function fetchData() {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      resolve('Data fetched');
    }, 1000);
  });
}

fetchData()
  .then(data => {
    console.log(data);
    return processData(data);
  })
  .then(processedData => {
    console.log(processedData);
  })
  .catch(err => {
    console.error(err);
  });

Promises also provide static methods for handling multiple asynchronous operations:

// Parallel execution
Promise.all([promise1, promise2])
  .then(results => {
    // results is an array containing all outcomes
  });

// Race mode
Promise.race([promise1, promise2])
  .then(firstResult => {
    // The first completed result
  });

async/await

Koa2's core advantage lies in its native support for async/await. Async functions return Promises, and await suspends function execution until the Promise resolves.

async function getData() {
  try {
    const data = await fetchData();
    const processed = await processData(data);
    return processed;
  } catch (err) {
    console.error(err);
    throw err;
  }
}

// Usage in Koa2 middleware
app.use(async (ctx, next) => {
  try {
    const result = await getData();
    ctx.body = result;
  } catch (err) {
    ctx.status = 500;
    ctx.body = { error: err.message };
  }
});

async/await makes asynchronous code appear synchronous while maintaining non-blocking characteristics. In Koa2, middleware itself is an async function, enabling easy composition:

app.use(async (ctx, next) => {
  const start = Date.now();
  await next();
  const duration = Date.now() - start;
  ctx.set('X-Response-Time', `${duration}ms`);
});

app.use(async ctx => {
  ctx.body = 'Hello World';
});

Generator Functions and the co Module

Before async/await became standard, Koa1 used Generator functions with the co module to achieve similar results. While Koa2 has shifted to async/await, understanding Generators helps in grasping the evolution of asynchronous flow control.

const co = require('co');

function* fetchDataGenerator() {
  const data = yield fetchData();
  const processed = yield processData(data);
  return processed;
}

co(fetchDataGenerator)
  .then(result => console.log(result))
  .catch(err => console.error(err));

Generator functions pause execution via yield, and the co module automatically handles value unwrapping and flow control. Koa2's async/await can be seen as syntactic sugar for Generators.

Event Emitters

Node.js's event emitter pattern can also be used for asynchronous flow control, suitable for event-driven scenarios. Koa2's context object inherits from EventEmitter.

const EventEmitter = require('events');

class DataFetcher extends EventEmitter {
  fetch() {
    setTimeout(() => {
      this.emit('data', 'Data fetched');
    }, 1000);
  }
}

const fetcher = new DataFetcher();
fetcher.on('data', data => {
  console.log(data);
});
fetcher.fetch();

In Koa2, the event mechanism can be leveraged to implement the publish/subscribe pattern:

app.use(async (ctx, next) => {
  ctx.app.emit('request-start', { url: ctx.url });
  await next();
  ctx.app.emit('request-end', { url: ctx.url, status: ctx.status });
});

// Listen for events
app.on('request-start', data => {
  console.log(`Request started: ${data.url}`);
});

Flow Control Libraries

For complex flows, specialized libraries like async.js can be used. While modern JavaScript offers better built-in solutions, such libraries may still be encountered in legacy systems.

const async = require('async');

async.waterfall([
  callback => fetchData(callback),
  (data, callback) => processData(data, callback),
  (processed, callback) => saveData(processed, callback)
], (err, result) => {
  if (err) console.error(err);
  else console.log('Final result:', result);
});

Koa2 typically doesn't require such libraries, as async/await provides a more elegant solution. However, they remain valuable for scenarios like parallel task queues.

Middleware Composition

Koa2's core feature is its middleware composition mechanism, which combines multiple middleware into a single request-handling pipeline for flow control.

const compose = require('koa-compose');

async function middleware1(ctx, next) {
  console.log('Middleware 1 start');
  await next();
  console.log('Middleware 1 end');
}

async function middleware2(ctx, next) {
  console.log('Middleware 2 start');
  await next();
  console.log('Middleware 2 end');
}

const allMiddleware = compose([middleware1, middleware2]);

app.use(allMiddleware);

The execution order is: Middleware 1 start -> Middleware 2 start -> Middleware 2 end -> Middleware 1 end, forming an "onion model." This composition method makes flow control more flexible.

Error Handling

Error handling in asynchronous flow control requires special attention. Koa2 provides a unified error-handling mechanism.

// Global error-handling middleware
app.use(async (ctx, next) => {
  try {
    await next();
  } catch (err) {
    ctx.status = err.status || 500;
    ctx.body = { message: err.message };
    ctx.app.emit('error', err, ctx);
  }
});

// Errors in business logic
app.use(async ctx => {
  const user = await getUser(ctx.params.id);
  if (!user) {
    const err = new Error('User not found');
    err.status = 404;
    throw err;
  }
  ctx.body = user;
});

// Listen for global errors
app.on('error', (err, ctx) => {
  console.error('Server error', err, ctx);
});

Concurrency Control

When handling multiple parallel asynchronous tasks, concurrency must be controlled to avoid resource exhaustion. Koa2 can integrate third-party libraries for this purpose.

const { default: PQueue } = require('p-queue');

const queue = new PQueue({ concurrency: 3 });

app.use(async ctx => {
  const tasks = Array(10).fill().map((_, i) => 
    queue.add(() => processTask(i))
  );
  const results = await Promise.all(tasks);
  ctx.body = results;
});

For more complex scenarios, reactive programming libraries like RxJS can be used:

const { from } = require('rxjs');
const { mergeMap, toArray } = require('rxjs/operators');

app.use(async ctx => {
  const tasks = Array(10).fill().map((_, i) => i);
  const results = await from(tasks)
    .pipe(
      mergeMap(i => processTask(i), 3), // Concurrency limit: 3
      toArray()
    )
    .toPromise();
  ctx.body = results;
});

Timeout Handling

Asynchronous operations may require timeouts to prevent prolonged hanging. Koa2 can wrap Promises to implement timeout control.

function withTimeout(promise, ms) {
  return Promise.race([
    promise,
    new Promise((_, reject) => 
      setTimeout(() => reject(new Error('Timeout')), ms)
    )
  ]);
}

app.use(async ctx => {
  try {
    const data = await withTimeout(fetchData(), 5000);
    ctx.body = data;
  } catch (err) {
    if (err.message === 'Timeout') {
      ctx.status = 504;
      ctx.body = 'Request timeout';
    } else {
      throw err;
    }
  }
});

Koa2 also provides the ctx.req.setTimeout method to set request timeouts:

app.use(async (ctx, next) => {
  ctx.req.setTimeout(5000);
  await next();
});

Context Passing

Maintaining context in asynchronous flows is challenging. Koa2's ctx object spans the entire request lifecycle, solving this problem.

// Set request ID and log
app.use(async (ctx, next) => {
  ctx.requestId = generateId();
  logger.info(`Start request ${ctx.requestId}`);
  await next();
  logger.info(`End request ${ctx.requestId}`);
});

app.use(async ctx => {
  const user = await getUser(ctx.query.id);
  logger.info(`User fetched in request ${ctx.requestId}`);
  ctx.body = user;
});

For cross-request tracking scenarios, the Async Hooks API can be used:

const asyncHooks = require('async_hooks');
const store = new Map();

const hook = asyncHooks.createHook({
  init: (asyncId, _, triggerAsyncId) => {
    if (store.has(triggerAsyncId)) {
      store.set(asyncId, store.get(triggerAsyncId));
    }
  },
  destroy: asyncId => {
    store.delete(asyncId);
  }
});

hook.enable();

app.use(async (ctx, next) => {
  const asyncId = asyncHooks.executionAsyncId();
  store.set(asyncId, { requestId: generateId() });
  await next();
});

function getContext() {
  const asyncId = asyncHooks.executionAsyncId();
  return store.get(asyncId);
}

Canceling Asynchronous Operations

Canceling ongoing asynchronous operations is a common requirement in complex scenarios. Koa2 can integrate AbortController for this purpose.

app.use(async ctx => {
  const controller = new AbortController();
  const timeout = setTimeout(() => controller.abort(), 5000);
  
  try {
    const response = await fetch(url, {
      signal: controller.signal
    });
    clearTimeout(timeout);
    ctx.body = await response.json();
  } catch (err) {
    if (err.name === 'AbortError') {
      ctx.status = 504;
      ctx.body = 'Request aborted';
    } else {
      throw err;
    }
  }
});

For custom asynchronous operations, cancellation logic can be similarly implemented:

function cancellableFetch(url, { signal } = {}) {
  return new Promise((resolve, reject) => {
    if (signal && signal.aborted) {
      return reject(new DOMException('Aborted', 'AbortError'));
    }
    
    const timer = setTimeout(() => {
      fetch(url)
        .then(resolve)
        .catch(reject);
    }, 0);
    
    if (signal) {
      signal.addEventListener('abort', () => {
        clearTimeout(timer);
        reject(new DOMException('Aborted', 'AbortError'));
      });
    }
  });
}

Performance Optimization

The implementation of asynchronous flow control directly impacts performance. In Koa2, the following points should be noted:

  1. Avoid unnecessary await:
// Not recommended - sequential execution
const user = await getUser();
const posts = await getPosts();

// Recommended - parallel execution
const [user, posts] = await Promise.all([
  getUser(),
  getPosts()
]);
  1. Set reasonable concurrency limits:
// Use p-map to control concurrency
const pMap = require('p-map');

app.use(async ctx => {
  const items = Array(100).fill().map((_, i) => i);
  const results = await pMap(items, processItem, { concurrency: 5 });
  ctx.body = results;
});
  1. Cache asynchronous results:
const cache = new Map();

app.use(async ctx => {
  const key = ctx.url;
  if (cache.has(key)) {
    ctx.body = cache.get(key);
    return;
  }
  
  const data = await fetchData();
  cache.set(key, data);
  ctx.body = data;
});
  1. Use streams for large files:
const fs = require('fs');
const { pipeline } = require('stream/promises');

app.use(async ctx => {
  ctx.set('Content-Type', 'application/octet-stream');
  await pipeline(
    fs.createReadStream('large-file.bin'),
    ctx.res
  );
});

本站部分内容来自互联网,一切版权均归源网站或源作者所有。

如果侵犯了你的权益请来信告知我们删除。邮箱:cc@cccx.cn

Front End Chuan

Front End Chuan, Chen Chuan's Code Teahouse 🍵, specializing in exorcising all kinds of stubborn bugs 💻. Daily serving baldness-warning-level development insights 🛠️, with a bonus of one-liners that'll make you laugh for ten years 🐟. Occasionally drops pixel-perfect romance brewed in a coffee cup ☕.