思考并回答以下问题:
运行队列处理器
队列处理器的设置
Laravel包含一个队列处理器,当新任务被推到队列中时它能处理这些任务。你可以通过queue:work命令来运行处理器。要注意,一旦queue:work命令开始,它将一直运行,直到你手动停止或者你关闭控制台:1
php artisan queue:work
- 可以指定队列处理器所使用的连接。
| 1 | php artisan queue:work redis | 
- 可以自定义队列处理器,方式是处理给定连接的特定队列。
| 1 | php artisan queue:work redis --queue=emails | 
- 可以使用--once选项来指定仅对队列中的单一任务进行处理:
| 1 | php artisan queue:work --once | 
- 如果一个任务失败了,会被放入延时队列中去,--delay选项可以设置失败任务的延时时间:
| 1 | php artisan queue:work --delay=2 | 
- 如果想要限制一个任务的内存,可以使用--memory :
| 1 | php artisan queue:work --memory=128 | 
- 当队列需要处理任务时,进程将继续处理任务,它们之间没有延迟。但是,如果没有新的工作可用,--sleep参数决定了工作进程将「睡眠」多长时间:
| 1 | php artisan queue:work --sleep=3 | 
- 可以指定Laravel队列处理器最多执行多长时间后就应该被关闭掉:
| 1 | php artisan queue:work --timeout=60 | 
- 可以指定Laravel队列处理器失败任务重试的次数:
| 1 | php artisan queue:work --tries=60 | 
可以看出来,队列处理器的设置大多数都可以由任务类进行设置,但是其中三个sleep、delay、memory只能由artisan来设置。
WorkCommand命令行启动
任务处理器进程的命令行模式会调用Illuminate\Queue\Console\WorkCommand,这个类在初始化的时候依赖注入了Illuminate\Queue\Worker:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64class WorkCommand extends Command
{
    /**
     * The console command name.
     *
     * @var string
     */
    protected $signature = 'queue:work
                            {connection? : The name of the queue connection to work}
                            {--queue= : The names of the queues to work}
                            {--daemon : Run the worker in daemon mode (Deprecated)}
                            {--once : Only process the next job on the queue}
                            {--stop-when-empty : Stop when the queue is empty}
                            {--delay=0 : The number of seconds to delay failed jobs}
                            {--force : Force the worker to run even in maintenance mode}
                            {--memory=128 : The memory limit in megabytes}
                            {--sleep=3 : Number of seconds to sleep when no job is available}
                            {--timeout=60 : The number of seconds a child process can run}
                            {--tries=1 : Number of times to attempt a job before logging it failed}';
    /**
     * Create a new queue work command.
     *
     * @param  \Illuminate\Queue\Worker  $worker
     * @param  \Illuminate\Contracts\Cache\Repository  $cache
     * @return void
     */
    public function __construct(Worker $worker, Cache $cache)
    {
        parent::__construct();
        $this->cache = $cache;
        $this->worker = $worker;
    }
    /**
     * Execute the console command.
     *
     * @return void
     */
    public function handle()
    {
        if ($this->downForMaintenance() && $this->option('once')) 
        {
            return $this->worker->sleep($this->option('sleep'));
        }
        // 我们将侦听已处理的事件和失败的事件,以便在处理作业时将信息写入控制台,
        // 这将使开发人员可以观察到队列中正在处理哪些作业,并了解其进度。
        $this->listenForEvents();
        $connection = $this->argument('connection')
                        ?: $this->laravel['config']['queue.default'];
        // 我们需要为应用程序的队列配置文件中设置的连接获取正确的队列。 
        // 我们将基于为当前正在执行的队列操作运行的设置连接来拉它。
        $queue = $this->getQueue($connection);
        $this->runWorker(
            $connection, $queue
        );
    }
}
任务处理器启动后,会运行handle函数,在执行任务之前,程序首先会注册监听事件,主要监听任务完成与任务失败的情况:
| 1 | /** | 
启动任务管理器runWorker,该函数默认会调用Illuminate\Queue\Worker的daemon函数,只有在命令中强制--once参数的时候,才会执行runNestJob函数:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15/**
 * Run the worker instance.
 *
 * @param  string  $connection
 * @param  string  $queue
 * @return array
 */
protected function runWorker($connection, $queue)
{
    $this->worker->setCache($this->cache);
    return $this->worker->{$this->option('once') ? 'runNextJob' : 'daemon'}(
        $connection, $queue, $this->gatherWorkerOptions()
    );
}
Worker任务调度
我们接下来接着看daemon函数:
Illuminate\Queue\Worker.php
| 1 | /** | 
信号处理
listenForSignals函数用于PHP 7.1版本以上,用于脚本的信号处理。所谓的信号处理,就是由Process Monitor(如Supervisor)发送并与我们的脚本进行通信的异步通知。
| 1 | protected function listenForSignals() | 
pcntl_async_signals()被调用来启用信号处理,然后我们为多个信号注册处理程序:
- 当脚本被Supervisor指示关闭时,会引发信号SIGTERM。
- SIGUSR2是用户定义的信号,Laravel用来表示脚本应该暂停。
- 当暂停的脚本被Supervisor指示继续进行时,会引发SIGCONT。
在真正运行任务之前,程序还从cache中取了一次最后一次重启的时间:1
2
3
4
5
6
7protected function getTimestampOfLastQueueRestart()
{
    if ($this->cache) 
    {
        return $this->cache->get('illuminate:queue:restart');
    }
}
确定worker是否应该处理作业进入循环后,首先要判断当前脚本是应该处理任务,还是应该暂停,还是应该退出:
| 1 | protected function daemonShouldRun(WorkerOptions $options) | 
以下几种情况,循环将不会处理任务:
- 脚本处于维护模式并且没有--force选项。
- 脚本被supervisor暂停。
- 脚本的looping事件监听器返回false。
looping事件监听器在每次循环的时候都会被启动,如果返回false,那么当前的循环将会被暂停:pauseWorker:1
2
3
4
5protected function pauseWorker(WorkerOptions $options, $lastRestart)
{
    $this->sleep($options->sleep > 0 ? $options->sleep : 1);
    $this->stopIfNecessary($options, $lastRestart);
}
脚本在sleep一段时间之后,就要重新判断当前脚本是否需要stop:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29protected function stopIfNecessary(WorkerOptions $options, $lastRestart)
{
    if ($this->shouldQuit) 
    {
        $this->kill();
    }
    if ($this->memoryExceeded($options->memory)) 
    {
        $this->stop(12);
    } 
    elseif ($this->queueShouldRestart($lastRestart)) 
    {
        $this->stop();
    }
}
protected function queueShouldRestart($lastRestart)
{
    return $this->getTimestampOfLastQueueRestart() != $lastRestart;
}
protected function getTimestampOfLastQueueRestart()
{
    if ($this->cache) 
    {
        return $this->cache->get('illuminate:queue:restart');
    }
}
以下情况脚本将会被stop:
- 脚本被supervisor退出。
- 内存超限。
- 脚本被重启过。
| 1 | public function kill($status = 0) | 
脚本被重启,当前的进程需要退出并且重新加载。
获取下一个任务
当含有多个队列的时候,命令行可以用“,”连接多个队列的名字,位于前面的队列优先级更高:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20protected function getNextJob($connection, $queue)
{
    try {
        foreach (explode(',', $queue) as $queue) 
        {
            if (! is_null($job = $connection->pop($queue))) 
            {
                return $job;
            }
        }
    } 
    catch (Exception $e) 
    {
        $this->exceptions->report($e);
    } 
    catch (Throwable $e) 
    {
        $this->exceptions->report(new FatalThrowableError($e));
    }
}
$connection是具体的驱动,我们这里是Illuminate\Queue\RedisQueue:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37class RedisQueue extends Queue implements QueueContract
{
    /**
     * Pop the next job off of the queue.
     *
     * @param  string|null  $queue
     * @return \Illuminate\Contracts\Queue\Job|null
     */
    public function pop($queue = null)
    {
        $this->migrate($prefixed = $this->getQueue($queue));
        if (empty($nextJob = $this->retrieveNextJob($prefixed))) {
            return;
        }
        [$job, $reserved] = $nextJob;
        if ($reserved) {
            return new RedisJob(
                $this->container, $this, $job,
                $reserved, $this->connectionName, $queue ?: $this->default
            );
        }
    }
    /**
     * Get the queue or return the default.
     *
     * @param  string|null  $queue
     * @return string
     */
    public function getQueue($queue)
    {
        return 'queues:'.($queue ?: $this->default);
    }
}
在从队列中取出任务之前,需要先将delay队列和reserved队列中已经到时间的任务放到主队列中:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29/**
 * Migrate any delayed or expired jobs onto the primary queue.
 *
 * @param  string  $queue
 * @return void
 */
protected function migrate($queue)
{
    $this->migrateExpiredJobs($queue.':delayed', $queue);
    if (! is_null($this->retryAfter)) 
    {
        $this->migrateExpiredJobs($queue.':reserved', $queue);
    }
}
/**
 * Migrate the delayed jobs that are ready to the regular queue.
 *
 * @param  string  $from
 * @param  string  $to
 * @return array
 */
public function migrateExpiredJobs($from, $to)
{
    return $this->getConnection()->eval(
        LuaScripts::migrateExpiredJobs(), 3, $from, $to, $to.':notify', $this->currentTime()
    );
}
由于从队列取出任务、在队列删除任务、压入主队列是三个操作,为了防止并发,程序这里使用了LUA脚本,保证三个操作的原子性:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34/**
 * Get the Lua script to migrate expired jobs back onto the queue.
 *
 * KEYS[1] - The queue we are removing jobs from, for example: queues:foo:reserved
 * KEYS[2] - The queue we are moving jobs to, for example: queues:foo
 * KEYS[3] - The notification list for the queue we are moving jobs to, for example queues:foo:notify
 * ARGV[1] - The current UNIX timestamp
 *
 * @return string
 */
public static function migrateExpiredJobs()
{
    return <<<'LUA'
-- Get all of the jobs with an expired "score"...
local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1])
-- If we have values in the array, we will remove them from the first queue
-- and add them onto the destination queue in chunks of 100, which moves
-- all of the appropriate jobs onto the destination queue very safely.
if(next(val) ~= nil) then
    redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)
    for i = 1, #val, 100 do
        redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))
        -- Push a notification for every job that was migrated...
        for j = i, math.min(i+99, #val) do
            redis.call('rpush', KEYS[3], 1)
        end
    end
end
return val
LUA;
}
接下来,就要从主队列中获取下一个任务,在取出下一个任务之后,还要将任务放入reserved队列中,当任务执行失败后,该任务会进行重试。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57/**
 * Retrieve the next job from the queue.
 *
 * @param  string  $queue
 * @param  bool  $block
 * @return array
 */
protected function retrieveNextJob($queue, $block = true)
{
    $nextJob = $this->getConnection()->eval(
        LuaScripts::pop(), 3, $queue, $queue.':reserved', $queue.':notify',
        $this->availableAt($this->retryAfter)
    );
    if (empty($nextJob)) {
        return [null, null];
    }
    [$job, $reserved] = $nextJob;
    if (! $job && ! is_null($this->blockFor) && $block &&
        $this->getConnection()->blpop([$queue.':notify'], $this->blockFor)) {
        return $this->retrieveNextJob($queue, false);
    }
    return [$job, $reserved];
}
/**
 * Get the Lua script for popping the next job off of the queue.
 *
 * KEYS[1] - The queue to pop jobs from, for example: queues:foo
 * KEYS[2] - The queue to place reserved jobs on, for example: queues:foo:reserved
 * KEYS[3] - The notify queue
 * ARGV[1] - The time at which the reserved job will expire
 *
 * @return string
 */
public static function pop()
{
    return <<<'LUA'
-- Pop the first job off of the queue...
local job = redis.call('lpop', KEYS[1])
local reserved = false
if(job ~= false) then
    -- Increment the attempt count and place job on the reserved queue...
    reserved = cjson.decode(job)
    reserved['attempts'] = reserved['attempts'] + 1
    reserved = cjson.encode(reserved)
    redis.call('zadd', KEYS[2], ARGV[1], reserved)
    redis.call('lpop', KEYS[3])
end
return {job, reserved}
LUA;
}
从redis中获取到job之后,就会将其包装成RedisJob类:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20public function __construct(Container $container, RedisQueue $redis, $job, $reserved, $connectionName, $queue)
{
    $this->job            = $job;
    $this->redis          = $redis;
    $this->queue          = $queue;
    $this->reserved       = $reserved;
    $this->container      = $container;
    $this->connectionName = $connectionName;
    $this->decoded        = $this->payload();
}
public function payload()
{
    return json_decode($this->getRawBody(), true);
}
public function getRawBody()
{
    return $this->job;
}
超时处理
如果一个脚本超时,pcntl_alarm将会启动并杀死当前的work进程。杀死进程后,work进程将会被守护进程重启,继续进行下一个任务。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18protected function registerTimeoutHandler($job, WorkerOptions $options)
{
    if ($options->timeout > 0 && $this->supportsAsyncSignals())
    {
        pcntl_signal(SIGALRM, function () {
            $this->kill(1);
        });
        pcntl_alarm($this->timeoutForJob($job, $options) + $options->sleep);
    }
}
protected function timeoutForJob($job, WorkerOptions $options)
{
    return $job && ! is_null($job->timeout()) 
        ? $job->timeout()
        : $options->timeout;
}
任务事务
运行任务前后会启动两个事件JobProcessing与JobProcessed,这两个事件需要事先注册监听者:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45protected function runJob($job, $connectionName, WorkerOptions $options)
{
    try 
    {
        return $this->process($connectionName, $job, $options);
    } 
    catch (Exception $e) 
    {
        $this->exceptions->report($e);
    } 
    catch (Throwable $e) 
    {
        $this->exceptions->report(new FatalThrowableError($e));
    }
}
public function process($connectionName, $job, WorkerOptions $options)
{
    try {
        $this->raiseBeforeJobEvent($connectionName, $job);
        $this->markJobAsFailedIfAlreadyExceedsMaxAttempts(
            $connectionName, 
            $job, 
            (int) $options->maxTries
        );
        $job->fire();
        $this->raiseAfterJobEvent($connectionName, $job);
    } 
    catch (Exception $e) 
    {
        $this->handleJobException($connectionName, $job, $options, $e);
    } 
    catch (Throwable $e) 
    {
        $this->handleJobException(
            $connectionName, 
            $job, 
            $options, 
            new FatalThrowableError($e)
        );
    }
}
任务前与任务后事件
raiseBeforeJobEvent函数用于触发任务处理前的事件,raiseAfterJobEvent函数用于触发任务处理后的事件:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17protected function raiseBeforeJobEvent($connectionName, $job)
{
    $this->events->fire(
        new Events\JobProcessing(
            $connectionName, $job
        )
    );
}
protected function raiseAfterJobEvent($connectionName, $job)
{
    $this->events->fire(
        new Events\JobProcessed(
            $connectionName, $job
        )
    );
}
任务异常处理
任务在运行过程中会遇到异常情况,这个时候就要判断当前任务的失败次数是不是超过限制。如果没有超过限制,那么就会把当前任务重新放回队列当中;如果超过了限制,那么就要标记当前任务为失败任务,并且将任务从reserved队列中删除。
任务失败
markJobAsFailedIfAlreadyExceedsMaxAttempts函数用于任务运行前,判断当前任务是否重试次数超过限制:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $job, $maxTries)
{
    $maxTries = ! is_null($job->maxTries()) 
                ? $job->maxTries() 
                : $maxTries;
    if ($maxTries === 0 || $job->attempts() <= $maxTries) 
    {
        return;
    }
    $this->failJob(
        $connectionName, 
        $job, 
        $e = new MaxAttemptsExceededException(
            'A queued job has been attempted too many times. The job may have previously timed out.'
        )
    );
    throw $e;
}
public function maxTries()
{
    return array_get($this->payload(), 'maxTries');
}
public function attempts()
{
    return Arr::get($this->decoded, 'attempts') + 1;
}
protected function failJob($connectionName, $job, $e)
{
    return FailingJob::handle($connectionName, $job, $e);
}
当遇到重试次数大于限制的任务,work进程就会调用FailingJob:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44protected function failJob($connectionName, $job, $e)
{
    return FailingJob::handle($connectionName, $job, $e);
}
public static function handle($connectionName, $job, $e = null)
{
    $job->markAsFailed();
    if ($job->isDeleted()) 
    {
        return;
    }
    try 
    {
        $job->delete();
        $job->failed($e);
    } 
    finally 
    {
        static::events()->fire(
            new JobFailed(
                $connectionName, $job, $e ? : new ManuallyFailedException
            )
        );
    }
}
public function markAsFailed()
{
    $this->failed = true;
}
public function delete()
{
    parent::delete();
    $this->redis->deleteReserved($this->queue, $this);
}
public function isDeleted()
{
    return $this->deleted;
}
FailingJob会标记当前任务failed、deleted,并且会将当前任务移除reserved队列,不会再重试:1
2
3
4
5
6
7public function deleteReserved($queue, $job)
{
    $this->getConnection()->zrem(
        $this->getQueue($queue).':reserved', 
        $job->getReservedJob()
    );
}
FailingJob还会调用RedisJob的failed函数,并且触发JobFailed事件:1
2
3
4
5
6
7
8
9
10
11
12public function failed($e)
{
    $this->markAsFailed();
    $payload = $this->payload();
    list($class, $method) = JobName::parse($payload['job']);
    if (method_exists($this->instance = $this->resolve($class),'failed')) 
    {
        $this->instance->failed($payload['data'], $e);
    }
}
程序会解析job类,我们先前在redis中已经存储了:1
2
3
4
5
6
7
8
9[
    'job'      => 'Illuminate\Queue\CallQueuedHandler@call',
    'maxTries' => isset($job->tries) ? $job->tries : null,
    'timeout'  => isset($job->timeout) ? $job->timeout : null,
    'data'     => [
        'commandName' => get_class($job),
        'command'     => serialize(clone $job),
    ],
];
我们接着看failed函数:1
2
3
4
5
6
7
8public function failed(array $data, $e)
{
    $command = unserialize($data['command']);
    if (method_exists($command, 'failed')) {
        $command->failed($e);
    }
}
可以看到,最后程序调用了任务类的failed函数。
异常处理
当任务遇到异常的时候,程序仍然会判断当前任务的重试次数,如果本次任务的重试次数已经大于或等于限制,那么就会停止重试,标记为失败;否则就会重新放入队列,记录日志。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46protected function handleJobException($connectionName, $job, WorkerOptions $options, $e)
{
    try {
        $this->markJobAsFailedIfWillExceedMaxAttempts(
            $connectionName, $job, (int) $options->maxTries, $e
        );
        $this->raiseExceptionOccurredJobEvent(
            $connectionName, $job, $e
        );
    } 
    finally
    {
        if (! $job->isDeleted()) {
            $job->release($options->delay);
        }
    }
    throw $e;
}
protected function markJobAsFailedIfWillExceedMaxAttempts($connectionName, $job, $maxTries, $e)
{
    $maxTries = ! is_null($job->maxTries()) 
                ? $job->maxTries() 
                : $maxTries;
    if ($maxTries > 0 && $job->attempts() >= $maxTries) 
    {
        $this->failJob($connectionName, $job, $e);
    }
}
public function release($delay = 0)
{
    parent::release($delay);
    $this->redis->deleteAndRelease($this->queue, $this, $delay);
}
public function deleteAndRelease($queue, $job, $delay)
{
    $queue = $this->getQueue($queue);
    $this->getConnection()->eval(
        LuaScripts::release(), 2, $queue.':delayed', $queue.':reserved',
        $job->getReservedJob(), $this->availableAt($delay)
    );
}
一旦任务出现异常错误。那么该任务将会立刻从reserved队列放入delayed队列,并且抛出异常,抛出异常后,程序会将其记录在日志中。1
2
3
4
5
6
7
8
9
10public static function release()
{
    return <<<'LUA'
-- Remove the job from the current queue...
redis.call('zrem', KEYS[2], ARGV[1])
-- Add the job onto the "delayed" queue...
redis.call('zadd', KEYS[1], ARGV[2], ARGV[1])
return true
    LUA;
}
任务的运行
任务的运行首先会调用CallQueuedHandler的call函数:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36public function fire()
{
    $payload = $this->payload();
    list($class, $method) = JobName::parse($payload['job']);
    with($this->instance = $this->resolve($class))->{$method}($this, $payload['data']);
}
/**
 * Handle the queued job.
 *
 * @param  \Illuminate\Contracts\Queue\Job  $job
 * @param  array  $data
 * @return void
 */
public function call(Job $job, array $data)
{
    try {
        $command = $this->setJobInstanceIfNecessary(
            $job, unserialize($data['command'])
        );
    } catch (ModelNotFoundException $e) {
        return $this->handleModelNotFound($job, $e);
    }
    $this->dispatchThroughMiddleware($job, $command);
    if (! $job->hasFailed() && ! $job->isReleased()) {
        $this->ensureNextJobInChainIsDispatched($command);
    }
    if (! $job->isDeletedOrReleased()) {
        $job->delete();
    }
}
setJobInstanceIfNecessary函数用于为任务类的trait:InteractsWithQueue的设置任务类:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31/**
 * Set the job instance of the given class if necessary.
 *
 * @param  \Illuminate\Contracts\Queue\Job  $job
 * @param  mixed  $instance
 * @return mixed
 */
protected function setJobInstanceIfNecessary(Job $job, $instance)
{
    if (in_array(InteractsWithQueue::class, class_uses_recursive($instance))) {
        $instance->setJob($job);
    }
    return $instance;
}
trait InteractsWithQueue
{
    /**
     * Set the base queue job instance.
     *
     * @param  \Illuminate\Contracts\Queue\Job  $job
     * @return $this
     */
    public function setJob(JobContract $job)
    {
        $this->job = $job;
        return $this;
    }
}
接着任务的运行就要交给dispatch:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47/**
 * Dispatch a command to its appropriate handler in the current process.
 *
 * @param  mixed  $command
 * @param  mixed  $handler
 * @return mixed
 */
public function dispatchNow($command, $handler = null)
{
    if ($handler || $handler = $this->getCommandHandler($command)) {
        $callback = function ($command) use ($handler) {
            return $handler->handle($command);
        };
    } else {
        $callback = function ($command) {
            return $this->container->call([$command, 'handle']);
        };
    }
    return $this->pipeline->send($command)->through($this->pipes)->then($callback);
}
/**
 * Retrieve the handler for a command.
 *
 * @param  mixed  $command
 * @return bool|mixed
 */
public function getCommandHandler($command)
{
    if ($this->hasCommandHandler($command)) {
        return $this->container->make($this->handlers[get_class($command)]);
    }
    return false;
}
/**
 * Determine if the given command has a handler.
 *
 * @param  mixed  $command
 * @return bool
 */
public function hasCommandHandler($command)
{
    return array_key_exists(get_class($command), $this->handlers);
}
如果不对dispatcher类进行任何map函数设置,getCommandHandler将会返回null,此时就会调用任务类的handle函数,进行具体的业务逻辑。
任务结束后,就会调用delete函数:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23/**
 * Delete the job from the queue.
 *
 * @return void
 */
public function delete()
{
    parent::delete();
    $this->redis->deleteReserved($this->queue, $this);
}
/**
 * Delete a reserved job from the queue.
 *
 * @param  string  $queue
 * @param  \Illuminate\Queue\Jobs\RedisJob  $job
 * @return void
 */
public function deleteReserved($queue, $job)
{
    $this->getConnection()->zrem($this->getQueue($queue).':reserved', $job->getReservedJob());
}
这样,运行成功的任务会从reserved中删除。