队列系统

最后更新: 2026-01-27 11:02:47

队列系统

Unicode Framework 提供了队列系统,支持异步任务处理,提高应用性能和响应速度。

队列基础

队列驱动

框架支持多种队列驱动:

  • file: 文件队列(开发环境)
  • database: 数据库队列(生产环境)
  • 基本使用

    use Unicode\Framework\Queue\QueueManager;
    use Unicode\Framework\Config\Config;
    

    $config = Config::getInstance(); $queueManager = new QueueManager($config);

    // 获取队列实例 $queue = $queueManager->queue();

    // 推送任务 $queue->push(SendEmailJob::class, ['email' => 'user@example.com']);

    创建任务

    任务类

    namespace App\Jobs;
    

    use Unicode\Framework\Interfaces\QueueJobInterface;

    class SendEmailJob implements QueueJobInterface { public function __construct( private string $email, private string $subject, private string $body, ) {}

    public function handle(): void { // 发送邮件 mail($this->email, $this->subject, $this->body); } }

    任务数据

    class ProcessOrderJob implements QueueJobInterface
    {
        public function __construct(
            private int $orderId,
        ) {}
    

    public function handle(): void { $order = db('orders')->where('id', $this->orderId)->first();

    // 处理订单 // ... } }

    推送任务

    立即执行

    use Unicode\Framework\Queue\Queue;
    

    // 推送任务到队列 Queue::push(SendEmailJob::class, [ 'email' => 'user@example.com', 'subject' => 'Welcome', 'body' => 'Welcome to our site!', ]);

    延迟执行

    // 延迟 60 秒执行
    Queue::later(60, SendEmailJob::class, [
        'email' => 'user@example.com',
    ]);
    

    // 延迟到指定时间执行 $delay = strtotime('+1 hour') - time(); Queue::later($delay, SendEmailJob::class, $data);

    指定队列

    // 推送到指定队列
    $queue = QueueManager::getInstance()->queue('high-priority');
    $queue->push(SendEmailJob::class, $data);
    

    处理任务

    启动队列 Worker

    <h1 id="启动队列-worker">启动队列 Worker</h1>
    php console queue:work
    

    <h1 id="指定队列">指定队列</h1> php console queue:work --queue=high-priority

    <h1 id="指定连接">指定连接</h1> php console queue:work --connection=database

    <h1 id="指定进程数">指定进程数</h1> php console queue:work --processes=4

    Worker 选项

    <h1 id="处理指定数量的任务后退出">处理指定数量的任务后退出</h1>
    php console queue:work --max-jobs=100
    

    <h1 id="运行指定时间后退出">运行指定时间后退出</h1> php console queue:work --max-time=3600

    <h1 id="任务超时时间秒">任务超时时间(秒)</h1> php console queue:work --timeout=60

    <h1 id="内存限制mb">内存限制(MB)</h1> php console queue:work --memory=128

    守护进程模式

    <h1 id="使用-supervisor-管理队列-worker">使用 Supervisor 管理队列 Worker</h1>
    <h1 id="etcsupervisorconfdqueue-workerconf">/etc/supervisor/conf.d/queue-worker.conf</h1>
    [program:queue-worker]
    command=php /path/to/console queue:work --sleep=3 --tries=3
    directory=/path/to/project
    autostart=true
    autorestart=true
    user=www-data
    numprocs=2
    redirect_stderr=true
    stdout_logfile=/path/to/logs/queue-worker.log
    

    队列配置

    文件队列

    // config/queue.php
    return [
        'default' => 'file',
        'connections' => [
            'file' => [
                'driver' => 'file',
                'path' => storage_path('queue'),
            ],
        ],
    ];
    

    数据库队列

    // config/queue.php
    return [
        'default' => 'database',
        'connections' => [
            'database' => [
                'driver' => 'database',
                'table' => 'jobs',
                'queue' => 'default',
            ],
        ],
    ];
    

    创建 jobs 表

    // 数据库迁移
    public function up(): void
    {
        $this->schema()->create('jobs', function($table) {
            $table->id();
            $table->string('queue');
            $table->text('payload');
            $table->tinyInteger('attempts')->default(0);
            $table->timestamp('reserved_at')->nullable();
            $table->timestamp('available_at');
            $table->timestamp('created_at');
    

    $table->index(['queue', 'reserved_at']); }); }

    任务重试

    自动重试

    class SendEmailJob implements QueueJobInterface
    {
        public function handle(): void
        {
            // 如果失败,会自动重试
            mail($this->email, $this->subject, $this->body);
        }
    

    // 指定最大重试次数 public function getMaxTries(): int { return 3; }

    // 指定重试延迟(秒) public function getRetryDelay(): int { return 60; } }

    失败处理

    class SendEmailJob implements QueueJobInterface
    {
        public function handle(): void
        {
            try {
                mail($this->email, $this->subject, $this->body);
            } catch (\Exception $e) {
                // 记录失败
                $this->failed($e);
                throw $e;
            }
        }
    

    public function failed(\Throwable $exception): void { // 处理失败任务 Log::error('Email send failed', [ 'email' => $this->email, 'error' => $exception->getMessage(), ]); } }

    最佳实践

    1. 任务设计

    // ✅ 推荐:任务应该是独立的、幂等的
    class ProcessOrderJob implements QueueJobInterface
    {
        public function __construct(
            private int $orderId,  // 只传递 ID,不传递对象
        ) {}
    

    public function handle(): void { // 在任务中重新获取数据 $order = db('orders')->where('id', $this->orderId)->first(); // 处理订单 } }

    // ❌ 错误:传递对象引用 class ProcessOrderJob implements QueueJobInterface { public function __construct( private Order $order, // 对象可能已过期 ) {} }

    2. 任务大小

    // ✅ 推荐:任务应该小而专注
    class SendEmailJob implements QueueJobInterface
    {
        public function handle(): void
        {
            // 只做一件事:发送邮件
            mail($this->email, $this->subject, $this->body);
        }
    }
    

    // ❌ 错误:任务过大 class ProcessEverythingJob implements QueueJobInterface { public function handle(): void { // 发送邮件 // 更新数据库 // 调用 API // 生成报告 // ... 太多操作 } }

    3. 错误处理

    // ✅ 推荐:正确处理错误
    class SendEmailJob implements QueueJobInterface
    {
        public function handle(): void
        {
            try {
                mail($this->email, $this->subject, $this->body);
            } catch (\Exception $e) {
                Log::error('Email send failed', [
                    'email' => $this->email,
                    'error' => $e->getMessage(),
                ]);
                throw $e;  // 重新抛出,触发重试
            }
        }
    

    public function getMaxTries(): int { return 3; // 最多重试3次 } }

    4. 队列选择

    // ✅ 推荐:根据任务优先级选择队列
    // 高优先级任务
    Queue::push(ProcessPaymentJob::class, $data, 'high-priority');
    

    // 低优先级任务 Queue::push(SendNotificationJob::class, $data, 'low-priority');

    5. 批量任务

    // ✅ 推荐:批量处理相似任务
    foreach ($users as $user) {
        Queue::push(SendWelcomeEmailJob::class, ['user_id' => $user->id]);
    }
    

    6. 监控队列

    // ✅ 推荐:监控队列状态
    $queueSize = db('jobs')
        ->where('queue', 'default')
        ->whereNull('reserved_at')
        ->count();
    

    if ($queueSize > 1000) { // 发送告警 Log::warning('Queue size too large', ['size' => $queueSize]); }

    常见问题

    Q: 如何查看队列中的任务?

    A: 查询 jobs 表(数据库队列)或查看队列文件(文件队列)。

    Q: 任务执行失败怎么办?

    A: 任务会自动重试,达到最大重试次数后会调用 failed() 方法。

    Q: 如何停止队列 Worker?

    A: 使用 Ctrl+C 或发送 SIGTERM 信号。

    Q: 队列 Worker 需要常驻运行吗?

    A: 是的,生产环境建议使用 Supervisor 或 systemd 管理 Worker 进程。

    Q: 如何测试队列任务?

    A: 在测试中可以使用同步队列或 Mock 队列。

    相关文档

  • 命令行工具 - 队列命令
  • 任务调度 - 定时任务
  • 配置管理 - 队列配置