- 队列基础
- 队列驱动
- 基本使用
- 创建任务
- 任务类
- 任务数据
- 推送任务
- 立即执行
- 延迟执行
- 指定队列
- 处理任务
- 启动队列 Worker
- 启动队列 Worker
- 指定队列
- 指定连接
- 指定进程数
- Worker 选项
- 处理指定数量的任务后退出
- 运行指定时间后退出
- 任务超时时间(秒)
- 内存限制(MB)
- 守护进程模式
- 使用 Supervisor 管理队列 Worker
- /etc/supervisor/conf.d/queue-worker.conf
- 队列配置
- 文件队列
- 数据库队列
- 创建 jobs 表
- 任务重试
- 自动重试
- 失败处理
- 最佳实践
- 1. 任务设计
- 2. 任务大小
- 3. 错误处理
- 4. 队列选择
- 5. 批量任务
- 6. 监控队列
- 常见问题
- Q: 如何查看队列中的任务?
- Q: 任务执行失败怎么办?
- Q: 如何停止队列 Worker?
- Q: 队列 Worker 需要常驻运行吗?
- Q: 如何测试队列任务?
- 相关文档
队列系统
最后更新: 2026-01-27 11:02:47队列系统
Unicode Framework 提供了队列系统,支持异步任务处理,提高应用性能和响应速度。
队列基础
队列驱动
框架支持多种队列驱动:
- file: 文件队列(开发环境)
- database: 数据库队列(生产环境)
基本使用
use Unicode\Framework\Queue\QueueManager;
use Unicode\Framework\Config\Config;
$config = Config::getInstance();
$queueManager = new QueueManager($config);
// 获取队列实例
$queue = $queueManager->queue();
// 推送任务
$queue->push(SendEmailJob::class, ['email' => 'user@example.com']);
创建任务
任务类
namespace App\Jobs;
use Unicode\Framework\Interfaces\QueueJobInterface;
class SendEmailJob implements QueueJobInterface
{
public function __construct(
private string $email,
private string $subject,
private string $body,
) {}
public function handle(): void
{
// 发送邮件
mail($this->email, $this->subject, $this->body);
}
}
任务数据
class ProcessOrderJob implements QueueJobInterface
{
public function __construct(
private int $orderId,
) {}
public function handle(): void
{
$order = db('orders')->where('id', $this->orderId)->first();
// 处理订单
// ...
}
}
推送任务
立即执行
use Unicode\Framework\Queue\Queue;
// 推送任务到队列
Queue::push(SendEmailJob::class, [
'email' => 'user@example.com',
'subject' => 'Welcome',
'body' => 'Welcome to our site!',
]);
延迟执行
// 延迟 60 秒执行
Queue::later(60, SendEmailJob::class, [
'email' => 'user@example.com',
]);
// 延迟到指定时间执行
$delay = strtotime('+1 hour') - time();
Queue::later($delay, SendEmailJob::class, $data);
指定队列
// 推送到指定队列
$queue = QueueManager::getInstance()->queue('high-priority');
$queue->push(SendEmailJob::class, $data);
处理任务
启动队列 Worker
<h1 id="启动队列-worker">启动队列 Worker</h1>
php console queue:work
<h1 id="指定队列">指定队列</h1>
php console queue:work --queue=high-priority
<h1 id="指定连接">指定连接</h1>
php console queue:work --connection=database
<h1 id="指定进程数">指定进程数</h1>
php console queue:work --processes=4
Worker 选项
<h1 id="处理指定数量的任务后退出">处理指定数量的任务后退出</h1>
php console queue:work --max-jobs=100
<h1 id="运行指定时间后退出">运行指定时间后退出</h1>
php console queue:work --max-time=3600
<h1 id="任务超时时间秒">任务超时时间(秒)</h1>
php console queue:work --timeout=60
<h1 id="内存限制mb">内存限制(MB)</h1>
php console queue:work --memory=128
守护进程模式
<h1 id="使用-supervisor-管理队列-worker">使用 Supervisor 管理队列 Worker</h1>
<h1 id="etcsupervisorconfdqueue-workerconf">/etc/supervisor/conf.d/queue-worker.conf</h1>
[program:queue-worker]
command=php /path/to/console queue:work --sleep=3 --tries=3
directory=/path/to/project
autostart=true
autorestart=true
user=www-data
numprocs=2
redirect_stderr=true
stdout_logfile=/path/to/logs/queue-worker.log
队列配置
文件队列
// config/queue.php
return [
'default' => 'file',
'connections' => [
'file' => [
'driver' => 'file',
'path' => storage_path('queue'),
],
],
];
数据库队列
// config/queue.php
return [
'default' => 'database',
'connections' => [
'database' => [
'driver' => 'database',
'table' => 'jobs',
'queue' => 'default',
],
],
];
创建 jobs 表
// 数据库迁移
public function up(): void
{
$this->schema()->create('jobs', function($table) {
$table->id();
$table->string('queue');
$table->text('payload');
$table->tinyInteger('attempts')->default(0);
$table->timestamp('reserved_at')->nullable();
$table->timestamp('available_at');
$table->timestamp('created_at');
$table->index(['queue', 'reserved_at']);
});
}
任务重试
自动重试
class SendEmailJob implements QueueJobInterface
{
public function handle(): void
{
// 如果失败,会自动重试
mail($this->email, $this->subject, $this->body);
}
// 指定最大重试次数
public function getMaxTries(): int
{
return 3;
}
// 指定重试延迟(秒)
public function getRetryDelay(): int
{
return 60;
}
}
失败处理
class SendEmailJob implements QueueJobInterface
{
public function handle(): void
{
try {
mail($this->email, $this->subject, $this->body);
} catch (\Exception $e) {
// 记录失败
$this->failed($e);
throw $e;
}
}
public function failed(\Throwable $exception): void
{
// 处理失败任务
Log::error('Email send failed', [
'email' => $this->email,
'error' => $exception->getMessage(),
]);
}
}
最佳实践
1. 任务设计
// ✅ 推荐:任务应该是独立的、幂等的
class ProcessOrderJob implements QueueJobInterface
{
public function __construct(
private int $orderId, // 只传递 ID,不传递对象
) {}
public function handle(): void
{
// 在任务中重新获取数据
$order = db('orders')->where('id', $this->orderId)->first();
// 处理订单
}
}
// ❌ 错误:传递对象引用
class ProcessOrderJob implements QueueJobInterface
{
public function __construct(
private Order $order, // 对象可能已过期
) {}
}
2. 任务大小
// ✅ 推荐:任务应该小而专注
class SendEmailJob implements QueueJobInterface
{
public function handle(): void
{
// 只做一件事:发送邮件
mail($this->email, $this->subject, $this->body);
}
}
// ❌ 错误:任务过大
class ProcessEverythingJob implements QueueJobInterface
{
public function handle(): void
{
// 发送邮件
// 更新数据库
// 调用 API
// 生成报告
// ... 太多操作
}
}
3. 错误处理
// ✅ 推荐:正确处理错误
class SendEmailJob implements QueueJobInterface
{
public function handle(): void
{
try {
mail($this->email, $this->subject, $this->body);
} catch (\Exception $e) {
Log::error('Email send failed', [
'email' => $this->email,
'error' => $e->getMessage(),
]);
throw $e; // 重新抛出,触发重试
}
}
public function getMaxTries(): int
{
return 3; // 最多重试3次
}
}
4. 队列选择
// ✅ 推荐:根据任务优先级选择队列
// 高优先级任务
Queue::push(ProcessPaymentJob::class, $data, 'high-priority');
// 低优先级任务
Queue::push(SendNotificationJob::class, $data, 'low-priority');
5. 批量任务
// ✅ 推荐:批量处理相似任务
foreach ($users as $user) {
Queue::push(SendWelcomeEmailJob::class, ['user_id' => $user->id]);
}
6. 监控队列
// ✅ 推荐:监控队列状态
$queueSize = db('jobs')
->where('queue', 'default')
->whereNull('reserved_at')
->count();
if ($queueSize > 1000) {
// 发送告警
Log::warning('Queue size too large', ['size' => $queueSize]);
}
常见问题
Q: 如何查看队列中的任务?
A: 查询 jobs 表(数据库队列)或查看队列文件(文件队列)。
Q: 任务执行失败怎么办?
A: 任务会自动重试,达到最大重试次数后会调用 failed() 方法。
Q: 如何停止队列 Worker?
A: 使用 Ctrl+C 或发送 SIGTERM 信号。
Q: 队列 Worker 需要常驻运行吗?
A: 是的,生产环境建议使用 Supervisor 或 systemd 管理 Worker 进程。
Q: 如何测试队列任务?
A: 在测试中可以使用同步队列或 Mock 队列。