PHP-Promise

思考并回答以下问题:

  • “承诺将来会执行”的对象称为Promise对象。所谓Promise,就是一个对象,用来传递异步操作的消息。它代表了某个未来才会知道结果的事件(通常是一个异步操作),并且这个事件提供统一的API,可供进一步处理。
  • Promise最大的好处是在异步执行的流程中,把执行代码和处理结果的代码清晰地分离了

CallbackPromise

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
<?php
namespace Prophecy\Promise;

use Prophecy\Prophecy\ObjectProphecy;
use Prophecy\Prophecy\MethodProphecy;
use Prophecy\Exception\InvalidArgumentException;
use Closure;

/**
* Callback promise.
*
* @author Konstantin Kudryashov <ever.zet@gmail.com>
*/
class CallbackPromise implements PromiseInterface
{
private $callback;

/**
* Initializes callback promise.
*
* @param callable $callback Custom callback
*
* @throws \Prophecy\Exception\InvalidArgumentException
*/
public function __construct($callback)
{
if (!is_callable($callback)) {
throw new InvalidArgumentException(sprintf(
'Callable expected as an argument to CallbackPromise, but got %s.',
gettype($callback)
));
}

$this->callback = $callback;
}

/**
* Evaluates promise callback.
*
* @param array $args
* @param ObjectProphecy $object
* @param MethodProphecy $method
*
* @return mixed
*/
public function execute(array $args, ObjectProphecy $object, MethodProphecy $method)
{
$callback = $this->callback;

if ($callback instanceof Closure && method_exists('Closure', 'bind')) {
$callback = Closure::bind($callback, $object);
}

return call_user_func($callback, $args, $object, $method);
}
}

PredisCache

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
<?php

namespace Doctrine\Common\Cache;

use Predis\ClientInterface;
use function array_combine;
use function array_filter;
use function array_map;
use function call_user_func_array;
use function serialize;
use function unserialize;

/**
* Predis cache provider.
*/
class PredisCache extends CacheProvider
{
/** @var ClientInterface */
private $client;

public function __construct(ClientInterface $client)
{
$this->client = $client;
}

/**
* {@inheritdoc}
*/
protected function doFetch($id)
{
$result = $this->client->get($id);
if ($result === null) {
return false;
}

return unserialize($result);
}

/**
* {@inheritdoc}
*/
protected function doFetchMultiple(array $keys)
{
$fetchedItems = call_user_func_array([$this->client, 'mget'], $keys);

return array_map('unserialize', array_filter(array_combine($keys, $fetchedItems)));
}

/**
* {@inheritdoc}
*/
protected function doSaveMultiple(array $keysAndValues, $lifetime = 0)
{
if ($lifetime) {
$success = true;

// Keys have lifetime, use SETEX for each of them
foreach ($keysAndValues as $key => $value) {
$response = (string) $this->client->setex($key, $lifetime, serialize($value));

if ($response == 'OK') {
continue;
}

$success = false;
}

return $success;
}

// No lifetime, use MSET
$response = $this->client->mset(array_map(static function ($value) {
return serialize($value);
}, $keysAndValues));

return (string) $response == 'OK';
}

/**
* {@inheritdoc}
*/
protected function doContains($id)
{
return (bool) $this->client->exists($id);
}

/**
* {@inheritdoc}
*/
protected function doSave($id, $data, $lifeTime = 0)
{
$data = serialize($data);
if ($lifeTime > 0) {
$response = $this->client->setex($id, $lifeTime, $data);
} else {
$response = $this->client->set($id, $data);
}

return $response === true || $response == 'OK';
}

/**
* {@inheritdoc}
*/
protected function doDelete($id)
{
return $this->client->del($id) >= 0;
}

/**
* {@inheritdoc}
*/
protected function doDeleteMultiple(array $keys)
{
return $this->client->del($keys) >= 0;
}

/**
* {@inheritdoc}
*/
protected function doFlush()
{
$response = $this->client->flushdb();

return $response === true || $response == 'OK';
}

/**
* {@inheritdoc}
*/
protected function doGetStats()
{
$info = $this->client->info();

return [
Cache::STATS_HITS => $info['Stats']['keyspace_hits'],
Cache::STATS_MISSES => $info['Stats']['keyspace_misses'],
Cache::STATS_UPTIME => $info['Server']['uptime_in_seconds'],
Cache::STATS_MEMORY_USAGE => $info['Memory']['used_memory'],
Cache::STATS_MEMORY_AVAILABLE => false,
];
}
}

Promise

1
2
3
4
5
6
7
8
promise->then
primise->catch
Promise::all
Promise::race
Promise::resolve
Promise::reject
Promise::warp
Promise::co

Promise.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
/**********************************************************\
* *
* ccp/Promise.php *
* *
* Promise Main Class *
* *
* Author: Cai wenhui <471113744@qq.com> *
* *
* \**********************************************************/

namespace ccp;

class Promise
{

// 默认态
CONST PENDING = 0;

// 完成态
CONST FULFILLED = 1;

// 失败态
CONST REJECTED = 2;

public $subscribers = [];

// 当前对象状态
public $state = self::PENDING;

// 当前对象的值
public $value = null;

// 当前对象Reject的理由
public $reason = null;

// 解决
private $resolve = null;

// 拒绝
private $reject = null;

/**
* Promise constructor.
*
* @param null $computation
* function($resolve,$reject){...}
*/
public function __construct($computation = null)
{
$this->resolve = new Resolve($this);
$this->reject = new Reject($this);
if (is_callable($computation)) {
$computation(...[$this->resolve, $this->reject]);
}

return $this;
}

/**
* Resolve入口
* @param $fnFulfill
* @param promise $next
* @param $x
*/
public function privateResolve($fnFulfill, $next, $x)
{
if (is_callable($fnFulfill)) {
$this->privateCall($fnFulfill, $next, $x);
} else {
call_user_func($next->resolve, $x);
}
}

/**
* Reject入口
* @param $fnReject
* @param Promise $next
* @param $e
*/
public function privateReject($fnReject, $next, $e)
{
if (is_callable($fnReject)) {
$this->privateCall($fnReject, $next, $e);
} else {
call_user_func($next->reject, $e);
}
}

/**
* 实际调用
* @param $callback
* @param Promise $next
* @param $x
*/
private function privateCall($callback, $next, $x)
{
try {
$r = $callback(...[$x]);
call_user_func($next->resolve, $r);
} catch (\Exception $e) {
call_user_func($next->reject, $e);
} catch (\Error $e) {
call_user_func($next->reject, $e);
}
}

/**
* 注册函数
*
* @param null|\Closure $fnFulfill 当前Promise对象执行成功的时候的回调函数
* @param null|\Closure $fnReject 当前Promise对象执行失败的时候的回调函数
*
* @return Promise 新的一个Promise对象
*/
public function then($fnFulfill/*Success callback*/, $fnReject = null/*Fail callback*/)
{
if (!is_callable($fnFulfill)) {
$fnFulfill = null;
}
if (!is_callable($fnReject)) {
$fnReject = null;
}
$promise = new Promise();
if ($this->state === self::FULFILLED) {
$this->privateResolve($fnFulfill, $promise, $this->value);
} elseif ($this->state === self::REJECTED) {
$this->privateReject($fnReject, $promise, $this->reason);
} else {
array_push($this->subscribers, array(
'fnFulfill' => $fnFulfill,
'fnReject' => $fnReject,
'next' => $promise
));
}

return $promise;
}

/**
* 等价于then(null,$fnReject)
*
* @param $fnReject
*
* @return Promise
*/
public function catch($fnReject)
{
return $this->then(null, $fnReject);
}

/**
* =============================================静态方法区分==============================================
*/

/**
* 等价于如下代码:
* new Promise(function(resolve){
* resolve(5)
* })
*
* @return Promise
*/
public static function resolve($value)
{
return new Promise(function ($resolve) use ($value) {
$resolve($value);
});
}

/**
* 等价于如下代码:
* new Promise(function(null,reject){
* reject(5)
* })
*
* @return Promise
*/
public static function reject($value)
{
return new Promise(function ($resolve = null, $reject) use ($value) {
$reject($value);
});
}


/**
* 批量处理Promise
*
* @param $promiseList
*
* @return Promise
*/
public static function all($promiseList)
{
$mainPromise = null;

self::toPromise($promiseList)->then(function ($list) use (&$mainPromise) {
$result = [];
$break = false;
foreach ($list as $index => $promise) {
if ($break) break;
self::toPromise($promise)->then(function ($value) use ($index, &$result) {
$result[$index] = $value;
}, function ($value) use (&$break, &$mainPromise) {
$break = true;
$mainPromise = self::reject($value);
});
}
if (!$break)
$mainPromise = self::resolve($result);
});

return $mainPromise;
}

/**
* 批量处理Promise,当最早的对象改变时停止(可以理解为虽然是批量,但是有first AND once的特点)
*
* @param $promiseList
*
* @return Promise
*/
public static function race($promiseList)
{
$mainPromise = null;

self::toPromise($promiseList)->then(function ($list) use (&$mainPromise) {
$result = [];
$break = false;
foreach ($list as $index => $promise) {
if ($break) break;
$break = true;
self::toPromise($promise)->then(function ($value) use ($index, &$result, &$mainPromise) {
$mainPromise = self::resolve($value);
}, function ($value) use (&$break, &$mainPromise) {
$mainPromise = self::reject($value);
});
}
});

return $mainPromise;
}

/*============================================= 非Promise协议标准的辅助静态方法 ===================================*/

/**
* 判断是否是Promise对象
*
* @param $obj
*
* @return bool
*/
public static function isPromise($obj)
{
return $obj instanceof Promise;
}

/**
* 把所有数据都进行Promise化
*
* @param $obj
*
* @return Promise
*/
public static function toPromise($obj)
{
if (self::isPromise($obj)) {
return $obj;
}
if ($obj instanceof \Generator) {
return self::co($obj);
}

return self::resolve($obj);
}

/**
* Promisory 生产一个将会生产Promise的函数(某种意义上,一个Promise生产函数可以被看做一个“Promise工厂“)
*/
public static function warp()
{

}

/*=============================================== 用同步的写法写异步代码 ============================================*/

/**
* Promise + 协程调度 (此用法非标准Promise用法,因为标准的Promise只进行一次解析)
*
* @param $generator
*
* @return Promise
*/
public static function co($generator)
{
if (is_callable($generator)) {
$args = array_slice(func_get_args(), 1);
$generator = call_user_func_array($generator, $args);
}

if (!($generator instanceof \Generator)) {
return self::toPromise($generator);
}

$promise = new Promise();

// 递归fnFulfill回调函数
$fnFulfill = function ($value) use (&$fnFulfill, &$fnReject, $generator, $promise) {
try {
$next = $generator->send($value);
if ($generator->valid()) {
self::toPromise($next)->then($fnFulfill, $fnReject);
} else {
if (method_exists($generator, "getReturn")) {
$ret = $generator->getReturn();
call_user_func($promise->resolve, $ret);
} else {
call_user_func($promise->resolve, $value);
}
}
} catch (\Exception $e) {
call_user_func($promise->reject, $e);
} catch (\Error $e) {
call_user_func($promise->reject, $e);
}
};

// 递归fnRejected回调函数
$fnReject = function ($err) use (&$fnFulfill, $generator, $promise) {
try {
$fnFulfill($generator->throw($err));
} catch (\Exception $e) {
call_user_func($promise->reject, $e);
} catch (\Error $e) {
call_user_func($promise->reject, $e);
}
};

// 开始Promise协程调度
self::toPromise($generator->current())->then($fnFulfill, $fnReject);

return $promise;
}
}

Reject.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
<?php

/**********************************************************\
* *
* ccp/Reject.php *
* *
* The Reject state class *
* *
* Author: Cai wenhui <471113744@qq.com> *
* *
\**********************************************************/

namespace ccp;

class Reject
{
private $_promise = null;

public function __construct(Promise $promise)
{
$this->_promise = $promise;
}

function __invoke($reason)
{
if ($reason === $this->_promise)
{
\call_user_func(new Reject($this->_promise), 'Conflict, solve itself');

return;
}

if ($this->_promise->state === Promise::PENDING)
{
$this->_promise->state = Promise::REJECTED;
$this->_promise->reason = $reason;
while (count($this->_promise->subscribers) > 0)
{
$subscriber = array_shift($this->_promise->subscribers);
$this->_promise->privateReject(
$subscriber['fnReject'],
$subscriber['next'],
$reason);
}
}
}
}

Resolve.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**********************************************************\
* *
* ccp/Resolve.php *
* *
* The Resolve state class *
* *
* Author: Cai wenhui <471113744@qq.com> *
* *
* \**********************************************************/

namespace ccp;

class Resolve
{
private $_promise = null;

public function __construct(Promise $promise)
{
$this->_promise = $promise;
}

function __invoke($value)
{
if ($value === $this->_promise) {
\call_user_func(new Reject($this->_promise), 'Conflict, solve itself');

return;
}

if ($this->_promise->state === Promise::PENDING) {
$this->_promise->state = Promise::FULFILLED;
$this->_promise->value = $value;
while (count($this->_promise->subscribers) > 0) {
$subscriber = array_shift($this->_promise->subscribers);
$this->_promise->privateResolve(
$subscriber['fnFulfill'],
$subscriber['next'],
$value);
}
}
}
}
0%