思考并回答以下问题:
运行队列处理器
队列处理器的设置
Laravel包含一个队列处理器,当新任务被推到队列中时它能处理这些任务。你可以通过queue:work命令来运行处理器。要注意,一旦queue:work命令开始,它将一直运行,直到你手动停止或者你关闭控制台:1
php artisan queue:work
- 可以指定队列处理器所使用的连接。
1 | php artisan queue:work redis |
- 可以自定义队列处理器,方式是处理给定连接的特定队列。
1 | php artisan queue:work redis --queue=emails |
- 可以使用--once选项来指定仅对队列中的单一任务进行处理:
1 | php artisan queue:work --once |
- 如果一个任务失败了,会被放入延时队列中去,--delay选项可以设置失败任务的延时时间:
1 | php artisan queue:work --delay=2 |
- 如果想要限制一个任务的内存,可以使用--memory :
1 | php artisan queue:work --memory=128 |
- 当队列需要处理任务时,进程将继续处理任务,它们之间没有延迟。但是,如果没有新的工作可用,--sleep参数决定了工作进程将「睡眠」多长时间:
1 | php artisan queue:work --sleep=3 |
- 可以指定Laravel队列处理器最多执行多长时间后就应该被关闭掉:
1 | php artisan queue:work --timeout=60 |
- 可以指定Laravel队列处理器失败任务重试的次数:
1 | php artisan queue:work --tries=60 |
可以看出来,队列处理器的设置大多数都可以由任务类进行设置,但是其中三个sleep、delay、memory只能由artisan来设置。
WorkCommand命令行启动
任务处理器进程的命令行模式会调用Illuminate\Queue\Console\WorkCommand,这个类在初始化的时候依赖注入了Illuminate\Queue\Worker:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64class WorkCommand extends Command
{
/**
* The console command name.
*
* @var string
*/
protected $signature = 'queue:work
{connection? : The name of the queue connection to work}
{--queue= : The names of the queues to work}
{--daemon : Run the worker in daemon mode (Deprecated)}
{--once : Only process the next job on the queue}
{--stop-when-empty : Stop when the queue is empty}
{--delay=0 : The number of seconds to delay failed jobs}
{--force : Force the worker to run even in maintenance mode}
{--memory=128 : The memory limit in megabytes}
{--sleep=3 : Number of seconds to sleep when no job is available}
{--timeout=60 : The number of seconds a child process can run}
{--tries=1 : Number of times to attempt a job before logging it failed}';
/**
* Create a new queue work command.
*
* @param \Illuminate\Queue\Worker $worker
* @param \Illuminate\Contracts\Cache\Repository $cache
* @return void
*/
public function __construct(Worker $worker, Cache $cache)
{
parent::__construct();
$this->cache = $cache;
$this->worker = $worker;
}
/**
* Execute the console command.
*
* @return void
*/
public function handle()
{
if ($this->downForMaintenance() && $this->option('once'))
{
return $this->worker->sleep($this->option('sleep'));
}
// 我们将侦听已处理的事件和失败的事件,以便在处理作业时将信息写入控制台,
// 这将使开发人员可以观察到队列中正在处理哪些作业,并了解其进度。
$this->listenForEvents();
$connection = $this->argument('connection')
?: $this->laravel['config']['queue.default'];
// 我们需要为应用程序的队列配置文件中设置的连接获取正确的队列。
// 我们将基于为当前正在执行的队列操作运行的设置连接来拉它。
$queue = $this->getQueue($connection);
$this->runWorker(
$connection, $queue
);
}
}
任务处理器启动后,会运行handle函数,在执行任务之前,程序首先会注册监听事件,主要监听任务完成与任务失败的情况:
1 | /** |
启动任务管理器runWorker,该函数默认会调用Illuminate\Queue\Worker的daemon函数,只有在命令中强制--once参数的时候,才会执行runNestJob函数:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15/**
* Run the worker instance.
*
* @param string $connection
* @param string $queue
* @return array
*/
protected function runWorker($connection, $queue)
{
$this->worker->setCache($this->cache);
return $this->worker->{$this->option('once') ? 'runNextJob' : 'daemon'}(
$connection, $queue, $this->gatherWorkerOptions()
);
}
Worker任务调度
我们接下来接着看daemon函数:
Illuminate\Queue\Worker.php
1 | /** |
信号处理
listenForSignals函数用于PHP 7.1版本以上,用于脚本的信号处理。所谓的信号处理,就是由Process Monitor(如Supervisor)发送并与我们的脚本进行通信的异步通知。
1 | protected function listenForSignals() |
pcntl_async_signals()被调用来启用信号处理,然后我们为多个信号注册处理程序:
- 当脚本被Supervisor指示关闭时,会引发信号SIGTERM。
- SIGUSR2是用户定义的信号,Laravel用来表示脚本应该暂停。
- 当暂停的脚本被Supervisor指示继续进行时,会引发SIGCONT。
在真正运行任务之前,程序还从cache中取了一次最后一次重启的时间:1
2
3
4
5
6
7protected function getTimestampOfLastQueueRestart()
{
if ($this->cache)
{
return $this->cache->get('illuminate:queue:restart');
}
}
确定worker是否应该处理作业进入循环后,首先要判断当前脚本是应该处理任务,还是应该暂停,还是应该退出:
1 | protected function daemonShouldRun(WorkerOptions $options) |
以下几种情况,循环将不会处理任务:
- 脚本处于维护模式并且没有--force选项。
- 脚本被supervisor暂停。
- 脚本的looping事件监听器返回false。
looping事件监听器在每次循环的时候都会被启动,如果返回false,那么当前的循环将会被暂停:pauseWorker:1
2
3
4
5protected function pauseWorker(WorkerOptions $options, $lastRestart)
{
$this->sleep($options->sleep > 0 ? $options->sleep : 1);
$this->stopIfNecessary($options, $lastRestart);
}
脚本在sleep一段时间之后,就要重新判断当前脚本是否需要stop:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29protected function stopIfNecessary(WorkerOptions $options, $lastRestart)
{
if ($this->shouldQuit)
{
$this->kill();
}
if ($this->memoryExceeded($options->memory))
{
$this->stop(12);
}
elseif ($this->queueShouldRestart($lastRestart))
{
$this->stop();
}
}
protected function queueShouldRestart($lastRestart)
{
return $this->getTimestampOfLastQueueRestart() != $lastRestart;
}
protected function getTimestampOfLastQueueRestart()
{
if ($this->cache)
{
return $this->cache->get('illuminate:queue:restart');
}
}
以下情况脚本将会被stop:
- 脚本被supervisor退出。
- 内存超限。
- 脚本被重启过。
1 | public function kill($status = 0) |
脚本被重启,当前的进程需要退出并且重新加载。
获取下一个任务
当含有多个队列的时候,命令行可以用“,”连接多个队列的名字,位于前面的队列优先级更高:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20protected function getNextJob($connection, $queue)
{
try {
foreach (explode(',', $queue) as $queue)
{
if (! is_null($job = $connection->pop($queue)))
{
return $job;
}
}
}
catch (Exception $e)
{
$this->exceptions->report($e);
}
catch (Throwable $e)
{
$this->exceptions->report(new FatalThrowableError($e));
}
}
$connection是具体的驱动,我们这里是Illuminate\Queue\RedisQueue:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37class RedisQueue extends Queue implements QueueContract
{
/**
* Pop the next job off of the queue.
*
* @param string|null $queue
* @return \Illuminate\Contracts\Queue\Job|null
*/
public function pop($queue = null)
{
$this->migrate($prefixed = $this->getQueue($queue));
if (empty($nextJob = $this->retrieveNextJob($prefixed))) {
return;
}
[$job, $reserved] = $nextJob;
if ($reserved) {
return new RedisJob(
$this->container, $this, $job,
$reserved, $this->connectionName, $queue ?: $this->default
);
}
}
/**
* Get the queue or return the default.
*
* @param string|null $queue
* @return string
*/
public function getQueue($queue)
{
return 'queues:'.($queue ?: $this->default);
}
}
在从队列中取出任务之前,需要先将delay队列和reserved队列中已经到时间的任务放到主队列中:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29/**
* Migrate any delayed or expired jobs onto the primary queue.
*
* @param string $queue
* @return void
*/
protected function migrate($queue)
{
$this->migrateExpiredJobs($queue.':delayed', $queue);
if (! is_null($this->retryAfter))
{
$this->migrateExpiredJobs($queue.':reserved', $queue);
}
}
/**
* Migrate the delayed jobs that are ready to the regular queue.
*
* @param string $from
* @param string $to
* @return array
*/
public function migrateExpiredJobs($from, $to)
{
return $this->getConnection()->eval(
LuaScripts::migrateExpiredJobs(), 3, $from, $to, $to.':notify', $this->currentTime()
);
}
由于从队列取出任务、在队列删除任务、压入主队列是三个操作,为了防止并发,程序这里使用了LUA脚本,保证三个操作的原子性:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34/**
* Get the Lua script to migrate expired jobs back onto the queue.
*
* KEYS[1] - The queue we are removing jobs from, for example: queues:foo:reserved
* KEYS[2] - The queue we are moving jobs to, for example: queues:foo
* KEYS[3] - The notification list for the queue we are moving jobs to, for example queues:foo:notify
* ARGV[1] - The current UNIX timestamp
*
* @return string
*/
public static function migrateExpiredJobs()
{
return <<<'LUA'
-- Get all of the jobs with an expired "score"...
local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1])
-- If we have values in the array, we will remove them from the first queue
-- and add them onto the destination queue in chunks of 100, which moves
-- all of the appropriate jobs onto the destination queue very safely.
if(next(val) ~= nil) then
redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)
for i = 1, #val, 100 do
redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))
-- Push a notification for every job that was migrated...
for j = i, math.min(i+99, #val) do
redis.call('rpush', KEYS[3], 1)
end
end
end
return val
LUA;
}
接下来,就要从主队列中获取下一个任务,在取出下一个任务之后,还要将任务放入reserved队列中,当任务执行失败后,该任务会进行重试。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57/**
* Retrieve the next job from the queue.
*
* @param string $queue
* @param bool $block
* @return array
*/
protected function retrieveNextJob($queue, $block = true)
{
$nextJob = $this->getConnection()->eval(
LuaScripts::pop(), 3, $queue, $queue.':reserved', $queue.':notify',
$this->availableAt($this->retryAfter)
);
if (empty($nextJob)) {
return [null, null];
}
[$job, $reserved] = $nextJob;
if (! $job && ! is_null($this->blockFor) && $block &&
$this->getConnection()->blpop([$queue.':notify'], $this->blockFor)) {
return $this->retrieveNextJob($queue, false);
}
return [$job, $reserved];
}
/**
* Get the Lua script for popping the next job off of the queue.
*
* KEYS[1] - The queue to pop jobs from, for example: queues:foo
* KEYS[2] - The queue to place reserved jobs on, for example: queues:foo:reserved
* KEYS[3] - The notify queue
* ARGV[1] - The time at which the reserved job will expire
*
* @return string
*/
public static function pop()
{
return <<<'LUA'
-- Pop the first job off of the queue...
local job = redis.call('lpop', KEYS[1])
local reserved = false
if(job ~= false) then
-- Increment the attempt count and place job on the reserved queue...
reserved = cjson.decode(job)
reserved['attempts'] = reserved['attempts'] + 1
reserved = cjson.encode(reserved)
redis.call('zadd', KEYS[2], ARGV[1], reserved)
redis.call('lpop', KEYS[3])
end
return {job, reserved}
LUA;
}
从redis中获取到job之后,就会将其包装成RedisJob类:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20public function __construct(Container $container, RedisQueue $redis, $job, $reserved, $connectionName, $queue)
{
$this->job = $job;
$this->redis = $redis;
$this->queue = $queue;
$this->reserved = $reserved;
$this->container = $container;
$this->connectionName = $connectionName;
$this->decoded = $this->payload();
}
public function payload()
{
return json_decode($this->getRawBody(), true);
}
public function getRawBody()
{
return $this->job;
}
超时处理
如果一个脚本超时,pcntl_alarm将会启动并杀死当前的work进程。杀死进程后,work进程将会被守护进程重启,继续进行下一个任务。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18protected function registerTimeoutHandler($job, WorkerOptions $options)
{
if ($options->timeout > 0 && $this->supportsAsyncSignals())
{
pcntl_signal(SIGALRM, function () {
$this->kill(1);
});
pcntl_alarm($this->timeoutForJob($job, $options) + $options->sleep);
}
}
protected function timeoutForJob($job, WorkerOptions $options)
{
return $job && ! is_null($job->timeout())
? $job->timeout()
: $options->timeout;
}
任务事务
运行任务前后会启动两个事件JobProcessing与JobProcessed,这两个事件需要事先注册监听者:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45protected function runJob($job, $connectionName, WorkerOptions $options)
{
try
{
return $this->process($connectionName, $job, $options);
}
catch (Exception $e)
{
$this->exceptions->report($e);
}
catch (Throwable $e)
{
$this->exceptions->report(new FatalThrowableError($e));
}
}
public function process($connectionName, $job, WorkerOptions $options)
{
try {
$this->raiseBeforeJobEvent($connectionName, $job);
$this->markJobAsFailedIfAlreadyExceedsMaxAttempts(
$connectionName,
$job,
(int) $options->maxTries
);
$job->fire();
$this->raiseAfterJobEvent($connectionName, $job);
}
catch (Exception $e)
{
$this->handleJobException($connectionName, $job, $options, $e);
}
catch (Throwable $e)
{
$this->handleJobException(
$connectionName,
$job,
$options,
new FatalThrowableError($e)
);
}
}
任务前与任务后事件
raiseBeforeJobEvent函数用于触发任务处理前的事件,raiseAfterJobEvent函数用于触发任务处理后的事件:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17protected function raiseBeforeJobEvent($connectionName, $job)
{
$this->events->fire(
new Events\JobProcessing(
$connectionName, $job
)
);
}
protected function raiseAfterJobEvent($connectionName, $job)
{
$this->events->fire(
new Events\JobProcessed(
$connectionName, $job
)
);
}
任务异常处理
任务在运行过程中会遇到异常情况,这个时候就要判断当前任务的失败次数是不是超过限制。如果没有超过限制,那么就会把当前任务重新放回队列当中;如果超过了限制,那么就要标记当前任务为失败任务,并且将任务从reserved队列中删除。
任务失败
markJobAsFailedIfAlreadyExceedsMaxAttempts函数用于任务运行前,判断当前任务是否重试次数超过限制:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $job, $maxTries)
{
$maxTries = ! is_null($job->maxTries())
? $job->maxTries()
: $maxTries;
if ($maxTries === 0 || $job->attempts() <= $maxTries)
{
return;
}
$this->failJob(
$connectionName,
$job,
$e = new MaxAttemptsExceededException(
'A queued job has been attempted too many times. The job may have previously timed out.'
)
);
throw $e;
}
public function maxTries()
{
return array_get($this->payload(), 'maxTries');
}
public function attempts()
{
return Arr::get($this->decoded, 'attempts') + 1;
}
protected function failJob($connectionName, $job, $e)
{
return FailingJob::handle($connectionName, $job, $e);
}
当遇到重试次数大于限制的任务,work进程就会调用FailingJob:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44protected function failJob($connectionName, $job, $e)
{
return FailingJob::handle($connectionName, $job, $e);
}
public static function handle($connectionName, $job, $e = null)
{
$job->markAsFailed();
if ($job->isDeleted())
{
return;
}
try
{
$job->delete();
$job->failed($e);
}
finally
{
static::events()->fire(
new JobFailed(
$connectionName, $job, $e ? : new ManuallyFailedException
)
);
}
}
public function markAsFailed()
{
$this->failed = true;
}
public function delete()
{
parent::delete();
$this->redis->deleteReserved($this->queue, $this);
}
public function isDeleted()
{
return $this->deleted;
}
FailingJob会标记当前任务failed、deleted,并且会将当前任务移除reserved队列,不会再重试:1
2
3
4
5
6
7public function deleteReserved($queue, $job)
{
$this->getConnection()->zrem(
$this->getQueue($queue).':reserved',
$job->getReservedJob()
);
}
FailingJob还会调用RedisJob的failed函数,并且触发JobFailed事件:1
2
3
4
5
6
7
8
9
10
11
12public function failed($e)
{
$this->markAsFailed();
$payload = $this->payload();
list($class, $method) = JobName::parse($payload['job']);
if (method_exists($this->instance = $this->resolve($class),'failed'))
{
$this->instance->failed($payload['data'], $e);
}
}
程序会解析job类,我们先前在redis中已经存储了:1
2
3
4
5
6
7
8
9[
'job' => 'Illuminate\Queue\CallQueuedHandler@call',
'maxTries' => isset($job->tries) ? $job->tries : null,
'timeout' => isset($job->timeout) ? $job->timeout : null,
'data' => [
'commandName' => get_class($job),
'command' => serialize(clone $job),
],
];
我们接着看failed函数:1
2
3
4
5
6
7
8public function failed(array $data, $e)
{
$command = unserialize($data['command']);
if (method_exists($command, 'failed')) {
$command->failed($e);
}
}
可以看到,最后程序调用了任务类的failed函数。
异常处理
当任务遇到异常的时候,程序仍然会判断当前任务的重试次数,如果本次任务的重试次数已经大于或等于限制,那么就会停止重试,标记为失败;否则就会重新放入队列,记录日志。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46protected function handleJobException($connectionName, $job, WorkerOptions $options, $e)
{
try {
$this->markJobAsFailedIfWillExceedMaxAttempts(
$connectionName, $job, (int) $options->maxTries, $e
);
$this->raiseExceptionOccurredJobEvent(
$connectionName, $job, $e
);
}
finally
{
if (! $job->isDeleted()) {
$job->release($options->delay);
}
}
throw $e;
}
protected function markJobAsFailedIfWillExceedMaxAttempts($connectionName, $job, $maxTries, $e)
{
$maxTries = ! is_null($job->maxTries())
? $job->maxTries()
: $maxTries;
if ($maxTries > 0 && $job->attempts() >= $maxTries)
{
$this->failJob($connectionName, $job, $e);
}
}
public function release($delay = 0)
{
parent::release($delay);
$this->redis->deleteAndRelease($this->queue, $this, $delay);
}
public function deleteAndRelease($queue, $job, $delay)
{
$queue = $this->getQueue($queue);
$this->getConnection()->eval(
LuaScripts::release(), 2, $queue.':delayed', $queue.':reserved',
$job->getReservedJob(), $this->availableAt($delay)
);
}
一旦任务出现异常错误。那么该任务将会立刻从reserved队列放入delayed队列,并且抛出异常,抛出异常后,程序会将其记录在日志中。1
2
3
4
5
6
7
8
9
10public static function release()
{
return <<<'LUA'
-- Remove the job from the current queue...
redis.call('zrem', KEYS[2], ARGV[1])
-- Add the job onto the "delayed" queue...
redis.call('zadd', KEYS[1], ARGV[2], ARGV[1])
return true
LUA;
}
任务的运行
任务的运行首先会调用CallQueuedHandler的call函数:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36public function fire()
{
$payload = $this->payload();
list($class, $method) = JobName::parse($payload['job']);
with($this->instance = $this->resolve($class))->{$method}($this, $payload['data']);
}
/**
* Handle the queued job.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @param array $data
* @return void
*/
public function call(Job $job, array $data)
{
try {
$command = $this->setJobInstanceIfNecessary(
$job, unserialize($data['command'])
);
} catch (ModelNotFoundException $e) {
return $this->handleModelNotFound($job, $e);
}
$this->dispatchThroughMiddleware($job, $command);
if (! $job->hasFailed() && ! $job->isReleased()) {
$this->ensureNextJobInChainIsDispatched($command);
}
if (! $job->isDeletedOrReleased()) {
$job->delete();
}
}
setJobInstanceIfNecessary函数用于为任务类的trait:InteractsWithQueue的设置任务类:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31/**
* Set the job instance of the given class if necessary.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @param mixed $instance
* @return mixed
*/
protected function setJobInstanceIfNecessary(Job $job, $instance)
{
if (in_array(InteractsWithQueue::class, class_uses_recursive($instance))) {
$instance->setJob($job);
}
return $instance;
}
trait InteractsWithQueue
{
/**
* Set the base queue job instance.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @return $this
*/
public function setJob(JobContract $job)
{
$this->job = $job;
return $this;
}
}
接着任务的运行就要交给dispatch:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47/**
* Dispatch a command to its appropriate handler in the current process.
*
* @param mixed $command
* @param mixed $handler
* @return mixed
*/
public function dispatchNow($command, $handler = null)
{
if ($handler || $handler = $this->getCommandHandler($command)) {
$callback = function ($command) use ($handler) {
return $handler->handle($command);
};
} else {
$callback = function ($command) {
return $this->container->call([$command, 'handle']);
};
}
return $this->pipeline->send($command)->through($this->pipes)->then($callback);
}
/**
* Retrieve the handler for a command.
*
* @param mixed $command
* @return bool|mixed
*/
public function getCommandHandler($command)
{
if ($this->hasCommandHandler($command)) {
return $this->container->make($this->handlers[get_class($command)]);
}
return false;
}
/**
* Determine if the given command has a handler.
*
* @param mixed $command
* @return bool
*/
public function hasCommandHandler($command)
{
return array_key_exists(get_class($command), $this->handlers);
}
如果不对dispatcher类进行任何map函数设置,getCommandHandler将会返回null,此时就会调用任务类的handle函数,进行具体的业务逻辑。
任务结束后,就会调用delete函数:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23/**
* Delete the job from the queue.
*
* @return void
*/
public function delete()
{
parent::delete();
$this->redis->deleteReserved($this->queue, $this);
}
/**
* Delete a reserved job from the queue.
*
* @param string $queue
* @param \Illuminate\Queue\Jobs\RedisJob $job
* @return void
*/
public function deleteReserved($queue, $job)
{
$this->getConnection()->zrem($this->getQueue($queue).':reserved', $job->getReservedJob());
}
这样,运行成功的任务会从reserved中删除。