谈谈thinkphp6使用think-queue实现普通队列和延迟队列
###TP6 队列
TP6 中使用 think-queue 可以实现普通队列和延迟队列。
think-queue 是thinkphp 官方提供的一个消息队列服务,它支持消息队列的一些基本特性:
- 消息的发布,获取,执行,删除,重发,失败处理,延迟执行,超时控制等
- 队列的多队列, 内存限制 ,启动,停止,守护等
- 消息队列可降级为同步执行
消息队列实现过程
1、通过生产者推送消息到消息队列服务中
2、消息队列服务将收到的消息存入redis队列中(zset)
3、消费者进行监听队列,当监听到队列有新的消息时,获取队列第一条
4、处理获取下来的消息调用业务类进行处理相关业务
5、业务处理后,需要从队列中删除消息
composer 安装 think-queue
composer require topthink/think-queue
配置文件
安装完 think-queue 后会在 config 目录中生成 queue.php,这个文件是队列的配置文件。
tp6中提供了多种消息队列的实现方式,默认使用sync,我这里选择使用Redis。
return [
'default' => 'redis',
'connections' => [
'sync' => [
'type' => 'sync',
],
'database' => [
'type' => 'database',
'queue' => 'default',
'table' => 'jobs',
'connection' => null,
],
'redis' => [
'type' => 'redis',
'queue' => 'default',
'host' => env('redis.host', '127.0.0.1'),
'port' => env('redis.port', '6379'),
'password' => env('redis.password','123456'),
'select' => 0,
'timeout' => 0,
'persistent' => false,
],
],
'failed' => [
'type' => 'none',
'table' => 'failed_jobs',
],
];
创建目录及队列消费类文件
在 app 目录下创建 queue 目录,然后在该目录下新建一个抽象类 Queue.php 文件,作为基础类
<?phpnamespace app\queue;use think\facade\Cache;use think\queue\Job;use think\facade\Log;/**
* Class Queue 队列消费基础类
* @package app\queue
*/abstract class Queue{
/**
* @describe:fire是消息队列默认调用的方法
* @param \think\queue\Job $job
* @param $message
*/
public function fire(Job $job, $data)
{
if (empty($data)) {
Log::error(sprintf('[%s][%s] 队列无消息', __CLASS__, __FUNCTION__));
return ;
}
$jobId = $job->getJobId(); // 队列的数据库id或者redis key
// $jobClassName = $job->getName(); // 队列对象类
// $queueName = $job->getQueue(); // 队列名称
// 如果已经执行中或者执行完成就不再执行了
if (!$this->checkJob($jobId, $data)) {
$job->delete();
Cache::store('redis')->delete($jobId);
return ;
}
// 执行业务处理
if ($this->execute($data)) {
Log::record(sprintf('[%s][%s] 队列执行成功', __CLASS__, __FUNCTION__));
$job->delete(); // 任务执行成功后删除
Cache::store('redis')->delete($jobId); // 删除redis中的缓存
} else {
// 检查任务重试次数
if ($job->attempts() > 3) {
Log::error(sprintf('[%s][%s] 队列执行重试次数超过3次,执行失败', __CLASS__, __FUNCTION__));
// 第1种处理方式:重新发布任务,该任务延迟10秒后再执行;也可以不指定秒数立即执行
//$job->release(10);
// 第2种处理方式:原任务的基础上1分钟执行一次并增加尝试次数
//$job->failed();
// 第3种处理方式:删除任务
$job->delete(); // 任务执行后删除
Cache::store('redis')->delete($jobId); // 删除redis中的缓存
}
}
}
/**
* 消息在到达消费者时可能已经不需要执行了
* @param string $jobId
* @param $message
* @return bool 任务执行的结果
* @throws \Psr\SimpleCache\InvalidArgumentException
*/
protected function checkJob(string $jobId, $message): bool
{
// 查询redis
$data = Cache::store('redis')->get($jobId);
if (!empty($data)) {
return false;
}
Cache::store('redis')->set($jobId, $message);
return true;
}
/**
* @describe: 根据消息中的数据进行实际的业务处理
* @param $data 数据
* @return bool 返回结果
*/
abstract protected function execute($data): bool;}
所有真正的消费类继承基础抽象类
<?phpnamespace app\queue\test;use app\queue\Queue;class Test extends Queue{
protected function execute($data): bool
{
// 具体消费业务逻辑
}}
生产者逻辑
use think\facade\Queue;
// 普通队列生成调用方式
Queue::push($job, $data, $queueName);
// 例:
Queue::push(Test::class, $data, $queueName);
// 延时队列生成调用方式
Queue::later($delay, $job, $data, $queueName);
// 例如使用延时队列 10 秒后执行:
Queue::later(10 , Test::class, $data, $queueName);
开启进程监听任务并执行
php think queue:listen
php think queue:work
命令模式介绍
命令模式
-
queue:work 命令
work 命令: 该命令将启动一个 work 进程来处理消息队列。
php think queue:work --queue TestQueue
-
queue:listen 命令
listen 命令: 该命令将会创建一个 listen 父进程 ,然后由父进程通过
proc_open(‘php think queue:work’)
的方式来创建一个work 子 进程来处理消息队列,且限制该work进程的执行时间。php think queue:listen --queue TestQueue
命令行参数
-
Work 模式
php think queue:work \ --daemon //是否循环执行,如果不加该参数,则该命令处理完下一个消息就退出 --queue helloJobQueue //要处理的队列的名称 --delay 0 \ //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0 --force \ //系统处于维护状态时是否仍然处理任务,并未找到相关说明 --memory 128 \ //该进程允许使用的内存上限,以 M 为单位 --sleep 3 \ //如果队列中无任务,则sleep多少秒后重新检查(work daemon模式)或者退出(listen或非daemon模式) --tries 2 //如果任务已经超过尝试次数上限,则触发‘任务尝试次数超限’事件,默认为0
-
Listen 模式
php think queue:listen \ --queue helloJobQueue \ //监听的队列的名称 --delay 0 \ //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0 --memory 128 \ //该进程允许使用的内存上限,以 M 为单位 --sleep 3 \ //如果队列中无任务,则多长时间后重新检查,daemon模式下有效 --tries 0 \ //如果任务已经超过重发次数上限,则进入失败处理逻辑,默认为0 --timeout 60 //创建的work子进程的允许执行的最长时间,以秒为单位
可以看到 listen 模式下,不包含
--deamon
参数,原因下面会说明 -
消息队列的开始,停止与重启
-
开始一个消息队列:
php think queue:work
-
停止所有的消息队列:
php think queue:restart
-
重启所有的消息队列:
php think queue:restart php think queue:work
-
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanfhbak
系列文章
更多
同类精品
更多
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
怎样阻止微信小程序自动打开
PHP中文网 06-13 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01