Laravel-Pipeline

思考并回答以下问题:

  • 责任链模式是什么样子的?属于什么类型模式?
  • array_reduce的第一个参数数组是闭包函数集合,第三个参数是闭包。怎么理解?
  • 为什么要用array_reverse反转闭包集合?
  • pipeline管道可以用在游戏开发中吗?

Laravel中管道设计模式的使用——中间件实现原理探究

管道模式可以看作责任链模式的一种具体实现。

所谓管道(Pipeline)设计模式就是将数据传递到一个任务序列中,管道扮演着流水线的角色,数据在这里被处理然后传递到下一个步骤。

使用管道有很多好处,尤其是在单个任务中编写复杂处理代码时管道为我们提供了极大的便利,而且可以在管道中轻松添加、移除或者替换阶段任务。

Laravel在框架中的很多地方用到了Pipeline设计模式,这意味着所有我们需要实现管道设计模式的地方已然是应用底层的一部分了。

我们可以使用Laravel的内部组件在框架顶层构建自己的功能。今天的教程我们将讨论管道设计模式以及如何使用Laravel的内部管道。

什么是管道设计模式

管道模式用于将复杂的进程分解成多个独立的子任务。每个独立的任务都是可复用的,因此这些任务可以被组合成复杂的进程。

这种模式允许你将庞大的进程分解成更小的子任务,这些子任务将数据进行处理并将处理后的结果传递给下一个子任务。就像流水线一样,有条不紊,从原料加工到成品,实现一道完整的工序。

管道中的每一个任务都会接受并返回同一类型的数据,这样子任务可以在管道中被添加、移除或者替换,而不影响其它子任务。

如果你熟悉Unix系统的话,你可能对管道并不陌生,因为在shell命令中我们经常会使用管道命令,例如:

1
cat helloworld.txt | grep "hello world" | rev | > output.txt

在这个例子中,我们读取文件内容,并在其中查询字符串“hello world”,反转字符串,并最终将其添加到output.txt文件中。

如何使用管道模式

Laravel在框架中的很多地方使用了管道设计模式,最常见的就是中间件的实现。

当请求最终到达控制器动作被处理前,会先经过一系列的中间件。每个中间件都有一个独立的职责,例如,设置Cookie、判断是否登录以及阻止CSRF攻击等等。

每个阶段都会对请求进行处理,如果请求通过就会被传递给下一个处理,不通过就会返回相应的HTTP响应。

这种机制使得我们很容易在请求最终到达应用代码前添加处理操作,当然如果不需要这个处理操作你也可以随时移除而不影响请求的生命周期。

管道模式的优点

管道模式有很多优点:

  • 首先,将复杂的处理流程分解成独立的子任务,从而方便测试每个子任务;
  • 其次,被分解的子任务可以被不同的处理进程复用,避免代码冗余;
  • 最后,在复杂进程中添加、移除和替换子任务非常轻松,对已存在的进程没有任何影响。

管道模式的缺点

当然,管道模式也有缺点:

虽然每个子任务变得简单了,但是当你再度尝试将这些子任务组合成完整进程时有一定复杂性;

此外你还需要保证独立子任务测试通过后整体的流程能正常工作,这有一定的不确定性;

最后,当你看到的都是一个个子任务时,对理解整体流程带来困难(盲人摸象的故事想必大家很熟悉,正是此理)。

如何使用Laravel的管道

使用Laravel提供的管道很简单,首先需要创建一个新的Illuminate\Pipeline\Pipeline对象,并将其注入到某个Illuminate\Contracts\Container\Container的实例:

1
$pipeline = app('Illuminate\Pipeline\Pipeline');

接下来将你想要传递的对象发送这个管道:

1
$pipeline->send($request);

然后将其传递到接受并处理请求的任务数组:

1
$pipeline->through($middleware);

最后运行管道任务并编写回调处理:

1
2
3
$pipeline->then(function ($request) {
// Do something
});

这就是中间件的基本工作原理:接收HTTP请求,让请求经过定义好的路由中间件,最后到达目的地进行处理。

结论

管道设计模式很有用,中间件只是一个特别的例子,Laravel在框架底层中充分利用了该设计模式,当然你也可以在自己的项目中使用Laravel提供的管道。

Laravel管道流原理

Laravel管道流原理强烈依赖array_reduce函数,我们先来了解下array_reduce函数的使用。

array_reduce

array_reduce()将回调函数callback迭代地作用到array数组中的每一个单元中,从而将数组简化为单一的值。

1
mixed array_reduce ( array $array , callable $callback [, mixed $initial = NULL ] )

1.array

输入的array。

2.callback

1
mixed callback ( mixed $carry , mixed $item )

$carry包括上次迭代的值,如果本次迭代是第一次,那么这个值是initial,item携带了本次迭代的值

3.initial

如果指定了可选参数initial,该参数将在处理开始前使用,或者当处理结束,数组为空时的最后一个结果。

从文档说明可以看出,array_reduce函数是把数组的每一项,都通过给定的callback函数来简化的。

那我们就来看看是怎么简化的。

1
2
3
4
5
$arr = ['AAAA', 'BBBB', 'CCCC'];

$res = array_reduce($arr, function($carry, $item){
return $carry . $item;
});

给定的数组长度为3,故总迭代三次。

第一次迭代时$carry=null,$item=AAAA返回AAAA
第二次迭代时$carry=AAAA,$item=BBBB返回AAAABBBB
第三次迭代时$carry=AAAABBBB,$item=CCCC返回AAAABBBBCCCC

这种方式将数组简化为一串字符串AAAABBBBCCCC

带初始值的情况

1
2
3
4
5
$arr = ['AAAA', 'BBBB', 'CCCC'];

$res = array_reduce($arr, function($carry, $item){
return $carry . $item;
}, 'INITIAL-');

第一次迭代时($carry = INITIAL-),($item = AAAA)返回INITIAL-AAAA
第二次迭代时($carry = INITIAL-AAAA),($item = BBBB), 返回INITIAL-AAAABBBB
第三次迭代时($carry = INITIAL-AAAABBBB),($item = CCCC),返回INITIAL-AAAABBBBCCCC

这种方式将数组简化为一串字符串INITIAL-AAAABBBBCCCC

闭包

1
2
3
4
5
6
7
8
$arr = ['AAAA', 'BBBB', 'CCCC'];

// 没带初始值
$res = array_reduce($arr, function($carry, $item){
return function() use ($item){ // 这里只use了item
return strtolower($item) . '-';
};
});

第一次迭代时,$carry:null,$item=AAAA,返回一个use了$item=AAAA的闭包
第二次迭代时,$carry:use了$item=AAAA的闭包,$item=BBBB,返回一个use了$item=BBBB的闭包
第三次迭代时,$carry:use了$item=BBBB的闭包,$item=CCCC,返回一个use了$item=CCCC的闭包

这种方式将数组简化为一个闭包,即最后返回的闭包,当我们执行这个闭包时$res()得到返回值CCCC-

上面这种方式只use($item),每次迭代返回的闭包在下次迭代时,我们都没有用起来。只是又重新返回了一个use了当前item值的闭包。

闭包USE闭包

1
2
3
4
5
6
7
8
9
$arr = ['AAAA'];

$res = array_reduce($arr, function($carry, $item){
return function () use ($carry, $item) {
if (is_null($carry)) {
return 'Carry IS NULL' . $item;
}
};
});

注意,此时的数组长度为1,并且没有指定初始值。

由于数组长度为1,故只迭代一次,返回一个use($carry = null, $item = ‘AAAA’)的闭包,当我们执行$res()这个闭包时,得到的结果为Carry IS NULLAAAA。

接下来我们重新改造下:

1
2
3
4
5
6
7
8
9
10
11
12
$arr = ['AAAA', 'BBBB'];

$res = array_reduce($arr, function($carry, $item){
return function () use ($carry, $item) {
if (is_null($carry)) {
return 'Carry IS NULL' . $item;
}
if ($carry instanceof \Closure) {
return $carry() . $item;
}
};
});

我们新增了一个条件判断,若当前迭代的值是一个闭包,返回该闭包的执行结果。

第一次迭代时,$carry的值为null,$item的值为AAAA,返回一个闭包。

1
2
3
4
5
6
7
8
9
// 伪代码
function () use ($carry = null, $item = AAAA) {
if (is_null($carry)) {
return 'Carry IS NULL' . $item;
}
if ($carry instanceof \Closure) {
return $carry() . $item;
}
}

假设我们直接执行该闭包,将会返回Carry IS NULLAAAA的结果。

第二次迭代时,$carry的值为上述返回的闭包(伪代码),$item的值为BBBB,返回一个闭包,当我们执行这个闭包时,满足$carry instanceof \Closure,得到结果Carry IS NULLAAAABBBB。

Laravel中的array_reverse

大致了解了array_reverse函数的使用后,我们来瞅瞅laravel管道流里使用array_reverse的情况。

php内置方法array_reduce把所有要通过的中间件都通过callback方法并压缩为一个Closure。最后再执行Initial。

Laravel中通过全局中间件的核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Illuminate\Foundation\Http\Kernel.php
protected function sendRequestThroughRouter($request)
{
return (new Pipeline($this->app))
->send($request)
->through($this->app->shouldSkipMiddleware() ? [] : $this->middleware)
->then($this->dispatchToRouter());
}

protected function dispatchToRouter()
{
return function ($request) {
$this->app->instance('request', $request);
return $this->router->dispatch($request);
};
}

正如我前面说的,我们发送一个$request对象通过middleware中间件数组,最后再执行dispatchToRouter方法。

假设有两个全局中间件,我们来看看这两个中间件是如何通过管道压缩为一个Closure的。

1
2
Illuminate\Foundation\Http\Middleware\CheckForMaintenanceMode::class,
App\Http\Middleware\AllowOrigin::class, // 自定义中间件

Illuminate\Pipeline\Pipeline为laravel的管道流核心类。

在Illuminate\Pipeline\Pipeline的then方法中,$destination为上述的dispatchToRouter闭包,pipes为要通过的中间件数组,passable为Request对象。

1
2
3
4
5
6
7
public function then(Closure $destination)
{
$pipeline = array_reduce(
array_reverse($this->pipes), $this->carry(), $this->prepareDestination($destination)
);
return $pipeline($this->passable);
}

array_reverse函数将中间件数组的每一项都通过$this->carry(),初始值为上述dispatchToRouter 方法返回的闭包。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
protected function prepareDestination(Closure $destination)
{
return function ($passable) use ($destination) {
return $destination($passable);
};
}

protected function carry()
{
return function ($stack, $pipe) {
return function ($passable) use ($stack, $pipe) {
if ($pipe instanceof Closure) {
return $pipe($passable, $stack);
} elseif (! is_object($pipe)) {
//解析中间件参数
list($name, $parameters) = $this->parsePipeString($pipe);
$pipe = $this->getContainer()->make($name);
$parameters = array_merge([$passable, $stack], $parameters);
} else {
$parameters = [$passable, $stack];
}
return $pipe->{$this->method}(...$parameters);
};
};
}

第一次迭代时,返回一个闭包,use了$stack和$pipe,$stack的值为初始值闭包,$pipe为中间件类名,此处是App\Http\Middleware\AllowOrigin::class(注意array_reverse函数把传进来的中间件数组倒叙了)。

假设我们直接运行该闭包,由于此时$pipe是一个String类型的中间件类名,只满足!is_object($pipe)这个条件,我们将直接从容器中make一个该中间件的实例出来,在执行该中间件实例的handle方法(默认$this->method为handle)。并且将request对象和初始值作为参数,传给这个中间件。

1
2
3
4
public function handle($request, Closure $next)
{
//......
}

在这个中间件的handle方法中,当我们直接执行return $next($request)时,相当于我们开始执行array_reduce函数的初始值闭包了,即上述的dispatchToRouter方法返回的闭包。

1
2
3
4
5
6
7
protected function dispatchToRouter()
{
return function ($request) {
$this->app->instance('request', $request);
return $this->router->dispatch($request);
};
}

好,假设结束。在第二次迭代时,也返回一个use了$stack和$pipe的闭包,$stack的值为我们第一次迭代时返回的闭包,$pipe 为中间件类名,此处是Illuminate\Foundation\Http\Middleware\CheckForMaintenanceMode::class。

两次迭代结束,回到then方法中,我们手动执行了第二次迭代返回的闭包。

1
return $pipeline($this->passable);

当执行第二次迭代返回的闭包时,当前闭包use的$pipe为Illuminate\Foundation\Http\Middleware\CheckForMaintenanceMode::class,同样只满足!is_object($pipe)这个条件,我们将会从容器中make出CheckForMaintenanceMode中间件的实例,在执行该实例的handle方法,并且把第一次迭代返回的闭包作为参数传到handle方法中。

当我们在CheckForMaintenanceMode中间件的handle方法中执行return $next($request)时,此时的$next为我们第一次迭代返回的闭包,将回到我们刚才假设的流程那样。从容器中make 一个App\Http\Middleware\AllowOrigin实例,在执行该实例的handle方法,并把初始值闭包作为参数传到AllowOrigin中间件的handle方法中。当我们再在AllowOrigin中间件中执行return $next($request)时,代表我们所有中间件都通过完成了,接下来开始执行dispatchToRouter。

  • 1.中间件是区分先后顺序的,从这里你应该能明白为什么要把中间件用array_reverse倒叙了。
  • 2.并不是所有中间件在运行前都已经实例化了的,用到的时候才去向容器取。
  • 3.中间件不执行$next($request)后续所有中间件无法执行。

Pipeline

Illuminate\Contracts\Pipeline\Hub.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<?php

namespace Illuminate\Contracts\Pipeline;

interface Hub
{
/**
* Send an object through one of the available pipelines.
*
* @param mixed $object
* @param string|null $pipeline
* @return mixed
*/
public function pipe($object, $pipeline = null);
}

Illuminate\Contracts\Pipeline\Pipeline.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
<?php

namespace Illuminate\Contracts\Pipeline;

use Closure;

interface Pipeline
{
/**
* Set the traveler object being sent on the pipeline.
*
* @param mixed $traveler
* @return $this
*/
public function send($traveler);

/**
* Set the stops of the pipeline.
*
* @param dynamic|array $stops
* @return $this
*/
public function through($stops);

/**
* Set the method to call on the stops.
*
* @param string $method
* @return $this
*/
public function via($method);

/**
* Run the pipeline with a final destination callback.
*
* @param \Closure $destination
* @return mixed
*/
public function then(Closure $destination);
}

Illuminate\Pipeline\PipelineServiceProvider.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
<?php

namespace Illuminate\Pipeline;

use Illuminate\Contracts\Pipeline\Hub as PipelineHubContract;
use Illuminate\Contracts\Support\DeferrableProvider;
use Illuminate\Support\ServiceProvider;

class PipelineServiceProvider extends ServiceProvider implements DeferrableProvider
{
/**
* Register the service provider.
*
* @return void
*/
public function register()
{
$this->app->singleton(
PipelineHubContract::class, Hub::class
);
}

/**
* Get the services provided by the provider.
*
* @return array
*/
public function provides()
{
return [
PipelineHubContract::class,
];
}
}

Illuminate\Pipeline\Pipeline.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
namespace Illuminate\Pipeline;

use Closure;
use Illuminate\Contracts\Container\Container;
use Illuminate\Contracts\Pipeline\Pipeline as PipelineContract;
use RuntimeException;
use Throwable;

class Pipeline implements PipelineContract
{
/**
* The container implementation.
*
* @var \Illuminate\Contracts\Container\Container
*/
protected $container;

/**
* The object being passed through the pipeline.
*
* @var mixed
*/
protected $passable;

/**
* The array of class pipes.
*
* @var array
*/
protected $pipes = [];

/**
* The method to call on each pipe.
*
* @var string
*/
protected $method = 'handle';

/**
* Create a new class instance.
*
* @param \Illuminate\Contracts\Container\Container|null $container
* @return void
*/
public function __construct(Container $container = null)
{
$this->container = $container;
}

/**
* Set the object being sent through the pipeline.
*
* @param mixed $passable
* @return $this
*/
public function send($passable)
{
$this->passable = $passable;

return $this;
}

/**
* Set the array of pipes.
*
* @param array|mixed $pipes
* @return $this
*/
public function through($pipes)
{
$this->pipes = is_array($pipes) ? $pipes : func_get_args();

return $this;
}

/**
* Set the method to call on the pipes.
*
* @param string $method
* @return $this
*/
public function via($method)
{
$this->method = $method;

return $this;
}

/**
* Run the pipeline with a final destination callback.
*
* @param \Closure $destination
* @return mixed
*/
public function then(Closure $destination)
{
$pipeline = array_reduce(
array_reverse($this->pipes()), $this->carry(), $this->prepareDestination($destination)
);

return $pipeline($this->passable);
}

/**
* Run the pipeline and return the result.
*
* @return mixed
*/
public function thenReturn()
{
return $this->then(function ($passable){
return $passable;
});
}

/**
* Get the final piece of the Closure onion.
*
* @param \Closure $destination
* @return \Closure
*/
protected function prepareDestination(Closure $destination)
{
return function ($passable) use ($destination) {
try {
return $destination($passable);
} catch (Throwable $e) {
return $this->handleException($passable, $e);
}
}
}

/**
* Get a Closure that represents a slice of the application onion.
*
* @return \Closure
*/
protected function carry()
{
return function($stack, $pipe) {
return function ($passable) use ($stack, $pipe) {
try {
if (is_callable($pipe)) {
// If the pipe is a callable, then we will call it directly, but otherwise we
// will resolve the pipes out of the dependency container and call it with
// the appropriate method and arguments, returning the results back out.
return $pipe($passable, $stack);
} elseif (! is_object($pipe)) {
[$name, $parameters] = $this->parsePipeString($pipe);

// If the pipe is a string we will parse the string and resolve the class out
// of the dependency injection container. We can then build a callable and
// execute the pipe function giving in the parameters that are required.
$pipe = $this->getContainer()->make($name);

$parameters = array_merge([$passable, $stack], $parameters);
} else
{
// If the pipe is already an object we'll just make a callable and pass it to
// the pipe as-is. There is no need to do any extra parsing and formatting
// since the object we're given was already a fully instantiated object.
$parameters = [$passable, $stack];
}

$carry = method_exists($pipe, $this->method)
? $pipe->{$this->method}(...$parameters)
: $pipe(...$parameters);

return $this->handleCarry($carry);
} catch (Throwable $e) {
return $this->handleException($passable, $e);
}
}
}
}

/**
* Parse full pipe string to get name and parameters.
*
* @param string $pipe
* @return array
*/
protected function parsePipeString($pipe)
{
[$name, $parameters] = array_pad(explode(':', $pipe, 2), 2, []);

if (is_string($parameters)) {
$parameters = explode(',', $parameters);
}

return [$name, $parameters];
}

/**
* Get the array of configured pipes.
*
* @return array
*/
protected function pipes()
{
return $this->pipes;
}

/**
* Get the container instance.
*
* @return \Illuminate\Contracts\Container\Container
*
* @throws \RuntimeException
*/
protected function getContainer()
{
if (! $this->container) {
throw new RuntimeException('A container instance has not been passed to the Pipeline.');
}

return $this->container;
}

/**
* Handle the value returned from each pipe before passing it to the next.
*
* @param mixed $carry
* @return mixed
*/
protected function handleCarry($carry)
{
return $carry;
}

/**
* Handle the given exception.
*
* @param mixed $passable
* @param \Throwable $e
* @return mixed
*
* @throws \Throwable
*/
protected function handleException($passable, Throwable $e)
{
throw $e;
}
}
  • 1.管道处理的核心方法then,实际上是一个非常有用的PHP函数array_reduce($array, $callback, $initial)的应用,只不过待处理数组变成了可调用的中间件函数(字符串或closure表示的);
  • 2.处理函数因为有额外参数$passable(即Request实例)需要传入,所以多包了一层闭包函数【注:关于多层包含的闭包函数,在Python装饰器原理中可以充分理解,尤其是阮一峰的逐步递进写完善的装饰器过程】;
  • 3.初始值也是一个闭包函数,这个是真正的response响应函数。

$this->method = ‘handle’ 这个方法名,可通过via函数修改。

中间件类方法handle($request, Closure $next)有2个参数,这个对应闭包处理过后的中间件函数的返回值

1
return $pipe($passable, $stack)。

Laravel中间件逻辑,是闭包函数强大功能的一大体现。于是我们使用闭包函数,以函数式编程风格实现简化版的中间件流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
function f1($f){
return function($x) use ($f){
echo 'middleware 1 begin.'.PHP_EOL;
$x += 1;
$x = $f($x);
echo 'middleware 1 end.'.PHP_EOL;
return $x;
};
}

function f2($f){
return function($x) use($f){
echo 'middleware 2 begin: '.PHP_EOL;
$x += 2;
$x = $f($x);
echo 'middleware 2 end.'.PHP_EOL;
return $x;
};

}

function respond(){
return function($x){
echo 'Generate some response.'.PHP_EOL;
return $x;
};
}

$x = 1;
$response = f2(f1(respond()))($x);
echo $response;

输出:

1
2
3
4
5
6
middleware 2 begin:
middleware 1 begin.
Generate some response.
middleware 1 end.
middleware 2 end.
4

可以看出为什么Laravel的Pipeline的实现需要反转中间件数组。

高阶函数的实现,可以通过层层传递“函数名字符串”的原始方法来实现,但是通过闭包函数实现更加简单。

array_reduce写法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
$f1 = function($x, $f){
echo 'middleware 1 begin.'.PHP_EOL;
$x += 1;
$x = $f($x);
echo 'middleware 1 end.'.PHP_EOL;
return $x;
};

$f2 = function($x, $f){
echo 'middleware 2 begin: '.PHP_EOL;
$x += 2;
$x = $f($x);
echo 'middleware 2 end.'.PHP_EOL;
return $x;
};

$respond = function($x){
echo 'Generate some response.'.PHP_EOL;
return $x;
};

$middlewares = [$f1, $f2];
$initial = $respond;
$foo = array_reduce($middlewares, function($stack, $item){
return function($request) use ($stack, $item){
return $item($request, $stack);
};
}, $initial);

$x = 1;
echo $foo($x);

输出:

1
2
3
4
5
middleware 2 begin:
middleware 1 begin.
Generate some response.
middleware 1 end.
middleware 2 end.
0%