-
Notifications
You must be signed in to change notification settings - Fork 11.7k
[12.x] Fix PendingRequest@pool() && batch() concurrency
#57973
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 15 commits
abadd52
e1101b1
7679976
28b52e6
ba374c0
7305fa5
8b824d4
12fdfeb
a8105c8
5911d2d
5caea0f
2355a99
a84c5e5
8e8427c
bea0372
4a74bc1
79fdf65
e1811c3
823a70c
fbc8d12
7f6e4b8
cc5a13a
a6d17ee
fcb4e84
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,6 +18,8 @@ | |
| use Illuminate\Http\Client\Events\ConnectionFailed; | ||
| use Illuminate\Http\Client\Events\RequestSending; | ||
| use Illuminate\Http\Client\Events\ResponseReceived; | ||
| use Illuminate\Http\Client\Promises\FluentPromise; | ||
| use Illuminate\Http\Client\Promises\LazyPromise; | ||
| use Illuminate\Support\Arr; | ||
| use Illuminate\Support\Collection; | ||
| use Illuminate\Support\Str; | ||
|
|
@@ -886,38 +888,51 @@ public function delete(string $url, $data = []) | |
| * Send a pool of asynchronous requests concurrently. | ||
| * | ||
| * @param (callable(\Illuminate\Http\Client\Pool): mixed) $callback | ||
| * @param int|null $concurrency | ||
| * @param non-negative-int|null $concurrency | ||
| * @return array<array-key, \Illuminate\Http\Client\Response|\Illuminate\Http\Client\ConnectionException|\Illuminate\Http\Client\RequestException> | ||
| */ | ||
| public function pool(callable $callback, ?int $concurrency = null) | ||
| public function pool(callable $callback, ?int $concurrency = 0) | ||
| { | ||
| $results = []; | ||
|
|
||
| $requests = tap(new Pool($this->factory), $callback)->getRequests(); | ||
|
|
||
| if ($concurrency === null) { | ||
| (new Collection($requests))->each(function ($item) { | ||
| if ($item instanceof static) { | ||
| $item = $item->getPromise(); | ||
| } | ||
| if ($item instanceof LazyPromise) { | ||
| $item->buildPromise(); | ||
| } | ||
| }); | ||
| foreach ($requests as $key => $item) { | ||
| $results[$key] = $item instanceof static ? $item->getPromise()->wait() : $item->wait(); | ||
| } | ||
|
|
||
| return $results; | ||
| } | ||
|
Comment on lines
900
to
916
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added this for backwards compatibility: so I would like to see this entirely removed in Laravel 13. To me, it makes no sense in the context of request pooling, and it changes the underlying behavior. Here, we throw an exception on a connection timeout, whereas in any other case, we return the ConnectionException as a part of the array. |
||
|
|
||
| $promises = []; | ||
| $concurrency = $concurrency === 0 ? count($requests) : $concurrency; | ||
|
|
||
| foreach ($requests as $key => $item) { | ||
| $promises[$key] = $item instanceof static ? $item->getPromise() : $item; | ||
| } | ||
| (new Collection($requests))->chunk($concurrency) | ||
| ->each(static function (Collection $requests) use ($concurrency, &$results) { | ||
| $promises = []; | ||
| foreach ($requests as $key => $item) { | ||
| $promise = $item instanceof static ? $item->getPromise() : $item; | ||
| $promises[$key] = $promise instanceof LazyPromise ? $promise->buildPromise() : $promise; | ||
| } | ||
|
|
||
| (new EachPromise($promises, [ | ||
| 'fulfilled' => function ($result, $key) use (&$results) { | ||
| $results[$key] = $result; | ||
| }, | ||
| 'rejected' => function ($reason, $key) use (&$results) { | ||
| $results[$key] = $reason; | ||
| }, | ||
| 'concurrency' => $concurrency, | ||
| ]))->promise()->wait(); | ||
| (new EachPromise($promises, [ | ||
| 'fulfilled' => function ($result, $key) use (&$results) { | ||
| $results[$key] = $result; | ||
| }, | ||
| 'rejected' => function ($reason, $key) use (&$results) { | ||
| $results[$key] = $reason; | ||
| }, | ||
| 'concurrency' => $concurrency, | ||
| ]))->promise()->wait(); | ||
| }); | ||
|
|
||
| return $results; | ||
| } | ||
|
|
@@ -939,7 +954,7 @@ public function batch(callable $callback): Batch | |
| * @param string $method | ||
| * @param string $url | ||
| * @param array $options | ||
| * @return \Illuminate\Http\Client\Response | ||
| * @return \Illuminate\Http\Client\Response|\Illuminate\Http\Client\Promises\LazyPromise | ||
| * | ||
| * @throws \Exception | ||
| * @throws \Illuminate\Http\Client\ConnectionException | ||
|
|
@@ -957,7 +972,9 @@ public function send(string $method, string $url, array $options = []) | |
| [$this->pendingBody, $this->pendingFiles] = [null, []]; | ||
|
|
||
| if ($this->async) { | ||
| return $this->makePromise($method, $url, $options); | ||
| return $this->promise = new LazyPromise( | ||
| fn () => $this->makePromise($method, $url, $options) | ||
| ); | ||
| } | ||
|
|
||
| $shouldRetry = null; | ||
|
|
@@ -1198,7 +1215,7 @@ protected function handlePromiseResponse(Response|ConnectionException|TransferEx | |
| * @param string $method | ||
| * @param string $url | ||
| * @param array $options | ||
| * @return \Psr\Http\Message\MessageInterface|\Illuminate\Http\Client\FluentPromise | ||
| * @return \Psr\Http\Message\MessageInterface|\GuzzleHttp\Promise\PromiseInterface | ||
| * | ||
| * @throws \Exception | ||
| */ | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,119 @@ | ||
| <?php | ||
|
|
||
| namespace Illuminate\Http\Client\Promises; | ||
|
|
||
| use Closure; | ||
| use GuzzleHttp\Promise\PromiseInterface; | ||
| use RuntimeException; | ||
|
|
||
| class LazyPromise implements PromiseInterface | ||
| { | ||
| /** | ||
| * The callbacks to execute after the Guzzle Promise has been built. | ||
| * | ||
| * @var list<callable> | ||
| */ | ||
| protected array $pending = []; | ||
|
|
||
| /** | ||
| * The promise built by the creator. | ||
| * | ||
| * @var \GuzzleHttp\Promise\PromiseInterface | ||
| */ | ||
| protected PromiseInterface $guzzlePromise; | ||
|
|
||
| /** | ||
| * Create a new lazy promise instance. | ||
| * | ||
| * @param (\Closure(): \GuzzleHttp\Promise\PromiseInterface) $promiseBuilder The callback to build a new PromiseInterface. | ||
| */ | ||
| public function __construct(protected Closure $promiseBuilder) | ||
| { | ||
| } | ||
|
|
||
| /** | ||
| * Build the promise from the promise builder. | ||
| * | ||
| * @return \GuzzleHttp\Promise\PromiseInterface | ||
| * | ||
| * @throws \RuntimeException If the promise has already been built | ||
| */ | ||
| public function buildPromise(): PromiseInterface | ||
| { | ||
| if (! $this->promiseNeedsBuilt()) { | ||
| throw new RuntimeException('Promise already built'); | ||
| } | ||
|
|
||
| $this->guzzlePromise = call_user_func($this->promiseBuilder); | ||
|
|
||
| foreach ($this->pending as $pendingCallback) { | ||
| $pendingCallback($this->guzzlePromise); | ||
cosmastech marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| return $this->guzzlePromise; | ||
| } | ||
|
|
||
| /** | ||
| * If the promise has been created from the promise builder. | ||
| * | ||
| * @return bool | ||
| */ | ||
| public function promiseNeedsBuilt(): bool | ||
| { | ||
| return ! isset($this->guzzlePromise); | ||
| } | ||
|
|
||
| #[\Override] | ||
| public function then(?callable $onFulfilled = null, ?callable $onRejected = null): PromiseInterface | ||
| { | ||
| $this->pending[] = static fn (PromiseInterface $promise) => $promise->then($onFulfilled, $onRejected); | ||
|
|
||
| return $this; | ||
| } | ||
|
|
||
| #[\Override] | ||
| public function otherwise(callable $onRejected): PromiseInterface | ||
| { | ||
| $this->pending[] = static fn (PromiseInterface $promise) => $promise->otherwise($onRejected); | ||
|
|
||
| return $this; | ||
| } | ||
|
|
||
| #[\Override] | ||
| public function getState(): string | ||
| { | ||
| if ($this->promiseNeedsBuilt()) { | ||
| return PromiseInterface::PENDING; | ||
| } | ||
|
|
||
| return $this->guzzlePromise->getState(); | ||
| } | ||
|
|
||
| #[\Override] | ||
| public function resolve($value): void | ||
| { | ||
| throw new \LogicException('Cannot resolve a lazy promise.'); | ||
| } | ||
|
|
||
| #[\Override] | ||
| public function reject($reason): void | ||
| { | ||
| throw new \LogicException('Cannot reject a lazy promise.'); | ||
| } | ||
|
|
||
| #[\Override] | ||
| public function cancel(): void | ||
| { | ||
| throw new \LogicException('Cannot cancel a lazy promise.'); | ||
| } | ||
|
|
||
| #[\Override] | ||
| public function wait(bool $unwrap = true) | ||
| { | ||
| if ($this->promiseNeedsBuilt()) { | ||
| $this->buildPromise(); | ||
| } | ||
|
|
||
| return $this->guzzlePromise->wait($unwrap); | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.