Laravel Queue-消息队列任务处理器源码剖析

思考并回答以下问题:

运行队列处理器

队列处理器的设置

Laravel包含一个队列处理器,当新任务被推到队列中时它能处理这些任务。你可以通过queue:work命令来运行处理器。要注意,一旦queue:work命令开始,它将一直运行,直到你手动停止或者你关闭控制台:

1
php artisan queue:work

  • 可以指定队列处理器所使用的连接。
1
php artisan queue:work redis
  • 可以自定义队列处理器,方式是处理给定连接的特定队列。
1
php artisan queue:work redis --queue=emails
  • 可以使用--once选项来指定仅对队列中的单一任务进行处理:
1
php artisan queue:work --once
  • 如果一个任务失败了,会被放入延时队列中去,--delay选项可以设置失败任务的延时时间:
1
php artisan queue:work --delay=2
  • 如果想要限制一个任务的内存,可以使用--memory :
1
php artisan queue:work --memory=128
  • 当队列需要处理任务时,进程将继续处理任务,它们之间没有延迟。但是,如果没有新的工作可用,--sleep参数决定了工作进程将「睡眠」多长时间:
1
php artisan queue:work --sleep=3
  • 可以指定Laravel队列处理器最多执行多长时间后就应该被关闭掉:
1
php artisan queue:work --timeout=60
  • 可以指定Laravel队列处理器失败任务重试的次数:
1
php artisan queue:work --tries=60

可以看出来,队列处理器的设置大多数都可以由任务类进行设置,但是其中三个sleep、delay、memory只能由artisan来设置。

WorkCommand命令行启动

任务处理器进程的命令行模式会调用Illuminate\Queue\Console\WorkCommand,这个类在初始化的时候依赖注入了Illuminate\Queue\Worker:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
class WorkCommand extends Command
{
/**
* The console command name.
*
* @var string
*/
protected $signature = 'queue:work
{connection? : The name of the queue connection to work}
{--queue= : The names of the queues to work}
{--daemon : Run the worker in daemon mode (Deprecated)}
{--once : Only process the next job on the queue}
{--stop-when-empty : Stop when the queue is empty}
{--delay=0 : The number of seconds to delay failed jobs}
{--force : Force the worker to run even in maintenance mode}
{--memory=128 : The memory limit in megabytes}
{--sleep=3 : Number of seconds to sleep when no job is available}
{--timeout=60 : The number of seconds a child process can run}
{--tries=1 : Number of times to attempt a job before logging it failed}';

/**
* Create a new queue work command.
*
* @param \Illuminate\Queue\Worker $worker
* @param \Illuminate\Contracts\Cache\Repository $cache
* @return void
*/
public function __construct(Worker $worker, Cache $cache)
{
parent::__construct();

$this->cache = $cache;
$this->worker = $worker;
}

/**
* Execute the console command.
*
* @return void
*/
public function handle()
{
if ($this->downForMaintenance() && $this->option('once'))
{
return $this->worker->sleep($this->option('sleep'));
}

// 我们将侦听已处理的事件和失败的事件,以便在处理作业时将信息写入控制台,
// 这将使开发人员可以观察到队列中正在处理哪些作业,并了解其进度。

$this->listenForEvents();

$connection = $this->argument('connection')
?: $this->laravel['config']['queue.default'];

// 我们需要为应用程序的队列配置文件中设置的连接获取正确的队列。
// 我们将基于为当前正在执行的队列操作运行的设置连接来拉它。
$queue = $this->getQueue($connection);

$this->runWorker(
$connection, $queue
);
}
}

任务处理器启动后,会运行handle函数,在执行任务之前,程序首先会注册监听事件,主要监听任务完成与任务失败的情况:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
* Listen for the queue events in order to update the console output.
*
* @return void
*/
protected function listenForEvents()
{
$this->laravel['events']->listen(JobProcessing::class, function ($event) {
$this->writeOutput($event->job, 'starting');
});

$this->laravel['events']->listen(JobProcessed::class, function ($event) {
$this->writeOutput($event->job, 'success');
});

$this->laravel['events']->listen(JobFailed::class, function ($event) {
$this->writeOutput($event->job, 'failed');

$this->logFailedJob($event);
});
}

/**
* Write the status output for the queue worker.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @param string $status
* @return void
*/
protected function writeOutput(Job $job, $status)
{
switch ($status)
{
case 'starting':
return $this->writeStatus($job, 'Processing', 'comment');
case 'success':
return $this->writeStatus($job, 'Processed', 'info');
case 'failed':
return $this->writeStatus($job, 'Failed', 'error');
}
}

/**
* Format the status output for the queue worker.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @param string $status
* @param string $type
* @return void
*/
protected function writeStatus(Job $job, $status, $type)
{
$this->output->writeln(sprintf(
"<{$type}>[%s][%s] %s</{$type}> %s",
Carbon::now()->format('Y-m-d H:i:s'),
$job->getJobId(),
str_pad("{$status}:", 11), $job->resolveName()
));
}

/**
* Store a failed job event.
*
* @param \Illuminate\Queue\Events\JobFailed $event
* @return void
*/
protected function logFailedJob(JobFailed $event)
{
$this->laravel['queue.failer']->log(
$event->connectionName, $event->job->getQueue(),
$event->job->getRawBody(), $event->exception
);
}

启动任务管理器runWorker,该函数默认会调用Illuminate\Queue\Worker的daemon函数,只有在命令中强制--once参数的时候,才会执行runNestJob函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* Run the worker instance.
*
* @param string $connection
* @param string $queue
* @return array
*/
protected function runWorker($connection, $queue)
{
$this->worker->setCache($this->cache);

return $this->worker->{$this->option('once') ? 'runNextJob' : 'daemon'}(
$connection, $queue, $this->gatherWorkerOptions()
);
}

Worker任务调度

我们接下来接着看daemon函数:

Illuminate\Queue\Worker.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
/**
* Listen to the given queue in a loop.
*
* @param string $connectionName
* @param string $queue
* @param \Illuminate\Queue\WorkerOptions $options
* @return void
*/
public function daemon($connectionName, $queue, WorkerOptions $options)
{
if ($this->supportsAsyncSignals())
{
$this->listenForSignals();
}

$lastRestart = $this->getTimestampOfLastQueueRestart();

while (true)
{
// Before reserving any jobs, we will make sure this queue is not paused and
// if it is we will just pause this worker for a given amount of time and
// make sure we do not need to kill this worker process off completely.
if (! $this->daemonShouldRun($options, $connectionName, $queue))
{
$this->pauseWorker($options, $lastRestart);

continue;
}

// First, we will attempt to get the next job off of the queue. We will also
// register the timeout handler and reset the alarm for this job so it is
// not stuck in a frozen state forever. Then, we can fire off this job.
$job = $this->getNextJob(
$this->manager->connection($connectionName), $queue
);

if ($this->supportsAsyncSignals())
{
$this->registerTimeoutHandler($job, $options);
}

// If the daemon should run (not in maintenance mode, etc.), then we can run
// fire off this job for processing. Otherwise, we will need to sleep the
// worker so no more jobs are processed until they should be processed.
if ($job)
{
$this->runJob($job, $connectionName, $options);
} else
{
$this->sleep($options->sleep);
}

if ($this->supportsAsyncSignals())
{
$this->resetTimeoutHandler();
}

// Finally, we will check to see if we have exceeded our memory limits or if
// the queue should restart based on other indications. If so, we'll stop
// this worker and let whatever is "monitoring" it restart the process.
$this->stopIfNecessary($options, $lastRestart, $job);
}
}

信号处理

listenForSignals函数用于PHP 7.1版本以上,用于脚本的信号处理。所谓的信号处理,就是由Process Monitor(如Supervisor)发送并与我们的脚本进行通信的异步通知。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
protected function listenForSignals()
{
if ($this->supportsAsyncSignals())
{
pcntl_async_signals(true);

pcntl_signal(SIGTERM, function () {
$this->shouldQuit = true;
});

pcntl_signal(SIGUSR2, function () {
$this->paused = true;
});

pcntl_signal(SIGCONT, function () {
$this->paused = false;
});
}
}

protected function supportsAsyncSignals()
{
return version_compare(PHP_VERSION, '7.1.0') >= 0 && extension_loaded('pcntl');
}

pcntl_async_signals()被调用来启用信号处理,然后我们为多个信号注册处理程序:

  • 当脚本被Supervisor指示关闭时,会引发信号SIGTERM。
  • SIGUSR2是用户定义的信号,Laravel用来表示脚本应该暂停。
  • 当暂停的脚本被Supervisor指示继续进行时,会引发SIGCONT。

在真正运行任务之前,程序还从cache中取了一次最后一次重启的时间:

1
2
3
4
5
6
7
protected function getTimestampOfLastQueueRestart()
{
if ($this->cache)
{
return $this->cache->get('illuminate:queue:restart');
}
}

确定worker是否应该处理作业进入循环后,首先要判断当前脚本是应该处理任务,还是应该暂停,还是应该退出:

1
2
3
4
5
6
7
8
protected function daemonShouldRun(WorkerOptions $options)
{
return ! (
($this->manager->isDownForMaintenance() && ! $options->force)
|| $this->paused
|| $this->events->until(new Events\Looping) === false
);
}

以下几种情况,循环将不会处理任务:

  • 脚本处于维护模式并且没有--force选项。
  • 脚本被supervisor暂停。
  • 脚本的looping事件监听器返回false。

looping事件监听器在每次循环的时候都会被启动,如果返回false,那么当前的循环将会被暂停:pauseWorker:

1
2
3
4
5
protected function pauseWorker(WorkerOptions $options, $lastRestart)
{
$this->sleep($options->sleep > 0 ? $options->sleep : 1);
$this->stopIfNecessary($options, $lastRestart);
}

脚本在sleep一段时间之后,就要重新判断当前脚本是否需要stop:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
protected function stopIfNecessary(WorkerOptions $options, $lastRestart)
{
if ($this->shouldQuit)
{
$this->kill();
}

if ($this->memoryExceeded($options->memory))
{
$this->stop(12);
}
elseif ($this->queueShouldRestart($lastRestart))
{
$this->stop();
}
}

protected function queueShouldRestart($lastRestart)
{
return $this->getTimestampOfLastQueueRestart() != $lastRestart;
}

protected function getTimestampOfLastQueueRestart()
{
if ($this->cache)
{
return $this->cache->get('illuminate:queue:restart');
}
}

以下情况脚本将会被stop:

  • 脚本被supervisor退出。
  • 内存超限。
  • 脚本被重启过。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public function kill($status = 0)
{
if (extension_loaded('posix'))
{
posix_kill(getmypid(), SIGKILL);
}
exit($status);
}

public function stop($status = 0)
{
$this->events->fire(new Events\WorkerStopping);
exit($status);
}

脚本被重启,当前的进程需要退出并且重新加载。

获取下一个任务

当含有多个队列的时候,命令行可以用“,”连接多个队列的名字,位于前面的队列优先级更高:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
protected function getNextJob($connection, $queue)
{
try {
foreach (explode(',', $queue) as $queue)
{
if (! is_null($job = $connection->pop($queue)))
{
return $job;
}
}
}
catch (Exception $e)
{
$this->exceptions->report($e);
}
catch (Throwable $e)
{
$this->exceptions->report(new FatalThrowableError($e));
}
}

$connection是具体的驱动,我们这里是Illuminate\Queue\RedisQueue:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class RedisQueue extends Queue implements QueueContract
{
/**
* Pop the next job off of the queue.
*
* @param string|null $queue
* @return \Illuminate\Contracts\Queue\Job|null
*/
public function pop($queue = null)
{
$this->migrate($prefixed = $this->getQueue($queue));

if (empty($nextJob = $this->retrieveNextJob($prefixed))) {
return;
}

[$job, $reserved] = $nextJob;

if ($reserved) {
return new RedisJob(
$this->container, $this, $job,
$reserved, $this->connectionName, $queue ?: $this->default
);
}
}

/**
* Get the queue or return the default.
*
* @param string|null $queue
* @return string
*/
public function getQueue($queue)
{
return 'queues:'.($queue ?: $this->default);
}
}

在从队列中取出任务之前,需要先将delay队列和reserved队列中已经到时间的任务放到主队列中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* Migrate any delayed or expired jobs onto the primary queue.
*
* @param string $queue
* @return void
*/
protected function migrate($queue)
{
$this->migrateExpiredJobs($queue.':delayed', $queue);

if (! is_null($this->retryAfter))
{
$this->migrateExpiredJobs($queue.':reserved', $queue);
}
}

/**
* Migrate the delayed jobs that are ready to the regular queue.
*
* @param string $from
* @param string $to
* @return array
*/
public function migrateExpiredJobs($from, $to)
{
return $this->getConnection()->eval(
LuaScripts::migrateExpiredJobs(), 3, $from, $to, $to.':notify', $this->currentTime()
);
}

由于从队列取出任务、在队列删除任务、压入主队列是三个操作,为了防止并发,程序这里使用了LUA脚本,保证三个操作的原子性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* Get the Lua script to migrate expired jobs back onto the queue.
*
* KEYS[1] - The queue we are removing jobs from, for example: queues:foo:reserved
* KEYS[2] - The queue we are moving jobs to, for example: queues:foo
* KEYS[3] - The notification list for the queue we are moving jobs to, for example queues:foo:notify
* ARGV[1] - The current UNIX timestamp
*
* @return string
*/
public static function migrateExpiredJobs()
{
return <<<'LUA'
-- Get all of the jobs with an expired "score"...
local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1])

-- If we have values in the array, we will remove them from the first queue
-- and add them onto the destination queue in chunks of 100, which moves
-- all of the appropriate jobs onto the destination queue very safely.
if(next(val) ~= nil) then
redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)

for i = 1, #val, 100 do
redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))
-- Push a notification for every job that was migrated...
for j = i, math.min(i+99, #val) do
redis.call('rpush', KEYS[3], 1)
end
end
end

return val
LUA;
}

接下来,就要从主队列中获取下一个任务,在取出下一个任务之后,还要将任务放入reserved队列中,当任务执行失败后,该任务会进行重试。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/**
* Retrieve the next job from the queue.
*
* @param string $queue
* @param bool $block
* @return array
*/
protected function retrieveNextJob($queue, $block = true)
{
$nextJob = $this->getConnection()->eval(
LuaScripts::pop(), 3, $queue, $queue.':reserved', $queue.':notify',
$this->availableAt($this->retryAfter)
);

if (empty($nextJob)) {
return [null, null];
}

[$job, $reserved] = $nextJob;

if (! $job && ! is_null($this->blockFor) && $block &&
$this->getConnection()->blpop([$queue.':notify'], $this->blockFor)) {
return $this->retrieveNextJob($queue, false);
}

return [$job, $reserved];
}

/**
* Get the Lua script for popping the next job off of the queue.
*
* KEYS[1] - The queue to pop jobs from, for example: queues:foo
* KEYS[2] - The queue to place reserved jobs on, for example: queues:foo:reserved
* KEYS[3] - The notify queue
* ARGV[1] - The time at which the reserved job will expire
*
* @return string
*/
public static function pop()
{
return <<<'LUA'
-- Pop the first job off of the queue...
local job = redis.call('lpop', KEYS[1])
local reserved = false

if(job ~= false) then
-- Increment the attempt count and place job on the reserved queue...
reserved = cjson.decode(job)
reserved['attempts'] = reserved['attempts'] + 1
reserved = cjson.encode(reserved)
redis.call('zadd', KEYS[2], ARGV[1], reserved)
redis.call('lpop', KEYS[3])
end

return {job, reserved}
LUA;
}

从redis中获取到job之后,就会将其包装成RedisJob类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public function __construct(Container $container, RedisQueue $redis, $job, $reserved, $connectionName, $queue)
{
$this->job = $job;
$this->redis = $redis;
$this->queue = $queue;
$this->reserved = $reserved;
$this->container = $container;
$this->connectionName = $connectionName;
$this->decoded = $this->payload();
}

public function payload()
{
return json_decode($this->getRawBody(), true);
}

public function getRawBody()
{
return $this->job;
}

超时处理

如果一个脚本超时,pcntl_alarm将会启动并杀死当前的work进程。杀死进程后,work进程将会被守护进程重启,继续进行下一个任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
protected function registerTimeoutHandler($job, WorkerOptions $options)
{
if ($options->timeout > 0 && $this->supportsAsyncSignals())
{
pcntl_signal(SIGALRM, function () {
$this->kill(1);
});

pcntl_alarm($this->timeoutForJob($job, $options) + $options->sleep);
}
}

protected function timeoutForJob($job, WorkerOptions $options)
{
return $job && ! is_null($job->timeout())
? $job->timeout()
: $options->timeout;
}

任务事务

运行任务前后会启动两个事件JobProcessing与JobProcessed,这两个事件需要事先注册监听者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
protected function runJob($job, $connectionName, WorkerOptions $options)
{
try
{
return $this->process($connectionName, $job, $options);
}
catch (Exception $e)
{
$this->exceptions->report($e);
}
catch (Throwable $e)
{
$this->exceptions->report(new FatalThrowableError($e));
}
}

public function process($connectionName, $job, WorkerOptions $options)
{
try {
$this->raiseBeforeJobEvent($connectionName, $job);

$this->markJobAsFailedIfAlreadyExceedsMaxAttempts(
$connectionName,
$job,
(int) $options->maxTries
);

$job->fire();

$this->raiseAfterJobEvent($connectionName, $job);
}
catch (Exception $e)
{
$this->handleJobException($connectionName, $job, $options, $e);
}
catch (Throwable $e)
{
$this->handleJobException(
$connectionName,
$job,
$options,
new FatalThrowableError($e)
);
}
}

任务前与任务后事件

raiseBeforeJobEvent函数用于触发任务处理前的事件,raiseAfterJobEvent函数用于触发任务处理后的事件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
protected function raiseBeforeJobEvent($connectionName, $job)
{
$this->events->fire(
new Events\JobProcessing(
$connectionName, $job
)
);
}

protected function raiseAfterJobEvent($connectionName, $job)
{
$this->events->fire(
new Events\JobProcessed(
$connectionName, $job
)
);
}

任务异常处理

任务在运行过程中会遇到异常情况,这个时候就要判断当前任务的失败次数是不是超过限制。如果没有超过限制,那么就会把当前任务重新放回队列当中;如果超过了限制,那么就要标记当前任务为失败任务,并且将任务从reserved队列中删除。

任务失败

markJobAsFailedIfAlreadyExceedsMaxAttempts函数用于任务运行前,判断当前任务是否重试次数超过限制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $job, $maxTries)
{
$maxTries = ! is_null($job->maxTries())
? $job->maxTries()
: $maxTries;

if ($maxTries === 0 || $job->attempts() <= $maxTries)
{
return;
}

$this->failJob(
$connectionName,
$job,
$e = new MaxAttemptsExceededException(
'A queued job has been attempted too many times. The job may have previously timed out.'
)
);

throw $e;
}

public function maxTries()
{
return array_get($this->payload(), 'maxTries');
}

public function attempts()
{
return Arr::get($this->decoded, 'attempts') + 1;
}

protected function failJob($connectionName, $job, $e)
{
return FailingJob::handle($connectionName, $job, $e);
}

当遇到重试次数大于限制的任务,work进程就会调用FailingJob:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
protected function failJob($connectionName, $job, $e)
{
return FailingJob::handle($connectionName, $job, $e);
}

public static function handle($connectionName, $job, $e = null)
{
$job->markAsFailed();

if ($job->isDeleted())
{
return;
}

try
{
$job->delete();
$job->failed($e);
}
finally
{
static::events()->fire(
new JobFailed(
$connectionName, $job, $e ? : new ManuallyFailedException
)
);
}
}

public function markAsFailed()
{
$this->failed = true;
}

public function delete()
{
parent::delete();
$this->redis->deleteReserved($this->queue, $this);
}

public function isDeleted()
{
return $this->deleted;
}

FailingJob会标记当前任务failed、deleted,并且会将当前任务移除reserved队列,不会再重试:

1
2
3
4
5
6
7
public function deleteReserved($queue, $job)
{
$this->getConnection()->zrem(
$this->getQueue($queue).':reserved',
$job->getReservedJob()
);
}

FailingJob还会调用RedisJob的failed函数,并且触发JobFailed事件:

1
2
3
4
5
6
7
8
9
10
11
12
public function failed($e)
{
$this->markAsFailed();
$payload = $this->payload();

list($class, $method) = JobName::parse($payload['job']);

if (method_exists($this->instance = $this->resolve($class),'failed'))
{
$this->instance->failed($payload['data'], $e);
}
}

程序会解析job类,我们先前在redis中已经存储了:

1
2
3
4
5
6
7
8
9
[
'job' => 'Illuminate\Queue\CallQueuedHandler@call',
'maxTries' => isset($job->tries) ? $job->tries : null,
'timeout' => isset($job->timeout) ? $job->timeout : null,
'data' => [
'commandName' => get_class($job),
'command' => serialize(clone $job),
],
];

我们接着看failed函数:

1
2
3
4
5
6
7
8
public function failed(array $data, $e)
{
$command = unserialize($data['command']);

if (method_exists($command, 'failed')) {
$command->failed($e);
}
}

可以看到,最后程序调用了任务类的failed函数。

异常处理

当任务遇到异常的时候,程序仍然会判断当前任务的重试次数,如果本次任务的重试次数已经大于或等于限制,那么就会停止重试,标记为失败;否则就会重新放入队列,记录日志。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
protected function handleJobException($connectionName, $job, WorkerOptions $options, $e)
{
try {
$this->markJobAsFailedIfWillExceedMaxAttempts(
$connectionName, $job, (int) $options->maxTries, $e
);
$this->raiseExceptionOccurredJobEvent(
$connectionName, $job, $e
);
}
finally
{
if (! $job->isDeleted()) {
$job->release($options->delay);
}
}
throw $e;
}

protected function markJobAsFailedIfWillExceedMaxAttempts($connectionName, $job, $maxTries, $e)
{
$maxTries = ! is_null($job->maxTries())
? $job->maxTries()
: $maxTries;

if ($maxTries > 0 && $job->attempts() >= $maxTries)
{
$this->failJob($connectionName, $job, $e);
}
}

public function release($delay = 0)
{
parent::release($delay);
$this->redis->deleteAndRelease($this->queue, $this, $delay);
}

public function deleteAndRelease($queue, $job, $delay)
{
$queue = $this->getQueue($queue);

$this->getConnection()->eval(
LuaScripts::release(), 2, $queue.':delayed', $queue.':reserved',
$job->getReservedJob(), $this->availableAt($delay)
);
}

一旦任务出现异常错误。那么该任务将会立刻从reserved队列放入delayed队列,并且抛出异常,抛出异常后,程序会将其记录在日志中。

1
2
3
4
5
6
7
8
9
10
public static function release()
{
return <<<'LUA'
-- Remove the job from the current queue...
redis.call('zrem', KEYS[2], ARGV[1])
-- Add the job onto the "delayed" queue...
redis.call('zadd', KEYS[1], ARGV[2], ARGV[1])
return true
LUA;
}

任务的运行

任务的运行首先会调用CallQueuedHandler的call函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public function fire()
{
$payload = $this->payload();

list($class, $method) = JobName::parse($payload['job']);

with($this->instance = $this->resolve($class))->{$method}($this, $payload['data']);
}

/**
* Handle the queued job.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @param array $data
* @return void
*/
public function call(Job $job, array $data)
{
try {
$command = $this->setJobInstanceIfNecessary(
$job, unserialize($data['command'])
);
} catch (ModelNotFoundException $e) {
return $this->handleModelNotFound($job, $e);
}

$this->dispatchThroughMiddleware($job, $command);

if (! $job->hasFailed() && ! $job->isReleased()) {
$this->ensureNextJobInChainIsDispatched($command);
}

if (! $job->isDeletedOrReleased()) {
$job->delete();
}
}

setJobInstanceIfNecessary函数用于为任务类的trait:InteractsWithQueue的设置任务类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* Set the job instance of the given class if necessary.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @param mixed $instance
* @return mixed
*/
protected function setJobInstanceIfNecessary(Job $job, $instance)
{
if (in_array(InteractsWithQueue::class, class_uses_recursive($instance))) {
$instance->setJob($job);
}

return $instance;
}

trait InteractsWithQueue
{
/**
* Set the base queue job instance.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @return $this
*/
public function setJob(JobContract $job)
{
$this->job = $job;

return $this;
}
}

接着任务的运行就要交给dispatch:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
* Dispatch a command to its appropriate handler in the current process.
*
* @param mixed $command
* @param mixed $handler
* @return mixed
*/
public function dispatchNow($command, $handler = null)
{
if ($handler || $handler = $this->getCommandHandler($command)) {
$callback = function ($command) use ($handler) {
return $handler->handle($command);
};
} else {
$callback = function ($command) {
return $this->container->call([$command, 'handle']);
};
}

return $this->pipeline->send($command)->through($this->pipes)->then($callback);
}

/**
* Retrieve the handler for a command.
*
* @param mixed $command
* @return bool|mixed
*/
public function getCommandHandler($command)
{
if ($this->hasCommandHandler($command)) {
return $this->container->make($this->handlers[get_class($command)]);
}

return false;
}

/**
* Determine if the given command has a handler.
*
* @param mixed $command
* @return bool
*/
public function hasCommandHandler($command)
{
return array_key_exists(get_class($command), $this->handlers);
}

如果不对dispatcher类进行任何map函数设置,getCommandHandler将会返回null,此时就会调用任务类的handle函数,进行具体的业务逻辑。

任务结束后,就会调用delete函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* Delete the job from the queue.
*
* @return void
*/
public function delete()
{
parent::delete();

$this->redis->deleteReserved($this->queue, $this);
}

/**
* Delete a reserved job from the queue.
*
* @param string $queue
* @param \Illuminate\Queue\Jobs\RedisJob $job
* @return void
*/
public function deleteReserved($queue, $job)
{
$this->getConnection()->zrem($this->getQueue($queue).':reserved', $job->getReservedJob());
}

这样,运行成功的任务会从reserved中删除。

0%