Laravel Pipeline组件的实现

思考并回答以下问题:

  • (new Pipeline)是什么意思?

Laravel框架中有一个非常有趣的功能,就是HTTP中间件,我们在定义路由的时候,通过中间件对访问进行过滤。来自外部的请求首先经过全局中间件,若通过,则会继续穿过层层路由组所设置的中间件,再到达目的路由,当然,目的路由也可能定义了个中间件,通过后,该路由的处理对象(如控制器),得到的就是一个经过过滤的请求了。

开始

本文当然不是讨论中间件如何使用,而是其实现的基础。Laravel框架中有一个组件叫做Illuminate\Pipeline,意味“管道”,我们看看下面这个代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
<?php
use Illuminate\Pipeline\Pipeline;
use Closure;

$pipe1 = function ($poster, Closure $next) {
$poster += 1;
echo "pipe1: $poster\n";
return $next($poster);
};

$pipe2 = function ($poster, Closure $next) {
if ($poster > 7) {
return $poster;
}

$poster += 3;
echo "pipe2: $poster\n";
return $next($poster);
};

$pipe3 = function ($poster, Closure $next) {
$result = $next($poster);
echo "pipe3: $result\n";
return $result * 2;
};

$pipe4 = function ($poster, Closure $next) {
$poster += 2;
echo "pipe4 : $poster\n";
return $next($poster);
};

$pipes = [$pipe1, $pipe2, $pipe3, $pipe4];

function dispatcher($poster, $pipes)
{
echo "result: " . (new Pipeline)->send($poster)->through($pipes)->then(function ($poster) {
echo "received: $poster\n";
return 3;
}) . "\n";
}

echo "==> action 1:\n";
dispatcher(5, $pipes);
echo "==> action 2:\n";
dispatcher(7, $pipes);

上述代码执行结果如下:

1
2
3
4
5
6
7
8
9
10
==> action 1:
pipe1: 6
pipe2: 9
pipe4 : 11
received: 11
pipe3: 3
result: 6
==> action 2:
pipe1: 8
result: 8

流程概览

Pipeline 组件实现了一个过滤流程:

原始数据 ---> 【前置管道】 ---> 目标处理逻辑 ---> 【后置管道】 ---> 结果数据


通过这种机制,可以将目标处理逻辑与过滤、认证等机制的代码分离开来,这样我们就更容易让代码清晰和易于维护。通过前置、后置管道,在其中“放置”我们需要过滤的逻辑即可,如上述代码,虽然只是一个简单的示例,就已经能够看得出,整个流程的动向,譬如我们在上面示例中准备了四个过滤组件(中间件):pipe1、pipe2、pipe3、pipe4,其中1、2、4是前置,3为后置。

输入的原始数据为5,执行过程首先通过1号过滤组件,然后是2号,再然后是4号,到达目标处理逻辑后,再通过3号过滤组件,最终输出结果。

输入原始数据为7,同样是先经过1号过滤组件,随后是2号,不过在2号中,直接返回了结果,这意味着过程被拦截,不再继续向下传递数据,至此结束并返回结果。

Laravel框架中,原始数据是一个Request对象,通过所定义的前置中间件,开发者可在中间件中获取Request的信息,比如用户的Session/Cookie以及Header等,验证数据是否完备等等,不完备或不符合要求的,则被拦截并返回一个响应告知。若能正常通过则继续传递至最终的处理逻辑,如控制器的某个方法或者一个匿名函数。通过这种模式,我们就实现了请求校验和业务逻辑的分离,而且这样十分便于开发和维护。

实现

Pipeline这个组件的功能十分明确,实现这种类似功能的肯定不少,选择其作为代表分析,原因就是其实现的方式非常简洁、有力,不但其实现原理如此,面对开发人员,它的调用方式也十分清晰,利用匿名函数使得前置与后置的调用都很直观,本文分析的重点就在这里。

实现的思路即使有了,在没有很好地基础之前,估计也很难去完成。当然很多人愿意去阅读其代码,这样就少走了不少弯路,在这里,我的建议也是这样。不过,很多人看到源码也很迷惑,因为中间存在着非常多的回调,只要基础不够扎实,就很容易在期间产生诸多困惑。

不过,逐步分析和对基础知识的补完,就会发现再复杂的框架也不过是零碎的功能有序的构建起来的。

array_reduce的妙用

1
2
3
4
5
6
7
8
9
10
public function then(Closure $destination)
{
$firstSlice = $this->getInitialSlice($destination);

$callable = array_reduce(
array_reverse($this->pipes), $this->getSlice(), $firstSlice
);

return $callable($this->passable);
}

上面的代码就是Pipeline启动过程的起点,当然在调用then方法之前我们还有必要调用send和through,send是传递初始数据,through则是传递需要通过的中间件构成的数组。

then方法接受一个要求匿名函数的参数,该参数所接受的匿名函数,就是用于整个流程的逻辑处理部分的,数据穿过层层中间件,最终到达这里,所以该匿名函数可接受一个参数,就是经过过滤的数据啦。该方法囊括着所有功能,但是代码不过几行,因此肯定有额外的调度过程。

代码中首先映入眼帘的就是$this->getInitialSlice(),该方法顾名思义,创建了一个初始化用的Slice,这块我们先不细说,因为随后就是本文的重点,亦是组件实现的核心功能:array_reduce 函数!。

其作用本质就是通过用户自定义的方式去将一个数组合并成单一的一个值,因此该函数要求三个参数:待合并的数组、用于合并逻辑的回调函数、初始合并的值(亦或者特殊情境下的最终值),用于合并逻辑的回调须接受两个参数值,分别是上一次处理逻辑处理的结果(第一次不存在处理结果,则默认为空,若设置了array_reduce的第三个参数,则以该参数为初始值)和待处理的数组项。

Pipeline组件恰到好处的使用了它。我们看得到,Pipeline首先将我们用于处理的中间件数组通过array_reverse取相反顺序(至于为什么这么做后面你们就知道了),传递至array_reduce 的第一个参数。第三个参数作为array_reduce认定的默认处理对象,Pipeline用的是先前通过getInitalSlice获取到的(实际上是用户传进来的目标逻辑处理函数)作为值传递。

然后就是本文第二个介绍的重点,array_reduce所接受的第二个参数,通过调用$this->getSlice()获取的一个匿名函数!

实现的核心

array_reduce的第二个参数要求传递一个回调函数用于处理数组合并,$this->getSlice()返回的正是这个处理函数,我相信你们一定看到了getSlice返回的值,那么我就将这个匿名函数单独拿出来:

1
2
3
4
5
6
7
8
9
10
11
function ($stack, $pipe) {
return function ($passable) use ($stack, $pipe) {
if ($pipe instanceof Closure) {
return call_user_func($pipe, $passable, $stack);
}
// 省略了一部分,该部分是针对中间件 “类”而不是中间件匿名函数的,
// 先前例子中我们用的都是以匿名函数作为数组传递进来的,因此只会进入上面那个条件,
// 当然Laravel框架中,传递进来的则基本是中间件对象的类名,这段省略的代码,
// 和上面那个if中的本质的区别就是,省略的代码中包含了中间件类的实例化过程并调用的是其handle方法而不是直接调用函数,仅此~~
};
};

我知道大家看到的代码有很多行,但是实际上就只有一行return function() { … };,被执行的也只有它。对于一些初学者,很容易产生一种错觉:那个返回的function会在return前执行。既然是错觉,那就意味着不会被执行,而是作为一个值被返回,可能会被后续某个地方所调用!可能会被后续某个地方所调用!可能会被后续某个地方所调用!这里只是个值!重要的事情说三遍。

虽说会被后面所调用,但我们依旧要在这里提一下这个被返回的匿名函数,在这里,它又有着另一个名称:闭包。闭包是由匿名函数(也成闭包函数)构成的一个整体,和普通的匿名函数有所不同,闭包中一定存在引用了外部数据并在内部操作的情况。

这里需要注意,返回的不仅仅是个匿名函数,更是一个闭包,该闭包中引用了两个外部值,分别是array_reduce提供给第二参数中的回调的两个参数,即数组合并结果和当前待合并的值。

第一次执行时,$stack就是我们的目标处理逻辑代码段,$pipe则是第一个中间件;

第二次执行时,$stack是第一次执行所返回的闭包,$pipe则是第二个中间件,随后以此类推。

最后一次执行,返回的结果仍旧是一个闭包,该闭包中所引用的外部数据是倒数第二次的执行返回的闭包,$pipe是最后一个中间件。随后,该闭包在then方法中被调用,传递进了我们通过send方法传递的值。

上面的描述可能异常抽象,我们让其变得稍微直观一些,我会将所有遍历每一次执行带来的变化体现出来。不过为了方便理解,我需要改一下示例代码,去掉中间的条件判断,因为我们现在重点是理解这个流程而不是其功能,新的代码与执行结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<?php
use Illuminate\Pipeline\Pipeline;

$pipes = [
function ($poster, $callback) {
$poster += 1;
return $callback($poster);
},
function ($poster, $callback) {
$result = $callback($poster);

return $result - 1;
},
function ($poster, $callback) {
$poster += 2;

return $callback($poster);
}
];

echo (new Pipeline)->send(0)->through($pipes)->then(function ($poster) {
return $poster;
}); // 执行输出为 2

上述代码,我们定义了三个中间件,同时我们的目标逻辑代码并没做什么特殊的事情,这样我们就可以专注在执行流程上。下面便于分析,我做了一份伪代码以及等式方便理解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
poster     = 0
f^0 = f(z)->{ z } // 定义目标处理逻辑
f^1 = f(z, y)->{ f^y( z + 1 ) } // 定义中间件 1
f^2 = f(z, y)->{ result = f^y(z); result - 1 } // 定义中间件 2
f^3 = f(z, y)->{ f^y( z + 2 ) } // 定义中间件 3
f^getSlice = f(y, x)->{
f(z)->{
call( f^x(z, y) )
}
}

callback = array_reduce([f^3, f^2, f^1], f^getSlice, f^0);
callback(poster)

>>> 执行上述过程

exec^1:
// 第一次进行 reduce,y 是目标逻辑片段,x 是最后一个中间件,被闭包引用,
// 闭包则作为合并结果返回,在此定义为 f^a。
y = f^0(z);
x = f^3;
f^a = f(z)->{ call( f^x(z, y) ) }
exec^2:
// 第二次进行,y 是上次处理返回的闭包(即 f^a),x 是第二个中间件,再次生成闭包返回。
y = f^a;
x = f^2;
f^b = f(z)->{ call( f^x(z, y) ) }
exec^3:
// 第三次也是最后一次合并,同第二次。现在三个数组项被合并,
// 合并结果为最后一次合并所返回的闭包。
y = f^b;
x = f^1;
f^c = f(z)->{ call( f^x(z, y) ) }
exec^4:
// 该闭包(最后一次合并结果)返回后,被调用,第一个参数为 z = poster = 1,开始执行。
// 该闭包的 z 参数即为 1,其余如 x、y 值见 exec^3。
call( f^c(0) ) = call( f^1(0, f^b) )
exec^5:
// 继续等式替换
call( f^b(0 + 1) ) = call( f^2(0 + 1, f^a) )
exec^6:
// 根据上已执行过程返回结果,已执行至中间件 2 的回调,继续等式替换
result = f^a(0 + 1); result - 1
exec^7:
result = call( f^3(0 + 1 , f^0) ); result - 1
exec^8:
result = call( f^0(0 + 1 + 2) ); result - 1
exec^9:
result = 3; result - 1

// 处理结果
result: 2

分析

根据伪代码,和执行过程,我们能了解到先前通过array_reverse反序排列的中间件,由于在本文中,此处闭包逆向传递下去的特性(因为所引用的外部参数中,是前一执行结果所返回的闭包),实际上依旧是按顺序执行的,我们在这里也看到了如何利用该特性,实现前置和后置调用的原理以及拦截的原理。

前置调用时,先处理自上传递下来的结果,随后调用下一个(由中间件构成的)闭包。后置调用时,先调用下一个(有中间件构成的)闭包,里面仍旧可能无数的引用,直到其中的目标处理逻辑,最终返回结果,再处理。

拦截的原理就更简单了,由于拦截只存在于前置中间件,而前置中间件是先处理,然后调用传递进来的闭包并返回其值,而若这个值不是来自于一个闭包调用的结果,就意味着肯定中间不存在调用关系,也就根本不会执行到闭包中的下一个中间件。

总结

以上就是整个Pipeline以及中间件的实现,我知道很多人依旧十分纠结,内心充满困惑。我仍旧建议老老实实,从 array_reduce 这个函数的实际功能着手,然后把每一步执行过程,写下来,慢慢的就明白了。这篇文章不仅仅只是Laravel组件的一个讲解,更多是从中发现PHP的一些基础概念和知识,要知道在强大的PHP框架也是用PHP写出的,本质上仍旧是在一个大的基础上构建的小世界而已。

0%