Laravel-Queue

思考并回答以下问题:

  • 队列很重要,经常有场景使用到。队列的目的是将耗时的任务延时处理,或是将大批量的工作放入队列进行异步处理。有什么应用场景?

简介

Laravel队列为不同的后台队列服务提供了统一的API,例如Beanstalk,Amazon SQS,Redis,甚至其他基于关系型数据库的队列。队列的目的是将耗时的任务延时处理,比如发送邮件,从而大幅度缩短Web请求和响应的时间。

应用场景

队列就是为了在某种程度上替代多线程而设计的一种处理并发的方式,同时,也就具备天生的秉性:异步!用于处理耗时的工作,比如一个流程走到一个地方,要发送一封邮件通知,那不能让整个程序等到邮件发完了再继续下去,那就把发邮件的工作丢到一个队列中去,让这个工作慢慢做,继续处理响应的逻辑。整个过程其实也就这么回事,没有很困难。

实现

我们要把一项一项的工作任务放到队列里,最朴素的想法,我们会怎么做?我们是不是先得给任务起个名字,然后至少得给它一段执行的代码吧,然后再把这一个任务作为一个整体的数据结构放到一个叫做队列的数据结构中去,那要把名字和执行的代码关联起来,肯定是用键值对的方式最方便了,所以,怎么做呢?最简单就是用数组喽,一个数组,然后中间一堆键值对,键名是任务名,键值是任务的执行代码。但是数组有一个问题,数组的检索是一个完了接一个,我没法控制它中间执行的异步操作啊?但是它能够模仿出这些操作模式,所以,在laravel中,就有了一个驱动名,叫sync,同步嘛,用来做测试用的。

配置

队列配置文件存放在config/queue.php。每一种队列驱动的配置都可以在该文件中找到,包括数据库、Beanstalkd、Amazon SQS、Redis以及同步(本地使用)驱动。其中还包含了一个null队列驱动用于那些放弃队列的任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
<?php

return [
// Default Queue Connection Name
'default' => env('QUEUE_CONNECTION', 'sync'),


// Queue Connections
// Drivers: "sync", "database", "beanstalkd", "sqs", "redis", "null"
'connections' => [

'sync' => [
'driver' => 'sync',
],

'database' => [
'driver' => 'database',
'table' => 'jobs',
'queue' => 'default',
'retry_after' => 90,
],

'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => env('REDIS_QUEUE', 'default'),
'retry_after' => 90,
'block_for' => null,
],

],
// Failed Queue Jobs
'failed' => [
'driver' => env('QUEUE_FAILED_DRIVER', 'database'),
'database' => env('DB_CONNECTION', 'mysql'),
'table' => 'failed_jobs',
],

];

连接Vs队列

在开始使用Laravel队列以前,了解“连接”和“队列”的关系非常重要。在配置文件config/queue.php有一个connections配置项。该配置项定义了后台队列服务的特定连接,如Redis。每种队列连接都可以有很多队列,可以想象在银行办理现金业务的各个窗口队列。

请注意queue配置文件中的每个连接配置示例都有一个queue属性。当新的队列任务被添加到指定的连接时,该配置项的值就是默认监听的队列(名称)。换种说法,如果你没有指派特别的队列名称,那么queue的值,也是该任务默认添加到的队列(名称):

1
2
3
4
5
// 以下的任务将被委派到默认队列...
dispatch(new Job);

// 以下任务将被委派到"emails"队列...
dispatch((new Job)->onQueue('emails'));

有些应用并不需要将任务分配到多个队列,单个队列已经非常适用。但是,应用的任务有优先级差异或者类别差异的时候,推送任务到多个队列将是更好地选择,因为Laravel的队列进程支持通过优先级指定处理的队列。举个例子,你可以将高优先级的任务委派到high(高优先级)队列,从而让它优先执行。

1
php artisan queue:work --queue=high,default

驱动预备知识

数据库

要使用database队列驱动,你需要数据表保存任务信息。要生成创建这些表的迁移,可以运行Artisan命令queue:table,迁移被创建之后,可以使用migrate命令生成这些表:

1
2
php artisan queue:table
php artisan migrate

Redis

要使用redis队列驱动,需要在配置文件config/database.php中配置Redis数据库连接。

Redis集群

如果Redis队列连接使用Redis Cluster(集群),队列名称必须包含key hash tag,以确保给定队列对应的所有Redis keys都存放到同一个hash slot:

1
2
3
4
5
6
'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => '{default}',
'retry_after' => 90,
],

创建任务

生成任务类

通常,所有的任务类都保存在app/Jobs目录。如果app/Jobs不存在,在运行Artisan命令make:job的时候,它将会自动创建。你可以通过Artisan CLI来生成队列任务类:

1
php artisan make:job ProcessPodcast

生成的类都实现了Illuminate\Contracts\Queue\ShouldQueue接口,这是一个空接口,用于告知Laravel将该任务推送到队列,而不是立即运行。

任务类结构

任务类非常简单,通常只包含处理该任务的handle方法,让我们看一个任务类的例子。在这个例子中,我们模拟管理播客发布服务,并在发布以前上传相应的播客文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
<?php

namespace App\Jobs;

use App\Podcast;
use App\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;

class ProcessPodcast implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

protected $podcast;

/**
* 创建任务实例
*
* @param Podcast $podcast
* @return void
*/
public function __construct(Podcast $podcast)
{
$this->podcast = $podcast;
}

/**
* 执行任务
*
* @param AudioProcessor $processor
* @return void
*/
public function handle(AudioProcessor $processor)
{
// 处理上传的播客…
}
}

在本示例中,我们将Eloquent模型作为参数直接传递到构造函数。因为该任务使用了SerializesModels trait,Eloquent模型将会在任务被执行时优雅地序列化和反序列化。如果你的队列任务在构造函数中接收Eloquent模型,只有模型的主键会被序列化到队列,当任务真正被执行的时候,队列系统会自动从数据库中获取整个模型实例。这对应用而言是完全透明的,从而避免序列化整个Eloquent模型实例引起的问题。

handle方法在任务被处理的时候调用,注意我们可以在任务的handle方法中进行依赖注入,Laravel服务容器会自动注入这些依赖

如果你想要完全控制容器如何将依赖注入到handle方法,可以使用容器的bindMethod方法。bindMethod方法接收一个回调,该回调支持传入任务和容器实例,在这个回调中,你可以随意调用handle方法。通常,我们在某个服务提供者中调用这个方法:

1
2
3
4
5
use App\Jobs\ProcessPodcast;

$this->app->bindMethod(ProcessPodcast::class.'@handle', function ($job, $app) {
return $job->handle($app->make(AudioProcessor::class));
});

注:二进制数据,如原生图片内容,在传递给队列任务之前先经过base64_encode方法处理,此外,该任务被推送到队列时将不会被序列化为JSON格式。

处理关联关系

由于加载关联关系也需要对相应模型实例进行序列化,从而导致序列化后的字符串非常庞大。要阻止对关联关系进行序列化,可以在设置属性值时调用模型实例上的withoutRelations来实现,该方法会返回一个不包含关联关系的模型实例:

1
2
3
4
5
6
7
8
9
10
/**
* Create a new job instance.
*
* @param \App\Podcast $podcast
* @return void
*/
public function __construct(Podcast $podcast)
{
$this->podcast = $podcast->withoutRelations();
}

任务中间件

任务中间件允许你在执行队列任务前封装一些自定义逻辑,从而减少任务本身的模板代码量。例如,考虑下面这个handle方法,它使用了Laravel的Redis限制频率功能,从而限定每五秒钟只能处理一个任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Execute the job.
*
* @return void
*/
public function handle()
{
Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () {
info('Lock obtained...');

// Handle job...
}, function () {
// Could not obtain lock...

return $this->release(5);
});
}

尽管这段代码是有效的,但是由于混入了Redis频率限制逻辑,handle方法变得很臃肿,此外,这个频率限制逻辑在其它需要限制频率的任务中也要重复编写。

因此,在Laravel 6.0中引入了任务中间件,我们可以通过定义任务中间件来处理频率限制,而不是在队列任务处理方法中完成。Laravel没有默认的位置来存放任务中间件,因此,你可以将它们存放在任何地方。在本例中,我们将其存放在app/Jobs/Middleware目录下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<?php

namespace App\Jobs\Middleware;

use Illuminate\Support\Facades\Redis;

class RateLimited
{
/**
* Process the queued job.
*
* @param mixed $job
* @param callable $next
* @return mixed
*/
public function handle($job, $next)
{
Redis::throttle('key')
->block(0)->allow(1)->every(5)
->then(function () use ($job, $next) {
// Lock obtained...

$next($job);
}, function () use ($job) {
// Could not obtain lock...

$job->release(5);
});
}
}

正如你所看到的,和路由中间件一样,任务中间件接收待处理的任务以及一个回调作为参数,该回调会在中间件逻辑处理完毕后继续处理任务。

创建好任务中间件后,可以在任务类的middleware方法中将其添加到对应任务中,该方法在执行Artisan命令make:job时默认没有自动生成,所以你需要手动编写该方法实现代码:

1
2
3
4
5
6
7
8
9
10
11
use App\Jobs\Middleware\RateLimited;

/**
* Get the middlewarwe the job should pass through.
*
* @return array
*/
public function middleware()
{
return [new RateLimited];
}

分发任务

创建好任务类后,就可以通过任务自身的dispatch方法将其分发到队列。dispatch方法需要的唯一参数就是该任务的实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;

class PodcastController extends Controller
{
/**
* Store a new podcast.
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// Create podcast...

ProcessPodcast::dispatch($podcast);
}
}

延时分发

有时候你可能想要延迟队列任务的执行,这可以通过在分发任务时使用delay方法实现。例如你希望将某个任务在创建10分钟以后才执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;

class PodcastController extends Controller
{
/**
* Store a new podcast.
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// Create podcast...

ProcessPodcast::dispatch($podcast)
->delay(now()->addMinutes(10));
}
}

同步分发

如果你立即想要分发任务(同步),可以使用dispatchNow方法。使用这个方法时,对应任务不会被推送到队列,而是立即在当前进程中运行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<?php

namespace App\Http\Controllers;

use Illuminate\Http\Request;
use App\Jobs\ProcessPodcast;
use App\Http\Controllers\Controller;

class PodcastController extends Controller
{
/**
* Store a new podcast.
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// Create podcast...

ProcessPodcast::dispatchNow($podcast);
}
}

任务链

任务链允许你指定一个需要在一个序列中执行的队列任务列表,如果序列中的某个任务失败,其它任务将不再运行。要执行一个队列任务链,可以使用任意可分发任务上的withChain方法:

1
2
3
4
ProcessPodcast::withChain([
new OptimizePodcast,
new ReleasePodcast
])->dispatch();

注:使用$this->delete()方法删除任务不会阻断正在被处理的任务链中的任务。任务链中的任务只有在执行失败时才会停止执行。

链接连接&队列

如果你想要指定任务链使用的默认连接和队列,可以使用allOnConnection和allOnQueue方法。这些方法指定需要用到的队列连接和队列名称,除非队列任务显式指定了分配的连接/队列:

1
2
3
4
ProcessPodcast::withChain([
new OptimizePodcast,
new ReleasePodcast
])->dispatch()->allOnConnection('redis')->allOnQueue('podcasts');

自定义队列&连接

分发到指定的队列

通过推送任务到不同队列,你可以将队列任务进行“分类”,甚至根据优先级来分配每个队列的进程数。请注意,这并不意味着使用了配置项中那些不同的连接来管理队列,实际上只有单一连接会被用到。要指定队列,请在任务实例使用onQueue方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;

class PodcastController extends Controller
{
/**
* Store a new podcast.
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// Create podcast...

ProcessPodcast::dispatch($podcast)->onQueue('processing');
}
}

分发到指定的连接

如果你使用了多个连接来管理队列,那么可以分发任务到指定的连接。请在任务实例中使用onConnection方法来指定连接:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;

class PodcastController extends Controller
{
/**
* Store a new podcast.
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// Create podcast...

ProcessPodcast::dispatch($podcast)->onConnection('sqs');
}
}

当然,你可以同时使用onConnection和onQueue方法来指定任务的连接和队列:

1
2
3
$job = (new ProcessPodcast($podcast))
->onConnection('sqs')
->onQueue('processing');

此外,你还可以在任务类中通过connection属性指定连接:

1
2
3
4
5
6
7
8
9
10
11
12
13
<?php

namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
/**
* The queue connection that should handle the job.
*
* @var string
*/
public $connection = 'sqs';
}

指定最大失败次数/超时时间

最大失败次数

指定队列任务最大失败次数的一种实现方式是通过Artisan命令—tries切换:

1
php artisan queue:work --tries=3

不过,你还可以在任务类自身定义最大失败次数来实现更加细粒度的控制,如果最大失败次数在任务中指定,则其优先级高于命令行指定的数值:

1
2
3
4
5
6
7
8
9
10
11
12
13
<?php

namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
/**
* The number of times the job may be attempted.
*
* @var int
*/
public $tries = 5;
}

基于时间的尝试次数

除了定义在任务失败前的最大尝试次数外,还可以定义在指定时间内允许任务的最大尝试次数,这可以通过在任务类中添加retryUntil方法来实现:

1
2
3
4
5
6
7
8
9
/**
* Determine the time at which the job should timeout.
*
* @return \DateTime
*/
public function retryUntil()
{
return now()->addSeconds(5);
}

注:还可以在队列时间监听器中定义retryUntil方法。

超时

注:timeout方法为PHP 7.1+和pcntl扩展做了优化。

类似的,队列任务最大运行时长(秒)可以通过Artisan命令上的—timeout开关来指定:

1
php artisan queue:work --timeout=30

同样,你也可以在任务类中定义该任务允许运行的最大时长(单位:秒),任务中指定的超时时间优先级也高于命令行定义的数值:

1
2
3
4
5
6
7
8
9
10
11
12
13
<?php

namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
/**
* The number of seconds the job can run before timing out.
*
* @var int
*/
public $timeout = 120;
}

频率限制

注:该功能要求应用可以与Redis服务器进行交互。

如果应用使用了Redis,那么可以使用时间或并发来控制队列任务。该功能特性在队列任务与有频率限制的API交互时很有帮助。

例如,通过throttle(节流阀)方法,你可以限定给定类型任务每60秒只运行10次。如果不能获取锁,需要将任务释放回队列以便可以再次执行:

1
2
3
4
5
6
7
Redis::throttle('key')->allow(10)->every(60)->then(function () {
// Job logic...
}, function () {
// Could not obtain lock...

return $this->release(10);
});

注:在上面的例子中,key可以是任意可以唯一标识你想要限定访问频率的任务类型的字符串。举个例子,这个键可以基于任务类名和操作Eloquent模型的ID进行构建。

注:将受限制的任务释放回队列依然会增加任务的总执行次数attempts的值。

除此之外,还可以指定可以同时处理给定任务的最大进程数量。这个功能在队列任务正在编辑一次只能由一个任务进行处理的资源时很有用。例如,使用funnel(漏斗)方法你可以给定类型任务一次只能由一个工作进程进行处理:

1
2
3
4
5
6
7
Redis::funnel('key')->limit(1)->then(function () {
// Job logic...
}, function () {
// Could not obtain lock...

return $this->release(10);
});

注:使用频率限制时,任务在运行成功之前需要的最大尝试次数很难权衡,因此,将频率限制和基于时间的尝试次数结合起来使用是个不错的选择。

处理错误

如果任务在处理的时候有异常抛出,则该任务将会被自动释放回队列以便再次尝试执行。任务会持续被释放直到尝试次数达到应用允许的最大次数。最大尝试次数通过Artisan命令queue:work 上的—tries开关来定义。此外,该次数也可以在任务类自身上定义。关于运行队列监听器的更多信息可以在下面看到。

队列闭包

除了将任务类推送到队列之外,还可以推送闭包到队列。这对于需要在当前请求生命周期之外执行的简单快捷的任务来说非常方便:

1
2
3
4
5
$podcast = App\Podcast::find(1);

dispatch(function () use ($podcast) {
$podcast->publish();
});

推送闭包到队列时,闭包的代码内容以加密方式签名,所以不会在传输过程中被篡改。

运行队列进程

Laravel自带了一个队列进程用来处理被推送到队列的新任务。你可以使用queue:work命令运行这个队列进程。请注意,队列进程开始运行后,会持续监听队列,直至你手动停止或关闭终端:

1
php artisan queue:work

注:为了保持队列进程queue:work持续在后台运行,需要使用进程守护程序,比如Supervisor来确保队列进程持续运行。

请记住,队列进程是长生命周期的进程,会在启动后驻留内存。若应用有任何改动将不会影响到已经启动的进程。所以请在发布程序后,重启队列进程。

此外,你还可以运行queue:listen命令。使用该命令时,代码修改后不需要手动重启队列进程,不过,该命令性能不及queue:work:

1
php artisan queue:listen

指定连接和队列

队列进程同样可以自定义连接和队列。传递给work命令的连接名需要与配置文件config/queue.php中定义的某个连接配置相匹配:

1
php artisan queue:work redis

你可以自定义将某个队列进程指定某个连接来管理。举例来说,如果所有的邮件任务都是通过 redis 连接上的emails队列处理,那么可以用以下命令来启动单一进程只处理单一队列:

1
php artisan queue:work redis --queue=emails

处理单个任务

--once选项可用于告知进程只处理队列中的单个任务:

1
php artisan queue:work --once

处理所有队列任务然后退出

—stop-when-empty选项可用于告知进程处理所有任务然后优雅退出。当我们在Docker容器中处理Laravel队列时,如果你想要在队列为空时关闭容器,则该选项很有用:

1
php artisan queue:work --stop-when-empty

资源注意事项

后台队列进程不会再处理每个任务前重启框架,因此你需要在每次任务完成后释放所有重量级的资源。例如,如果你在使用GD库处理图片,需要在完成的时候使用imagedestroy来释放内存。

队列优先级

有时候你需要区分任务的优先级。比如,在配置文件config/queue.php中,你可以定义redis连接的默认queue为low。不过,如果需要将任务分发到高优先级high,可以这么做:

1
dispatch((new Job)->onQueue('high'));

如果期望所有high高优先级的队列都将先于low低优先级的任务执行,可以像这样启动队列进程:

1
php artisan queue:work --queue=high,low

队列进程&部署

前文已经提到队列进程是长生命周期的进程,在重启以前,所有源码的修改并不会对其产生影响。所以,最简单的方法是在每次发布新版本后重新启动队列进程。你可以通过Aritisan命令queue:restart来优雅地重启队列进程:

1
php artisan queue:restart

该命令将在队列进程完成正在进行的任务后,结束该进程,避免队列任务的丢失或错误。由于队列进程会在执行queue:restart命令后死掉,你仍然需要通过进程守护程序如Supervisor来自动重启队列进程。

注:队列使用缓存来存储重启信号,所以在使用此功能前你需要验证缓存驱动配置正确。

任务过期&超时

任务过期

在配置文件config/queue.php中,每个连接都定义了retry_after项。该配置项的目的是定义任务在执行以后多少秒后释放回队列。如果retry_after设定的值为90,任务在运行90秒后还未完成,那么将被释放回队列而不是删除掉。毫无疑问,你需要把retry_after的值设定为任务执行时间的最大可能值。

队列进程超时

队列进程 queue:work 可以设定超时—timeout项。该—timeout控制队列进程执行每个任务的最长时间,如果超时,该进程将被关闭。各种错误都可能导致某个任务处于“冻结”状态,比如HTTP无响应等。队列进程超时就是为了将这些“冻结”的进程关闭:

1
php artisan queue:work --timeout=60

配置项retry_after和Aritisan参数项—timeout不同,但目的都是为了确保任务的安全,并且只被成功的执行一次。

注:参数项—timeout的值应该始终小于配置项retry_after的值,这是为了确保队列进程总在任务重试以前关闭。如果—timeout比retry_after大,那么你的任务可能被执行两次。

进程休眠时间

当任务在队列中有效时,进程会持续处理任务,没有延迟。不过,我们可以使用sleep配置项来指定没有新的有效任务产生时的休眠时间。休眠期间,队列进程不会处理任何新任务直到队列进程醒来:

1
php artisan queue:work --sleep=3

配置Supervisor

安装Supervisor

Supervisor是Linux系统中常用的进程守护程序。如果队列进程queue:work意外关闭,它会自动重启启动队列进程。在Ubuntu安装Supervisor非常简单:

1
sudo apt-get install supervisor

配置Supervisor

Supervisor配置文件通常存放在/etc/supervisor/conf.d目录,在该目录下,可以创建多个配置文件指示Supervisor如何监视进程,例如,让我们创建一个开启并监视queue:work进程的laravel-worker.conf文件:

1
2
3
4
5
6
7
8
9
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3
autostart=true
autorestart=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log

在本例中,numprocs指令让Supervisor运行8个queue:work进程并监视它们,如果失败的话自动重启。当然,你需要修改queue:work sqs的command指令来映射你的队列连接。

启动Supervisor

当成功创建配置文件后,需要刷新Supervisor的配置信息并使用如下命令启动进程:

1
2
3
sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start laravel-worker:*

你可以通过Supervisor官方文档获取更多信息。

处理失败的任务

不可避免会出现运行失败的任务。你不必为此担心,Laravel可以轻松设置任务允许的最大尝试次数,若是执行次数达到该限定,该任务会被插入到failed_jobs表,要创建一个failed_jobs 表的迁移,可以使用queue:failed-table命令:

1
2
php artisan queue:failed-table
php artisan migrate

然后,运行队列进程时,通过--tries参数项来设置队列任务允许的最大尝试次数,如果没有指定--tries选项的值,任务会被无限期重试:

1
php artisan queue:work redis --tries=3

此外,你还可以通过--delay选项指定重试失败任务之前需要等待多少秒,默认情况下,失败队列任务会立即重试:

1
php artisan queue:work redis --tries=3 --delay=3

如果你想要为指定任务配置失败重试延迟时间,可以在对应的队列任务类中定义retryAfter属性:

1
2
3
4
5
6
/**
* The number of seconds to wait before retrying the job.
*
* @var int
*/
public $retryAfter = 3;

清理失败的任务

你可以在任务类中定义failed方法, 从而允许你在失败发生时执行指定的动作,比如发送任务失败的通知,记录日志等。导致任务失败的Exception会被传递到failed方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
<?php

namespace App\Jobs;

use Exception;
use App\Podcast;
use App\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;

class ProcessPodcast implements ShouldQueue
{
use InteractsWithQueue, Queueable, SerializesModels;

protected $podcast;

/**
* Create a new job instance.
*
* @param Podcast $podcast
* @return void
*/
public function __construct(Podcast $podcast)
{
$this->podcast = $podcast;
}

/**
* Execute the job.
*
* @param AudioProcessor $processor
* @return void
*/
public function handle(AudioProcessor $processor)
{
// Process uploaded podcast...
}

/**
* The job failed to process.
*
* @param Exception $exception
* @return void
*/
public function failed(Exception $exception)
{
// 发送失败通知, etc...
}
}

任务失败事件

如果你期望在任务失败的时候触发某个事件,可以使用Queue::failing方法。该事件通过邮件或HipChat通知团队。举个例子,我们可以在Laravel自带的AppServiceProvider中添加一个回调到该事件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
<?php

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Support\ServiceProvider;

class AppServiceProvider extends ServiceProvider
{
/**
* 启动应用服务.
*
* @return void
*/
public function boot()
{
Queue::failing(function (JobFailed $event) {
// $event->connectionName
// $event->job
// $event->exception
});
}

/**
* 注册服务提供者.
*
* @return void
*/
public function register()
{
//
}
}

重试失败的任务

要查看已插入到failed_jobs数据表中的所有失败任务,可以使用Artisan命令queue:failed:

1
php artisan queue:failed

该命令将会列出任务ID、连接、队列和失败时间,任务ID可用于重试失败任务,例如,要重试一个ID为5的失败任务,可以运行下面的命令:

1
php artisan queue:retry 5

要重试所有失败任务,运行如下命令即可:

1
php artisan queue:retry all

如果你要删除一个失败任务,可以使用queue:forget命令:

1
php artisan queue:forget 5

要删除所有失败任务,可以使用queue:flush命令:

1
php artisan queue:flush

忽略缺失的模型

当注入一个Eloquent模型到队列任务时,它会在被放到队列之前自动序列化,然后在任务被处理时恢复。不过,如果该模型实例在任务等待被处理期间被删除,对应任务在执行时会失败并抛出 ModelNotFoundException异常。

为了方便起见,你可以通过设置队列任务的deleteWhenMissingModels属性为true来选择自动删除缺失模型实例的任务:

1
2
3
4
5
6
/**
* Delete the job if its models no longer exist.
*
* @var bool
*/
public $deleteWhenMissingModels = true;

任务事件

通过Queue门面提供的before和after方法可以在任务被处理之前或之后指定要执行的回调。这些回调可用来记录日志或者记录统计数据。通常,你可以在服务提供者中使用这些方法。比如,我们可以在AppServiceProvider中这样用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
<?php

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;

class AppServiceProvider extends ServiceProvider
{
/**
* Bootstrap any application services.
*
* @return void
*/
public function boot()
{
Queue::before(function (JobProcessing $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});

Queue::after(function (JobProcessed $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
}

/**
* Register the service provider.
*
* @return void
*/
public function register()
{
//
}
}

使用Queue门面上的looping方法,你可以在进程尝试从队列中获取任务之前指定要执行的回调。例如,你可以注册一个闭包来回滚之前失败任务遗留下来的事务:

1
2
3
4
5
Queue::looping(function () {
while (DB::transactionLevel() > 0) {
DB::rollBack();
}
});

源码

Illuminate\Bus\Dispatcher.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
<?php

namespace Illuminate\Bus;

use Closure;
use Illuminate\Contracts\Bus\QueueingDispatcher;
use Illuminate\Contracts\Container\Container;
use Illuminate\Contracts\Queue\Queue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Pipeline\Pipeline;
use RuntimeException;

class Dispatcher implements QueueingDispatcher
{
/**
* The container implementation.
*
* @var \Illuminate\Contracts\Container\Container
*/
protected $container;

/**
* The pipeline instance for the bus.
*
* @var \Illuminate\Pipeline\Pipeline
*/
protected $pipeline;

/**
* The pipes to send commands through before dispatching.
*
* @var array
*/
protected $pipes = [];

/**
* The command to handler mapping for non-self-handling events.
*
* @var array
*/
protected $handlers = [];

/**
* The queue resolver callback.
*
* @var \Closure|null
*/
protected $queueResolver;

/**
* Create a new command dispatcher instance.
*
* @param \Illuminate\Contracts\Container\Container $container
* @param \Closure|null $queueResolver
* @return void
*/
public function __construct(Container $container, Closure $queueResolver = null)
{
$this->container = $container;
$this->queueResolver = $queueResolver;
$this->pipeline = new Pipeline($container);
}

/**
* Dispatch a command to its appropriate handler.
*
* @param mixed $command
* @return mixed
*/
public function dispatch($command)
{
if ($this->queueResolver && $this->commandShouldBeQueued($command)) {
return $this->dispatchToQueue($command);
}

return $this->dispatchNow($command);
}

/**
* Dispatch a command to its appropriate handler in the current process.
*
* @param mixed $command
* @param mixed $handler
* @return mixed
*/
public function dispatchNow($command, $handler = null)
{
if ($handler || $handler = $this->getCommandHandler($command)) {
$callback = function ($command) use ($handler) {
return $handler->handle($command);
};
} else {
$callback = function ($command) {
return $this->container->call([$command, 'handle']);
};
}

return $this->pipeline->send($command)->through($this->pipes)->then($callback);
}

/**
* Determine if the given command has a handler.
*
* @param mixed $command
* @return bool
*/
public function hasCommandHandler($command)
{
return array_key_exists(get_class($command), $this->handlers);
}

/**
* Retrieve the handler for a command.
*
* @param mixed $command
* @return bool|mixed
*/
public function getCommandHandler($command)
{
if ($this->hasCommandHandler($command)) {
return $this->container->make($this->handlers[get_class($command)]);
}

return false;
}

/**
* Determine if the given command should be queued.
*
* @param mixed $command
* @return bool
*/
protected function commandShouldBeQueued($command)
{
return $command instanceof ShouldQueue;
}

/**
* Dispatch a command to its appropriate handler behind a queue.
*
* @param mixed $command
* @return mixed
*
* @throws \RuntimeException
*/
public function dispatchToQueue($command)
{
$connection = $command->connection ?? null;

$queue = call_user_func($this->queueResolver, $connection);

if (! $queue instanceof Queue) {
throw new RuntimeException('Queue resolver did not return a Queue implementation.');
}

if (method_exists($command, 'queue')) {
return $command->queue($queue, $command);
}

return $this->pushCommandToQueue($queue, $command);
}

/**
* Push the command onto the given queue instance.
*
* @param \Illuminate\Contracts\Queue\Queue $queue
* @param mixed $command
* @return mixed
*/
protected function pushCommandToQueue($queue, $command)
{
if (isset($command->queue, $command->delay)) {
return $queue->laterOn($command->queue, $command->delay, $command);
}

if (isset($command->queue)) {
return $queue->pushOn($command->queue, $command);
}

if (isset($command->delay)) {
return $queue->later($command->delay, $command);
}

return $queue->push($command);
}

/**
* Set the pipes through which commands should be piped before dispatching.
*
* @param array $pipes
* @return $this
*/
public function pipeThrough(array $pipes)
{
$this->pipes = $pipes;

return $this;
}

/**
* Map a command to a handler.
*
* @param array $map
* @return $this
*/
public function map(array $map)
{
$this->handlers = array_merge($this->handlers, $map);

return $this;
}
}

Illuminate\Queue\Queue.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
<?php

namespace Illuminate\Queue;

use Closure;
use DateTimeInterface;
use Illuminate\Container\Container;
use Illuminate\Support\InteractsWithTime;
use Illuminate\Support\Str;

abstract class Queue
{
use InteractsWithTime;

/**
* The IoC container instance.
*
* @var \Illuminate\Container\Container
*/
protected $container;

/**
* The connection name for the queue.
*
* @var string
*/
protected $connectionName;

/**
* The create payload callbacks.
*
* @var callable[]
*/
protected static $createPayloadCallbacks = [];

/**
* Push a new job onto the queue.
*
* @param string $queue
* @param string $job
* @param mixed $data
* @return mixed
*/
public function pushOn($queue, $job, $data = '')
{
return $this->push($job, $data, $queue);
}

/**
* Push a new job onto the queue after a delay.
*
* @param string $queue
* @param \DateTimeInterface|\DateInterval|int $delay
* @param string $job
* @param mixed $data
* @return mixed
*/
public function laterOn($queue, $delay, $job, $data = '')
{
return $this->later($delay, $job, $data, $queue);
}

/**
* Push an array of jobs onto the queue.
*
* @param array $jobs
* @param mixed $data
* @param string|null $queue
* @return void
*/
public function bulk($jobs, $data = '', $queue = null)
{
foreach ((array) $jobs as $job) {
$this->push($job, $data, $queue);
}
}

/**
* Create a payload string from the given job and data.
*
* @param \Closure|string|object $job
* @param string $queue
* @param mixed $data
* @return string
*
* @throws \Illuminate\Queue\InvalidPayloadException
*/
protected function createPayload($job, $queue, $data = '')
{
if ($job instanceof Closure) {
$job = CallQueuedClosure::create($job);
}

$payload = json_encode($this->createPayloadArray($job, $queue, $data));

if (JSON_ERROR_NONE !== json_last_error()) {
throw new InvalidPayloadException(
'Unable to JSON encode payload. Error code: '.json_last_error()
);
}

return $payload;
}

/**
* Create a payload array from the given job and data.
*
* @param string|object $job
* @param string $queue
* @param mixed $data
* @return array
*/
protected function createPayloadArray($job, $queue, $data = '')
{
return is_object($job)
? $this->createObjectPayload($job, $queue)
: $this->createStringPayload($job, $queue, $data);
}

/**
* Create a payload for an object-based queue handler.
*
* @param object $job
* @param string $queue
* @return array
*/
protected function createObjectPayload($job, $queue)
{
$payload = $this->withCreatePayloadHooks($queue, [
'uuid' => (string) Str::uuid(),
'displayName' => $this->getDisplayName($job),
'job' => 'Illuminate\Queue\CallQueuedHandler@call',
'maxTries' => $job->tries ?? null,
'maxExceptions' => $job->maxExceptions ?? null,
'delay' => $this->getJobRetryDelay($job),
'timeout' => $job->timeout ?? null,
'timeoutAt' => $this->getJobExpiration($job),
'data' => [
'commandName' => $job,
'command' => $job,
],
]);

return array_merge($payload, [
'data' => [
'commandName' => get_class($job),
'command' => serialize(clone $job),
],
]);
}

/**
* Get the display name for the given job.
*
* @param object $job
* @return string
*/
protected function getDisplayName($job)
{
return method_exists($job, 'displayName')
? $job->displayName() : get_class($job);
}

/**
* Get the retry delay for an object-based queue handler.
*
* @param mixed $job
* @return mixed
*/
public function getJobRetryDelay($job)
{
if (! method_exists($job, 'retryAfter') && ! isset($job->retryAfter)) {
return;
}

$delay = $job->retryAfter ?? $job->retryAfter();

return $delay instanceof DateTimeInterface
? $this->secondsUntil($delay) : $delay;
}

/**
* Get the expiration timestamp for an object-based queue handler.
*
* @param mixed $job
* @return mixed
*/
public function getJobExpiration($job)
{
if (! method_exists($job, 'retryUntil') && ! isset($job->timeoutAt)) {
return;
}

$expiration = $job->timeoutAt ?? $job->retryUntil();

return $expiration instanceof DateTimeInterface
? $expiration->getTimestamp() : $expiration;
}

/**
* Create a typical, string based queue payload array.
*
* @param string $job
* @param string $queue
* @param mixed $data
* @return array
*/
protected function createStringPayload($job, $queue, $data)
{
return $this->withCreatePayloadHooks($queue, [
'uuid' => (string) Str::uuid(),
'displayName' => is_string($job) ? explode('@', $job)[0] : null,
'job' => $job,
'maxTries' => null,
'maxExceptions' => null,
'delay' => null,
'timeout' => null,
'data' => $data,
]);
}

/**
* Register a callback to be executed when creating job payloads.
*
* @param callable $callback
* @return void
*/
public static function createPayloadUsing($callback)
{
if (is_null($callback)) {
static::$createPayloadCallbacks = [];
} else {
static::$createPayloadCallbacks[] = $callback;
}
}

/**
* Create the given payload using any registered payload hooks.
*
* @param string $queue
* @param array $payload
* @return array
*/
protected function withCreatePayloadHooks($queue, array $payload)
{
if (! empty(static::$createPayloadCallbacks)) {
foreach (static::$createPayloadCallbacks as $callback) {
$payload = array_merge($payload, call_user_func(
$callback, $this->getConnectionName(), $queue, $payload
));
}
}

return $payload;
}

/**
* Get the connection name for the queue.
*
* @return string
*/
public function getConnectionName()
{
return $this->connectionName;
}

/**
* Set the connection name for the queue.
*
* @param string $name
* @return $this
*/
public function setConnectionName($name)
{
$this->connectionName = $name;

return $this;
}

/**
* Set the IoC container instance.
*
* @param \Illuminate\Container\Container $container
* @return void
*/
public function setContainer(Container $container)
{
$this->container = $container;
}
}
0%