Implementation methods of asynchronous flow control
Callback Functions
Callback functions are the most basic form of asynchronous flow control. In Koa2, while not recommended for directly handling asynchronous logic, understanding their principles is essential for mastering more advanced control methods. Callback functions encapsulate subsequent logic as functions and pass them as parameters to asynchronous operations.
function fetchData(callback) {
setTimeout(() => {
callback(null, 'Data fetched');
}, 1000);
}
fetchData((err, data) => {
if (err) console.error(err);
else console.log(data);
});
This pattern can easily lead to "callback hell," where multiple asynchronous operations require sequential execution, making the code difficult to maintain:
operation1((err, result1) => {
if (err) return handleError(err);
operation2(result1, (err, result2) => {
if (err) return handleError(err);
operation3(result2, (err, result3) => {
// More nesting...
});
});
});
Promises
Promises, introduced in ES6, are an asynchronous solution. While Koa2 middleware does not directly return Promises, they can be used in business logic. Promises solve the nesting problem through chaining.
function fetchData() {
return new Promise((resolve, reject) => {
setTimeout(() => {
resolve('Data fetched');
}, 1000);
});
}
fetchData()
.then(data => {
console.log(data);
return processData(data);
})
.then(processedData => {
console.log(processedData);
})
.catch(err => {
console.error(err);
});
Promises also provide static methods for handling multiple asynchronous operations:
// Parallel execution
Promise.all([promise1, promise2])
.then(results => {
// results is an array containing all outcomes
});
// Race mode
Promise.race([promise1, promise2])
.then(firstResult => {
// The first completed result
});
async/await
Koa2's core advantage lies in its native support for async/await. Async functions return Promises, and await suspends function execution until the Promise resolves.
async function getData() {
try {
const data = await fetchData();
const processed = await processData(data);
return processed;
} catch (err) {
console.error(err);
throw err;
}
}
// Usage in Koa2 middleware
app.use(async (ctx, next) => {
try {
const result = await getData();
ctx.body = result;
} catch (err) {
ctx.status = 500;
ctx.body = { error: err.message };
}
});
async/await makes asynchronous code appear synchronous while maintaining non-blocking characteristics. In Koa2, middleware itself is an async function, enabling easy composition:
app.use(async (ctx, next) => {
const start = Date.now();
await next();
const duration = Date.now() - start;
ctx.set('X-Response-Time', `${duration}ms`);
});
app.use(async ctx => {
ctx.body = 'Hello World';
});
Generator Functions and the co Module
Before async/await became standard, Koa1 used Generator functions with the co module to achieve similar results. While Koa2 has shifted to async/await, understanding Generators helps in grasping the evolution of asynchronous flow control.
const co = require('co');
function* fetchDataGenerator() {
const data = yield fetchData();
const processed = yield processData(data);
return processed;
}
co(fetchDataGenerator)
.then(result => console.log(result))
.catch(err => console.error(err));
Generator functions pause execution via yield, and the co module automatically handles value unwrapping and flow control. Koa2's async/await can be seen as syntactic sugar for Generators.
Event Emitters
Node.js's event emitter pattern can also be used for asynchronous flow control, suitable for event-driven scenarios. Koa2's context object inherits from EventEmitter.
const EventEmitter = require('events');
class DataFetcher extends EventEmitter {
fetch() {
setTimeout(() => {
this.emit('data', 'Data fetched');
}, 1000);
}
}
const fetcher = new DataFetcher();
fetcher.on('data', data => {
console.log(data);
});
fetcher.fetch();
In Koa2, the event mechanism can be leveraged to implement the publish/subscribe pattern:
app.use(async (ctx, next) => {
ctx.app.emit('request-start', { url: ctx.url });
await next();
ctx.app.emit('request-end', { url: ctx.url, status: ctx.status });
});
// Listen for events
app.on('request-start', data => {
console.log(`Request started: ${data.url}`);
});
Flow Control Libraries
For complex flows, specialized libraries like async.js can be used. While modern JavaScript offers better built-in solutions, such libraries may still be encountered in legacy systems.
const async = require('async');
async.waterfall([
callback => fetchData(callback),
(data, callback) => processData(data, callback),
(processed, callback) => saveData(processed, callback)
], (err, result) => {
if (err) console.error(err);
else console.log('Final result:', result);
});
Koa2 typically doesn't require such libraries, as async/await provides a more elegant solution. However, they remain valuable for scenarios like parallel task queues.
Middleware Composition
Koa2's core feature is its middleware composition mechanism, which combines multiple middleware into a single request-handling pipeline for flow control.
const compose = require('koa-compose');
async function middleware1(ctx, next) {
console.log('Middleware 1 start');
await next();
console.log('Middleware 1 end');
}
async function middleware2(ctx, next) {
console.log('Middleware 2 start');
await next();
console.log('Middleware 2 end');
}
const allMiddleware = compose([middleware1, middleware2]);
app.use(allMiddleware);
The execution order is: Middleware 1 start -> Middleware 2 start -> Middleware 2 end -> Middleware 1 end, forming an "onion model." This composition method makes flow control more flexible.
Error Handling
Error handling in asynchronous flow control requires special attention. Koa2 provides a unified error-handling mechanism.
// Global error-handling middleware
app.use(async (ctx, next) => {
try {
await next();
} catch (err) {
ctx.status = err.status || 500;
ctx.body = { message: err.message };
ctx.app.emit('error', err, ctx);
}
});
// Errors in business logic
app.use(async ctx => {
const user = await getUser(ctx.params.id);
if (!user) {
const err = new Error('User not found');
err.status = 404;
throw err;
}
ctx.body = user;
});
// Listen for global errors
app.on('error', (err, ctx) => {
console.error('Server error', err, ctx);
});
Concurrency Control
When handling multiple parallel asynchronous tasks, concurrency must be controlled to avoid resource exhaustion. Koa2 can integrate third-party libraries for this purpose.
const { default: PQueue } = require('p-queue');
const queue = new PQueue({ concurrency: 3 });
app.use(async ctx => {
const tasks = Array(10).fill().map((_, i) =>
queue.add(() => processTask(i))
);
const results = await Promise.all(tasks);
ctx.body = results;
});
For more complex scenarios, reactive programming libraries like RxJS can be used:
const { from } = require('rxjs');
const { mergeMap, toArray } = require('rxjs/operators');
app.use(async ctx => {
const tasks = Array(10).fill().map((_, i) => i);
const results = await from(tasks)
.pipe(
mergeMap(i => processTask(i), 3), // Concurrency limit: 3
toArray()
)
.toPromise();
ctx.body = results;
});
Timeout Handling
Asynchronous operations may require timeouts to prevent prolonged hanging. Koa2 can wrap Promises to implement timeout control.
function withTimeout(promise, ms) {
return Promise.race([
promise,
new Promise((_, reject) =>
setTimeout(() => reject(new Error('Timeout')), ms)
)
]);
}
app.use(async ctx => {
try {
const data = await withTimeout(fetchData(), 5000);
ctx.body = data;
} catch (err) {
if (err.message === 'Timeout') {
ctx.status = 504;
ctx.body = 'Request timeout';
} else {
throw err;
}
}
});
Koa2 also provides the ctx.req.setTimeout
method to set request timeouts:
app.use(async (ctx, next) => {
ctx.req.setTimeout(5000);
await next();
});
Context Passing
Maintaining context in asynchronous flows is challenging. Koa2's ctx
object spans the entire request lifecycle, solving this problem.
// Set request ID and log
app.use(async (ctx, next) => {
ctx.requestId = generateId();
logger.info(`Start request ${ctx.requestId}`);
await next();
logger.info(`End request ${ctx.requestId}`);
});
app.use(async ctx => {
const user = await getUser(ctx.query.id);
logger.info(`User fetched in request ${ctx.requestId}`);
ctx.body = user;
});
For cross-request tracking scenarios, the Async Hooks API can be used:
const asyncHooks = require('async_hooks');
const store = new Map();
const hook = asyncHooks.createHook({
init: (asyncId, _, triggerAsyncId) => {
if (store.has(triggerAsyncId)) {
store.set(asyncId, store.get(triggerAsyncId));
}
},
destroy: asyncId => {
store.delete(asyncId);
}
});
hook.enable();
app.use(async (ctx, next) => {
const asyncId = asyncHooks.executionAsyncId();
store.set(asyncId, { requestId: generateId() });
await next();
});
function getContext() {
const asyncId = asyncHooks.executionAsyncId();
return store.get(asyncId);
}
Canceling Asynchronous Operations
Canceling ongoing asynchronous operations is a common requirement in complex scenarios. Koa2 can integrate AbortController for this purpose.
app.use(async ctx => {
const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), 5000);
try {
const response = await fetch(url, {
signal: controller.signal
});
clearTimeout(timeout);
ctx.body = await response.json();
} catch (err) {
if (err.name === 'AbortError') {
ctx.status = 504;
ctx.body = 'Request aborted';
} else {
throw err;
}
}
});
For custom asynchronous operations, cancellation logic can be similarly implemented:
function cancellableFetch(url, { signal } = {}) {
return new Promise((resolve, reject) => {
if (signal && signal.aborted) {
return reject(new DOMException('Aborted', 'AbortError'));
}
const timer = setTimeout(() => {
fetch(url)
.then(resolve)
.catch(reject);
}, 0);
if (signal) {
signal.addEventListener('abort', () => {
clearTimeout(timer);
reject(new DOMException('Aborted', 'AbortError'));
});
}
});
}
Performance Optimization
The implementation of asynchronous flow control directly impacts performance. In Koa2, the following points should be noted:
- Avoid unnecessary
await
:
// Not recommended - sequential execution
const user = await getUser();
const posts = await getPosts();
// Recommended - parallel execution
const [user, posts] = await Promise.all([
getUser(),
getPosts()
]);
- Set reasonable concurrency limits:
// Use p-map to control concurrency
const pMap = require('p-map');
app.use(async ctx => {
const items = Array(100).fill().map((_, i) => i);
const results = await pMap(items, processItem, { concurrency: 5 });
ctx.body = results;
});
- Cache asynchronous results:
const cache = new Map();
app.use(async ctx => {
const key = ctx.url;
if (cache.has(key)) {
ctx.body = cache.get(key);
return;
}
const data = await fetchData();
cache.set(key, data);
ctx.body = data;
});
- Use streams for large files:
const fs = require('fs');
const { pipeline } = require('stream/promises');
app.use(async ctx => {
ctx.set('Content-Type', 'application/octet-stream');
await pipeline(
fs.createReadStream('large-file.bin'),
ctx.res
);
});
本站部分内容来自互联网,一切版权均归源网站或源作者所有。
如果侵犯了你的权益请来信告知我们删除。邮箱:cc@cccx.cn
上一篇:Koa2 的轻量级设计哲学
下一篇:Context 对象的组成与作用