PHP管道插件League\Pipeline

思考并回答以下问题:

  • 责任链模式是什么样子的?属于什么类型模式?

Pipeline设计模式

水管太长,只要有一处破了,就会漏水了,而且不利于复杂环境弯曲转折使用。所以我们都会把水管分成很短的一节一节管道,然后最大化的让管道大小作用不同,因地制宜,组装在一起,满足各种各样的不同需求。

由此得出Pipeline的设计模式,就是将复杂冗长的流程 (processes) 截成各个小流程,小任务。每个最小量化的任务就可以复用,通过组装不同的小任务,构成复杂多样的流程(processes)。

最后将「输入」引入管道,根据每个小任务对输入进行操作(加工、过滤),最后输出满足需要的结果。

Illuminate\Pipeline

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public function demo(Request $request)
{
$pipe1 = function ($payload, Closure $next) {
$payload = $payload + 1;
return $next($payload);
};

$pipe2 = function ($payload, Closure $next) {
$payload = $payload * 3;
return $next($payload);
};

$data = $request->input('data', 0);

$pipeline = new Pipeline();

return $pipeline
->send($data)
->through([$pipe1, $pipe2])
->then(function ($data) {
return $data;
});
}

League\Pipeline

安装插件

1
composer require league/pipeline

demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
use League\Pipeline\Pipeline;

// 创建两个闭包函数
$pipe1 = function ($payload) {
return $payload + 1;
};

$pipe2 = function ($payload) {
return $payload * 3;
};

$route->map(
'GET',
'/demo',
function (ServerRequestInterface $request, ResponseInterface $response
) use ($service, $pipe1, $pipe2) {
$params = $request->getQueryParams();

// 正常使用
$pipeline1 = (new Pipeline)
->pipe($pipe1)
->pipe($pipe2);

$callback1 = $pipeline1->process($params['data']);

$response->getBody()->write("<h1>正常使用</h1>");
$response->getBody()->write("<p>结果:$callback1</p>");

// 使用魔术方法
$pipeline2 = (new Pipeline())
->pipe($pipe1)
->pipe($pipe2);

$callback2 = $pipeline2($params['data']);

$response->getBody()->write("<h1>使用魔术方法</h1>");
$response->getBody()->write("<p>结果:$callback2</p>");

// 使用 Builder
$builder = new PipelineBuilder();
$pipeline3 = $builder
->add($pipe1)
->add($pipe2)
->build();

$callback3 = $pipeline3($params['data']);

$response->getBody()->write("<h1>使用 Builder</h1>");
$response->getBody()->write("<p>结果:$callback3</p>");
return $response;
}
);

运行结果

解读源代码

整个插件就这几个文件:

PipelineInterface

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<?php
declare(strict_types=1);

namespace League\Pipeline;

interface PipelineInterface extends StageInterface
{
/**
* Create a new pipeline with an appended stage.
*
* @return static
*/
public function pipe(callable $operation): PipelineInterface;
}

interface StageInterface
{
/**
* Process the payload.
*
* @param mixed $payload
*
* @return mixed
*/
public function __invoke($payload);
}

该接口主要是利用链式编程的思想,不断添加管道「pipe」,然后增加一个魔术方法,来让传入的参数运转起来。

先看看这个魔术方法的作用:

mixed __invoke ([ $… ] )
当尝试以调用函数的方式调用一个对象时,__invoke () 方法会被自动调用。

如:

1
2
3
4
5
6
7
8
9
10
11
<?php
class CallableClass
{
function __invoke($x) {
var_dump($x);
}
}
$obj = new CallableClass;
$obj(5);
var_dump(is_callable($obj));
?>

返回结果:

1
2
3
int(5)
bool(true)
Pipeline

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
<?php
declare(strict_types=1);

namespace League\Pipeline;

class Pipeline implements PipelineInterface
{
/**
* @var callable[]
*/
private $stages = [];

/**
* @var ProcessorInterface
*/
private $processor;

public function __construct(ProcessorInterface $processor = null, callable ...$stages)
{
$this->processor = $processor ?? new FingersCrossedProcessor;
$this->stages = $stages;
}

public function pipe(callable $stage): PipelineInterface
{
$pipeline = clone $this;
$pipeline->stages[] = $stage;

return $pipeline;
}

public function process($payload)
{
return $this->processor->process($payload, ...$this->stages);
}

public function __invoke($payload)
{
return $this->process($payload);
}
}

其中核心类 Pipeline 的作用主要就是两个:

添加组装各个管道「pipe」;
组装后,引水流动,执行 process ($payload),输出结果。
Processor

接好各种管道后,那就要「引水入渠」了。该插件提供了两个基础执行类,比较简单,直接看代码就能懂。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// 按照$stages数组顺利,遍历执行管道方法,再将结果传入下一个管道,让「水」一层层「流动」起来
class FingersCrossedProcessor implements ProcessorInterface
{
public function process($payload, callable ...$stages)
{
foreach ($stages as $stage) {
$payload = $stage($payload);
}

return $payload;
}
}

// 增加一个额外的「过滤网」,经过每个管道后的结果,都需要check,一旦满足则终止,直接输出结果。
class InterruptibleProcessor implements ProcessorInterface
{
/**
* @var callable
*/
private $check;

public function __construct(callable $check)
{
$this->check = $check;
}

public function process($payload, callable ...$stages)
{
$check = $this->check;

foreach ($stages as $stage) {
$payload = $stage($payload);

if (true !== $check($payload)) {
return $payload;
}
}

return $payload;
}
}

interface ProcessorInterface
{
/**
* Process the payload using multiple stages.
*
* @param mixed $payload
*
* @return mixed
*/
public function process($payload, callable ...$stages);
}

我们完全也可以利用该接口,实现我们的方法来组装管道和「过滤网」。

PipelineBuilder

最后提供了一个Builder,这个也很好理解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class PipelineBuilder implements PipelineBuilderInterface
{
/**
* @var callable[]
*/
private $stages = [];

/**
* @return self
*/
public function add(callable $stage): PipelineBuilderInterface
{
$this->stages[] = $stage;

return $this;
}

public function build(ProcessorInterface $processor = null): PipelineInterface
{
return new Pipeline($processor, ...$this->stages);
}
}

interface PipelineBuilderInterface
{
/**
* Add an stage.
*
* @return self
*/
public function add(callable $stage): PipelineBuilderInterface;

/**
* Build a new Pipeline object.
*/
public function build(ProcessorInterface $processor = null): PipelineInterface;
}

0%