Skip to content

Cache suspensions #37

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jan 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/http-client-suspension.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

function fetch(string $url): string
{
$suspension = EventLoop::createSuspension();
$suspension = EventLoop::getSuspension();

$parsedUrl = \parse_url($url);
if (!isset($parsedUrl['host'], $parsedUrl['path'])) {
Expand Down
2 changes: 1 addition & 1 deletion examples/invalid-callback-return.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

print "Press Ctrl+C to exit..." . PHP_EOL;

$suspension = EventLoop::createSuspension();
$suspension = EventLoop::getSuspension();

EventLoop::onSignal(\SIGINT, function (string $watcherId) use ($suspension) {
EventLoop::cancel($watcherId);
Expand Down
2 changes: 1 addition & 1 deletion examples/stdin-timeout.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

print "Write something and hit enter" . PHP_EOL;

$suspension = EventLoop::createSuspension();
$suspension = EventLoop::getSuspension();

$readWatcher = EventLoop::onReadable(STDIN, function ($watcherId, $stream) use ($suspension) {
EventLoop::cancel($watcherId);
Expand Down
2 changes: 1 addition & 1 deletion examples/stdin.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

print "Write something and hit enter" . PHP_EOL;

$suspension = EventLoop::createSuspension();
$suspension = EventLoop::getSuspension();

EventLoop::onReadable(STDIN, function ($watcherId, $stream) use ($suspension) {
EventLoop::cancel($watcherId);
Expand Down
2 changes: 1 addition & 1 deletion examples/timers.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

use Revolt\EventLoop;

$suspension = EventLoop::createSuspension();
$suspension = EventLoop::getSuspension();

$repeatWatcher = EventLoop::repeat(1, function () {
print "++ Executing watcher created by Loop::repeat()" . PHP_EOL;
Expand Down
24 changes: 21 additions & 3 deletions src/EventLoop.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Revolt;

use JetBrains\PhpStorm\Deprecated;
use Revolt\EventLoop\Driver;
use Revolt\EventLoop\DriverFactory;
use Revolt\EventLoop\Internal\AbstractDriver;
Expand Down Expand Up @@ -74,7 +75,7 @@ protected function now(): float
* Use {@see EventLoop::defer()} if you need these features.
*
* @param \Closure $closure The callback to queue.
* @param mixed ...$args The callback arguments.
* @param mixed ...$args The callback arguments.
*/
public static function queue(\Closure $closure, mixed ...$args): void
{
Expand Down Expand Up @@ -350,13 +351,30 @@ public static function getDriver(): Driver
}

/**
* Create an object used to suspend and resume execution, either within a fiber or from {main}.
* Returns an object used to suspend and resume execution of the current fiber or {main}.
*
* Calls from the same fiber will return the same suspension object.
*
* @return Suspension
*/
public static function getSuspension(): Suspension
{
return self::getDriver()->getSuspension();
}

/**
* Returns an object used to suspend and resume execution of the current fiber or {main}.
*
* Calls from the same fiber will return the same suspension object.
*
* @return Suspension
*
* @deprecated This old name is only kept temporarily to allow smooth transitions from 0.1 to 0.2 and will be
* removed at a later point.
*/
public static function createSuspension(): Suspension
{
return self::getDriver()->createSuspension();
return self::getDriver()->getSuspension();
}

/**
Expand Down
6 changes: 4 additions & 2 deletions src/EventLoop/Driver.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ public function run(): void;
public function stop(): void;

/**
* Create an object used to suspend and resume execution, either within a fiber or from {main}.
* Returns an object used to suspend and resume execution of the current fiber or {main}.
*
* Calls from the same fiber will return the same suspension object.
*
* @return Suspension
*/
public function createSuspension(): Suspension;
public function getSuspension(): Suspension;

/**
* @return bool True if the event loop is running, false if it is stopped.
Expand Down
4 changes: 2 additions & 2 deletions src/EventLoop/Driver/TracingDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ public function stop(): void
$this->driver->stop();
}

public function createSuspension(): Suspension
public function getSuspension(): Suspension
{
return $this->driver->createSuspension();
return $this->driver->getSuspension();
}

public function isRunning(): bool
Expand Down
19 changes: 15 additions & 4 deletions src/EventLoop/Internal/AbstractDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,14 @@ private static function checkFiberSupport(): void
private bool $idle = false;
private bool $stopped = false;

private \WeakMap $suspensions;

public function __construct()
{
self::checkFiberSupport();

$this->suspensions = new \WeakMap();

$this->internalSuspensionMarker = new \stdClass();
$this->microtaskQueue = new \SplQueue();
$this->callbackQueue = new \SplQueue();
Expand Down Expand Up @@ -293,12 +297,19 @@ public function unreference(string $callbackId): string
return $callbackId;
}

public function createSuspension(): Suspension
public function getSuspension(): Suspension
{
// User callbacks are always executed outside the event loop fiber, so this should always be false.
\assert(\Fiber::getCurrent() !== $this->fiber);
$fiber = \Fiber::getCurrent();

return new DriverSuspension($this->runCallback, $this->queueCallback, $this->interruptCallback);
// User callbacks are always executed outside the event loop fiber, so this should always be false.
\assert($fiber !== $this->fiber);

// Use current object in case of {main}
return $this->suspensions[$fiber ?? $this] ??= new DriverSuspension(
$this->runCallback,
$this->queueCallback,
$this->interruptCallback
);
}

public function setErrorHandler(?\Closure $errorHandler): ?callable
Expand Down
16 changes: 8 additions & 8 deletions test/EventLoopTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public function testOnReadable(): void
\fwrite($ends[0], "trigger readability callback");

$count = 0;
$suspension = EventLoop::createSuspension();
$suspension = EventLoop::getSuspension();

EventLoop::onReadable($ends[1], function ($callbackId) use (&$count, $suspension): void {
$this->assertTrue(true);
Expand All @@ -49,7 +49,7 @@ public function testOnReadable(): void
public function testOnWritable(): void
{
$count = 0;
$suspension = EventLoop::createSuspension();
$suspension = EventLoop::getSuspension();

EventLoop::onWritable(STDOUT, function ($callbackId) use (&$count, $suspension): void {
$this->assertTrue(true);
Expand Down Expand Up @@ -115,7 +115,7 @@ public function testRunInFiber(): void

public function testRunAfterSuspension(): void
{
$suspension = EventLoop::createSuspension();
$suspension = EventLoop::getSuspension();

EventLoop::defer(fn () => $suspension->resume('test'));

Expand All @@ -142,7 +142,7 @@ public function testSuspensionAfterRun(): void

self::assertTrue($invoked);

$suspension = EventLoop::createSuspension();
$suspension = EventLoop::getSuspension();

EventLoop::defer(fn () => $suspension->resume('test'));

Expand All @@ -154,7 +154,7 @@ public function testSuspensionWithinFiber(): void
$invoked = false;

EventLoop::queue(function () use (&$invoked): void {
$suspension = EventLoop::createSuspension();
$suspension = EventLoop::getSuspension();

EventLoop::defer(fn () => $suspension->resume('test'));

Expand All @@ -173,7 +173,7 @@ public function testSuspensionWithinCallback(): void
$send = 42;

EventLoop::defer(static function () use (&$received, $send): void {
$suspension = EventLoop::createSuspension();
$suspension = EventLoop::getSuspension();
EventLoop::defer(static fn () => $suspension->resume($send));
$received = $suspension->suspend();
});
Expand All @@ -188,7 +188,7 @@ public function testSuspensionWithinQueue(): void
$send = 42;

EventLoop::queue(static function () use (&$received, $send): void {
$suspension = EventLoop::createSuspension();
$suspension = EventLoop::getSuspension();
EventLoop::defer(static fn () => $suspension->resume($send));
$received = $suspension->suspend();
});
Expand All @@ -200,7 +200,7 @@ public function testSuspensionWithinQueue(): void

public function testSuspensionThrowingErrorViaInterrupt(): void
{
$suspension = EventLoop::createSuspension();
$suspension = EventLoop::getSuspension();
$error = new \Error("Test error");
EventLoop::queue(static fn () => throw $error);
EventLoop::defer(static fn () => $suspension->resume("Value"));
Expand Down