使用任务队列kue处理订单

  |  

背景

在计算机科学中,消息队列(英语:Message queue)是一种进程间通信或同一进程的不同线程间的通信方式。

在高并发或者在计算资源有限的情况下,由于来不及同步处理,请求往往会发生堵塞,比如说,大量的insert,update之类的请求同时到达MySQL,直接导致无数的行锁表锁,甚至最后请求会堆积过多,从而触发too many connections错误。通过使用消息队列,我们可以异步处理请求,从而缓解系统的压力。

简介

kue是基于node.js构建的,redis支持的优先级任务队列。

使用

在koa中使用
安装插件

1
2
yarn add kue
yarn add ioredis

在services中新增kue.service.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
'use strict';

// 使用kue任务队列处理订单任务
const kue = require('kue');
const config = require('../config/config');
const Queue = kue.createQueue(config.queue);
const ErrorConf = require('../../common/config').error_conf;
const CustomError = require('../../common/lib/error_constructor').CustomError;
const Order = require('./order.service');

module.exports = {
// 创建定时任务
async crateTask({ title, params = {}, delayTime }) {
try {
let job = Queue.create(title, params)
.delay(delayTime) // 延时多少毫秒
.attempts(2)// 重试2次
.removeOnComplete(true) // 在完成时使用自动删除作业
.save();

return job;
} catch (error) {
console.log('error===========>', error);
throw new CustomError('创建定时任务失败', ErrorConf.server);
}
},
// 定时器处理订单任务
async runProcessJob() {
this.orderPayment();
this.studentOrder();
},
// 处理未支付的订单
async orderPayment() {
Queue.process('orderPayment', 2, async (job, done) => {
let json = JSON.parse(JSON.stringify(job));
try {
if (json.data) {
// 自己自定义处理订单
await Order.orderPayment(json.data);
}
} catch (error) {
console.log('error', error);
}
done(); // 执行下一步
});
Queue.watchStuckJobs(1000);
},
// 自动完成订单
async studentOrder() {
Queue.process('studentOrder', 2, async (job, done) => {
let json = JSON.parse(JSON.stringify(job));
try {
if (json.data) {
// 自己自定义处理订单
await Order.OrderSuccess(json.data);
}
} catch (error) {
console.log('error', error);
}
done(); // 执行下一步
});
Queue.watchStuckJobs(1000);
},
};

创建任务

1
2
3
4
5
6
const job = require('../services/kue.service');
let jobInfo = await job.crateTask({
'title': 'orderPayment',
'params': { orderId },
'delayTime': 24 * 60 * 60 * 1000, // 一天时间关闭未支付订单
});

在app.js中执行任务

1
2
3
4
5
6
App.listen(config.port, () => {
console.log(`项目启动成功 端口号 PORT === ${config.port}`);

// 处理订单定时任务
job.runProcessJob();
});

在egg 中使用 参考文档

文章目录
  1. 1. 背景
  2. 2. 简介
  3. 3. 使用