Laravel Queue-消息队列任务与分发源码剖析

思考并回答以下问题:

  • Laravel的队列服务由两个进程控制,一个是生产者,一个是消费者。怎么理解?

前言

在实际的项目开发中,我们经常会遇到需要轻量级队列的情形,例如发短信、发邮件等,这些任务不足以使用Kafka、RabbitMQ等重量级的消息队列,但是又的确需要异步、重试、并发控制等功能。通常来说,我们经常会使用Redis、Beanstalkd、Amazon SQS来实现相关功能,Laravel为此对不同的后台队列服务提供统一的API,本文将会介绍应用最为广泛的Redis队列。

背景知识

在讲解Laravel的队列服务之前,我们要先说说基于Redis的队列服务。首先,Redis设计用来做缓存的,但是由于它自身的某种特性使得它可以用来做消息队列。

Redis队列的数据结构

List链表

Redis做消息队列的特性例如FIFO(先入先出)很容易实现,只需要一个List对象从头取数据,从尾部塞数据即可。

相关的命令:

  • (1)左侧入右侧出:lpush/rpop;
  • (2)右侧入左侧出:rpush/lpop。

这个简单的消息队列很容易实现。

ZSet有序集合

有些任务场景,并不需要任务立刻执行,而是需要延迟执行;有些任务很重要,需要在任务失败的时候重新尝试。这些功能仅仅依靠List是无法完成的。这个时候,就需要Redis的有序集合。

Redis有序集合和Redis集合类似,是不包含相同字符串的合集。它们的差别是,每个有序集合的成员都关联着一个评分score,这个评分用于把有序集合中的成员按最低分到最高分排列。

单看有序集合和延迟任务并无关系,但是可以将有序集合的评分score设置为延时任务开启的时间,之后轮询这个有序集合,将到期的任务拿出来进行处理,这样就实现了延迟任务的功能。

对于重要的需要重试的任务,在任务执行之前,会将该任务放入有序集合中,设置任务最长的执行时间。若任务顺利执行完毕,该任务会在有序集合中删除。如果任务没有在规定时间内完成,那么该有序集合的任务将会被重新放入队列中。

相关命令:

  • (1) ZADD 添加一个或多个成员到有序集合,或者如果它已经存在更新其分数。
  • (2) ZRANGEBYSCORE 按分数返回一个成员范围的有序集合。
  • (3) ZREMRANGEBYRANK 在给定的索引之内删除所有成员的有序集合。

Laravel队列服务的任务调度

队列服务的任务调度过程如下:

Laravel的队列服务由两个进程控制,一个是生产者,一个是消费者。这两个进程操纵了Redis三个队列,其中一个List,负责即时任务,两个ZSet,负责延时任务与待处理任务。

生产者负责向Redis推送任务,如果是即时任务,默认就会向queue:default推送;如果是延时任务,就会向queue:default:delayed推送。

消费者轮询两个队列,不断的从队列中取出任务,先把任务放入queue:default:reserved中,再执行相关任务。如果任务执行成功,就会删除queue:default:reserved中的任务,否则会被重新放入queue:default:delayed队列中。

Laravel队列服务的总体流程

任务分发流程:

任务处理器运作:

Laravel队列服务的注册与启动

Laravel队列服务需要注册的服务比较多:

Illuminate/Queue/QueueServiceProvider.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
<?php

namespace Illuminate\Queue;

use Illuminate\Contracts\Debug\ExceptionHandler;
use Illuminate\Contracts\Support\DeferrableProvider;
use Illuminate\Queue\Connectors\RedisConnector;
use Illuminate\Queue\Failed\DatabaseFailedJobProvider;
use Illuminate\Support\Arr;
use Illuminate\Support\ServiceProvider;
use Illuminate\Support\Str;
use Opis\Closure\SerializableClosure;

class QueueServiceProvider extends ServiceProvider implements DeferrableProvider
{
/**
* Register the service provider.
*
* @return void
*/
public function register()
{
$this->registerManager();
$this->registerConnection();
$this->registerWorker();
$this->registerListener();
$this->registerFailedJobServices();
$this->registerOpisSecurityKey();
}

}

registerManager注册门面

registerManager负责注册队列服务的门面类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* Register the queue manager.
*
* @return void
*/
protected function registerManager()
{
$this->app->singleton('queue', function ($app) {
// Once we have an instance of the queue manager, we will register the various
// resolvers for the queue connectors. These connectors are responsible for
// creating the classes that accept queue configs and instantiate queues.
return tap(new QueueManager($app), function ($manager) {
$this->registerConnectors($manager);
});
});
}

/**
* Register the connectors on the queue manager.
*
* @param \Illuminate\Queue\QueueManager $manager
* @return void
*/
public function registerConnectors($manager)
{
foreach (['Null', 'Sync', 'Database', 'Redis', 'Beanstalkd', 'Sqs'] as $connector) {
$this->{"register{$connector}Connector"}($manager);
}
}

/**
* Register the Redis queue connector.
*
* @param \Illuminate\Queue\QueueManager $manager
* @return void
*/
protected function registerRedisConnector($manager)
{
$manager->addConnector('redis', function () {
return new RedisConnector($this->app['redis']);
});
}

QueueManager是队列服务的总门面,提供一切与队列相关的操作接口。QueueManager中有一个成员变量$connectors,该成员变量中存储着所有Laravel支持的底层队列服务:’Database’,’Redis’,’Beanstalkd’,’SQS’。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class QueueManager implements FactoryContract, MonitorContract
{
/**
* The array of resolved queue connectors.
*
* @var array
*/
protected $connectors = [];

/**
* Add a queue connection resolver.
*
* @param string $driver
* @param \Closure $resolver
* @return void
*/
public function addConnector($driver, Closure $resolver)
{
$this->connectors[$driver] = $resolver;
}
}

成员变量$connectors会被存储各种驱动的connector,例如RedisConnector、SqsConnector、DatabaseConnector、BeanstalkdConnector。

registerConnection底层队列连接服务

接下来,就要连接实现队列的底层服务了,例如Redis:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* Register the default queue connection binding.
*
* @return void
*/
protected function registerConnection()
{
$this->app->singleton('queue.connection', function ($app) {
return $app['queue']->connection();
});
}

/**
* Resolve a queue connection instance.
*
* @param string|null $name
* @return \Illuminate\Contracts\Queue\Queue
*/
public function connection($name = null)
{
$name = $name ?: $this->getDefaultDriver();

// If the connection has not been resolved yet we will resolve it now as all
// of the connections are resolved when they are actually needed so we do
// not make any unnecessary connection to the various queue end-points.
if (! isset($this->connections[$name])) {
$this->connections[$name] = $this->resolve($name);

$this->connections[$name]->setContainer($this->app);
}

return $this->connections[$name];
}

/**
* Get the name of the default queue connection.
*
* @return string
*/
public function getDefaultDriver()
{
return $this->app['config']['queue.default'];
}

connection函数首先会获取连接名,没有连接名就会从config中获取默认的连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* Resolve a queue connection.
*
* @param string $name
* @return \Illuminate\Contracts\Queue\Queue
*/
protected function resolve($name)
{
$config = $this->getConfig($name);

return $this->getConnector($config['driver'])
->connect($config)
->setConnectionName($name);
}

resolve函数利用相应的底层驱动connector进行连接操作,也就是connect函数,该函数会返回RedisQueue:

Illuminate/Queue/Connectors/RedisConnector.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<?php

namespace Illuminate\Queue\Connectors;

use Illuminate\Contracts\Redis\Factory as Redis;
use Illuminate\Queue\RedisQueue;

class RedisConnector implements ConnectorInterface
{
/**
* Establish a queue connection.
*
* @param array $config
* @return \Illuminate\Contracts\Queue\Queue
*/
public function connect(array $config)
{
return new RedisQueue(
$this->redis, $config['queue'],
$config['connection'] ?? $this->connection,
$config['retry_after'] ?? 60,
$config['block_for'] ?? null
);
}
}

registerWorker消费者服务注册

消费者的注册服务会返回Illuminate\Queue\Worker类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* Register the queue worker.
*
* @return void
*/
protected function registerWorker()
{
$this->app->singleton('queue.worker', function ($app) {
$isDownForMaintenance = function () {
return $this->app->isDownForMaintenance();
};

return new Worker(
$app['queue'],
$app['events'],
$app[ExceptionHandler::class],
$isDownForMaintenance
);
});
}

Laravel Bus服务注册与启动

定义好自己想要的队列类之后,还需要将队列任务推送给底层驱动后台,例如Redis,一般会使用dispatch函数:

1
Job::dispatch();

或者

1
2
$job = (new ProcessPodcast($pocast));
dispatch($job);

dispatch函数就是Bus服务,专门用于分发队列任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
<?php

namespace Illuminate\Bus;

use Illuminate\Contracts\Bus\Dispatcher as DispatcherContract;
use Illuminate\Contracts\Bus\QueueingDispatcher as QueueingDispatcherContract;
use Illuminate\Contracts\Queue\Factory as QueueFactoryContract;
use Illuminate\Contracts\Support\DeferrableProvider;
use Illuminate\Support\ServiceProvider;

class BusServiceProvider extends ServiceProvider implements DeferrableProvider
{
/**
* Register the service provider.
*
* @return void
*/
public function register()
{
$this->app->singleton(Dispatcher::class, function ($app) {
return new Dispatcher($app, function ($connection = null) use ($app) {
return $app[QueueFactoryContract::class]->connection($connection);
});
});

$this->app->alias(
Dispatcher::class, DispatcherContract::class
);

$this->app->alias(
Dispatcher::class, QueueingDispatcherContract::class
);
}

/**
* Get the services provided by the provider.
*
* @return array
*/
public function provides()
{
return [
Dispatcher::class,
DispatcherContract::class,
QueueingDispatcherContract::class,
];
}
}

创建任务

queue设置

1
2
3
4
5
6
'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => 'default',
'retry_after' => 90,
],

一般来说,默认的redis配置如上,connection是database中redis的连接名称;queue是redis中的队列名称,值得注意的是,如果使用的是redis集群的话,这个需要使用key hash tag,也就是{default};当任务运行超过retry_after这个时间后,该任务会被重新放入队列当中。

任务类的创建

  • 任务类的结构很简单,一般来说只会包含一个让队列用来调用此任务的handle方法。
  • 如果想要使得任务被推送到队列中,而不是同步执行,那么需要实现Illuminate\Contracts\Queue\ShouldQueue接口。
  • 如果想要让任务推送到特定的连接中,例如redis或者sqs,那么需要设置conneciton变量。
  • 如果想要让任务推送到特定的队列中去,可以设置queue变量。
  • 如果想要让任务延迟推送,那么需要设置delay变量。
  • 如果想要设置任务至多重试的次数,可以使用tries变量;
  • 如果想要设置任务可以运行的最大秒数,那么可以使用timeout参数。
  • 如果想要手动访问队列,可以使用trait:Illuminate\Queue\InteractsWithQueue。

如果队列监听器任务执行次数超过在工作队列中定义的最大尝试次数,监听器的failed方法将会被自动调用。failed方法接受事件实例和失败的异常作为参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class ProcessPodcast implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

protected $podcast;
public $connection = 'redis';
public $queue = 'test';
public $delay = 30;
public $tries = 5;
public $timeout = 30;

public function __construct(Podcast $podcast)
{
$this->podcast = $podcast;
}

public function handle(AudioProcessor $processor)
{
// Process uploaded podcast...
if (false) {
$this->release(30);
}
}

public function failed(OrderShipped $event, $exception)
{
//
}
}

任务事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class AppServiceProvider extends ServiceProvider
{
public function boot()
{
// 任务运行前
Queue::before(function (JobProcessing $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});

//任务运行后
Queue::after(function (JobProcessed $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});

//任务循环前
Queue::looping(function () {
while (DB::transactionLevel() > 0) {
DB::rollBack();
}
});

// 任务失败后
Queue::failing(function (JobFailed $event) {
// $event->connectionName
// $event->job
// $event->exception
});

// 异常发生后
Queue::exceptionOccurred(function (JobFailed $event) {
// $event->connectionName
// $event->job
// $event->exception
});
}
}

任务的分发

分发服务

写好任务类后,就能通过dispatch辅助函数来分发它了。唯一需要传递给dispatch的参数是这个任务类的实例:

1
2
3
4
5
6
7
8
class PodcastController extends Controller
{
public function store(Request $request)
{
// 创建播客...
ProcessPodcast::dispatch($podcast);
}
}

如果想延迟执行一个队列中的任务,可以用任务实例的delay方法。

1
ProcessPodcast::dispatch($podcast)->delay(Carbon::now()->addMinutes(10));

通过推送任务到不同的队列,可以给队列任务分类,甚至可以控制给不同的队列分配多少任务。要指定队列的话,就调用任务实例的onQueue方法:

1
ProcessPodcast::dispatch($podcast)->onQueue('processing');

如果使用了多个队列连接,可以将任务推到指定连接。要指定连接的话,可以在分发任务的时候使用onConnection方法:

1
ProcessPodcast::dispatch($podcast)->onConnection('redis');

这些链式的函数是在trait:Illuminate\Foundation\Bus\Dispatchable的基础上应用的,该trait由dispatch函数启动:

1
2
3
4
5
6
7
trait Dispatchable
{
public static function dispatch()
{
return new PendingDispatch(new static(...func_get_args()));
}
}

PendingDispatch类中定义了链式函数,该函数巧妙在析构函数中,析构函数自动调用全局函数dispatch:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class PendingDispatch
{
public function __construct($job)
{
$this->job = $job;
}

public function onConnection($connection)
{
$this->job->onConnection($connection);
return $this;
}

public function onQueue($queue)
{
$this->job->onQueue($queue);
return $this;
}

public function delay($delay)
{
$this->job->delay($delay);
return $this;
}

public function __destruct()
{
dispatch($this->job);
}
}

各个函数里面的onConnection、delay、onQueue等函数是任务中的trait:Illuminate\Bus\Queueable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
trait Queueable
{
public function onConnection($connection)
{
$this->connection = $connection;
return $this;
}

public function onQueue($queue)
{
$this->queue = $queue;
return $this;
}

public function delay($delay)
{
$this->delay = $delay;
return $this;
}
}

dispatch任务分发源码

任务的分发离不开Bus服务,可以利用全局函数dispatch,还可以使用Dispatchable这个trait:

Illuminate/Bus/Dispatcher.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class Dispatcher implements QueueingDispatcher
{
/**
* The queue resolver callback.
*
* @var \Closure|null
*/
protected $queueResolver;

/**
* Dispatch a command to its appropriate handler.
*
* @param mixed $command
* @return mixed
*/
public function dispatch($command)
{
if ($this->queueResolver && $this->commandShouldBeQueued($command)) {
return $this->dispatchToQueue($command);
}

return $this->dispatchNow($command);
}

/**
* Determine if the given command should be queued.
*
* @param mixed $command
* @return bool
*/
protected function commandShouldBeQueued($command)
{
return $command instanceof ShouldQueue;
}
}

我们这里主要看异步的任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* Dispatch a command to its appropriate handler behind a queue.
*
* @param mixed $command
* @return mixed
*/
public function dispatchToQueue($command)
{
$connection = $command->connection ?? null;

$queue = call_user_func($this->queueResolver, $connection);

if (! $queue instanceof Queue) {
throw new RuntimeException('Queue resolver did not return a Queue implementation.');
}

if (method_exists($command, 'queue')) {
return $command->queue($queue, $command);
}

return $this->pushCommandToQueue($queue, $command);
}

进行任务分发之前,首先要利用queueResolver连接底层驱动。如果任务类中含有queue函数,那么就会利用用户自己的queue对驱动进行推送任务。否则就会启动默认的程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* Push the command onto the given queue instance.
*
* @param \Illuminate\Contracts\Queue\Queue $queue
* @param mixed $command
* @return mixed
*/
protected function pushCommandToQueue($queue, $command)
{
if (isset($command->queue, $command->delay)) {
return $queue->laterOn($command->queue, $command->delay, $command);
}

if (isset($command->queue)) {
return $queue->pushOn($command->queue, $command);
}

if (isset($command->delay)) {
return $queue->later($command->delay, $command);
}

return $queue->push($command);
}

我们以redis为例,queue这个类就是Illuminate\Queue\RedisQueue:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class RedisQueue extends Queue implements QueueContract
{

/**
* Push a new job onto the queue.
*
* @param object|string $job
* @param mixed $data
* @param string|null $queue
* @return mixed
*/
public function push($job, $data = '', $queue = null)
{
return $this->pushRaw($this->createPayload($job, $this->getQueue($queue), $data), $queue);
}

/**
* Push a new job onto the queue after a delay.
*
* @param \DateTimeInterface|\DateInterval|int $delay
* @param object|string $job
* @param mixed $data
* @param string|null $queue
* @return mixed
*/
public function later($delay, $job, $data = '', $queue = null)
{
return $this->laterRaw($delay, $this->createPayload($job, $this->getQueue($queue), $data), $queue);
}
}

我们先看push,push函数调用pushRaw,在调用之前,要把任务类进行序列化,并且以特定的格式进行json序列化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* Create a payload string from the given job and data.
*
* @param string $job
* @param string $queue
* @param mixed $data
* @return array
*/
protected function createPayloadArray($job, $queue, $data = '')
{
return array_merge(parent::createPayloadArray($job, $queue, $data), [
'id' => $this->getRandomId(),
'attempts' => 0,
]);
}

protected function createPayloadArray($job, $data = '', $queue = null)
{
return is_object($job)
? $this->createObjectPayload($job)
: $this->createStringPayload($job, $data);
}

protected function createObjectPayload($job)
{
return [
'job' => 'Illuminate\Queue\CallQueuedHandler@call',
'maxTries' => isset($job->tries) ? $job->tries : null,
'timeout' => isset($job->timeout) ? $job->timeout : null,
'data' => [
'commandName' => get_class($job),
'command' => serialize(clone $job),
],
];
}

protected function createStringPayload($job, $data)
{
return ['job' => $job, 'data' => $data];
}

格式化数据之后,就会将json推送到redis队列中,对于非延时的任务,直接调用rpush即可:

1
2
3
4
5
6
public function pushRaw($payload, $queue = null, array $options = [])
{
$this->getConnection()->rpush($this->getQueue($queue), $payload);

return Arr::get(json_decode($payload, true), 'id');
}

对于延时的任务,会调用laterRaw,调用redis的有序集合zadd函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
protected function availableAt($delay = 0)
{
return $delay instanceof DateTimeInterface
? $delay->getTimestamp()
: Carbon::now()->addSeconds($delay)->getTimestamp();
}

protected function laterRaw($delay, $payload, $queue = null)
{
$this->getConnection()->zadd(
$this->getQueue($queue).':delayed',
$this->availableAt($delay),
$payload
);

return Arr::get(json_decode($payload, true), 'id');
}

这样,相关任务就会被分发到redis对应的队列中去。

0%