Skip to content

[amqp] Fix socket and signal issue. #317

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jan 9, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion pkg/amqp-bunny/AmqpContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@

use Bunny\Channel;
use Bunny\Client;
use Bunny\Exception\ClientException;
use Bunny\Message;
use Enqueue\AmqpTools\DelayStrategyAware;
use Enqueue\AmqpTools\DelayStrategyAwareTrait;
use Enqueue\AmqpTools\SignalSocketHelper;
use Interop\Amqp\AmqpBind as InteropAmqpBind;
use Interop\Amqp\AmqpConsumer as InteropAmqpConsumer;
use Interop\Amqp\AmqpContext as InteropAmqpContext;
Expand Down Expand Up @@ -53,6 +55,11 @@ class AmqpContext implements InteropAmqpContext, DelayStrategyAware
*/
private $subscribers;

/**
* @var SignalSocketHelper
*/
private $signalSocketHandler;

/**
* Callable must return instance of \Bunny\Channel once called.
*
Expand All @@ -78,6 +85,7 @@ public function __construct($bunnyChannel, $config = [])

$this->buffer = new Buffer();
$this->subscribers = [];
$this->signalSocketHandler = new SignalSocketHelper();
}

/**
Expand Down Expand Up @@ -388,7 +396,19 @@ public function consume($timeout = 0)
throw new \LogicException('There is no subscribers. Consider calling basicConsumeSubscribe before consuming');
}

$this->getBunnyChannel()->getClient()->run(0 !== $timeout ? $timeout / 1000 : null);
$this->signalSocketHandler->beforeSocket();

try {
$this->getBunnyChannel()->getClient()->run(0 !== $timeout ? $timeout / 1000 : null);
} catch (ClientException $e) {
if ('stream_select() failed.' == $e->getMessage() && $this->signalSocketHandler->wasThereSignal()) {
return;
}

throw $e;
} finally {
$this->signalSocketHandler->afterSocket();
}
}

/**
Expand Down
18 changes: 18 additions & 0 deletions pkg/amqp-lib/AmqpContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Enqueue\AmqpTools\DelayStrategyAware;
use Enqueue\AmqpTools\DelayStrategyAwareTrait;
use Enqueue\AmqpTools\SignalSocketHelper;
use Interop\Amqp\AmqpBind as InteropAmqpBind;
use Interop\Amqp\AmqpConsumer as InteropAmqpConsumer;
use Interop\Amqp\AmqpContext as InteropAmqpContext;
Expand All @@ -20,6 +21,7 @@
use Interop\Queue\PsrTopic;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AbstractConnection;
use PhpAmqpLib\Exception\AMQPIOWaitException;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PhpAmqpLib\Message\AMQPMessage as LibAMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
Expand Down Expand Up @@ -55,6 +57,11 @@ class AmqpContext implements InteropAmqpContext, DelayStrategyAware
*/
private $subscribers;

/**
* @var SignalSocketHelper
*/
private $signalSocketHandler;

/**
* @param AbstractConnection $connection
* @param array $config
Expand All @@ -71,6 +78,7 @@ public function __construct(AbstractConnection $connection, $config = [])
$this->connection = $connection;
$this->buffer = new Buffer();
$this->subscribers = [];
$this->signalSocketHandler = new SignalSocketHelper();
}

/**
Expand Down Expand Up @@ -382,6 +390,8 @@ public function consume($timeout = 0)
throw new \LogicException('There is no subscribers. Consider calling basicConsumeSubscribe before consuming');
}

$this->signalSocketHandler->beforeSocket();

try {
while (true) {
$start = microtime(true);
Expand All @@ -402,6 +412,14 @@ public function consume($timeout = 0)
}
} catch (AMQPTimeoutException $e) {
} catch (StopBasicConsumptionException $e) {
} catch (AMQPIOWaitException $e) {
if ($this->signalSocketHandler->wasThereSignal()) {
return;
}

throw $e;
} finally {
$this->signalSocketHandler->afterSocket();
}
}

Expand Down
83 changes: 83 additions & 0 deletions pkg/amqp-tools/SignalSocketHelper.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
<?php

namespace Enqueue\AmqpTools;

class SignalSocketHelper
{
/**
* @var callable[]
*/
private $handlers;

/**
* @var bool
*/
private $wasThereSignal;

/**
* @var int[]
*/
private $signals = [SIGTERM, SIGQUIT, SIGINT];

public function __construct()
{
$this->handlers = [];
}

public function beforeSocket()
{
// PHP 7.1 and higher
if (false == function_exists('pcntl_signal_get_handler')) {
return;
}

if ($this->handlers) {
throw new \LogicException('The handlers property should be empty but it is not. The afterSocket method might not have been called.');
}
if (null !== $this->wasThereSignal) {
throw new \LogicException('The wasThereSignal property should be null but it is not. The afterSocket method might not have been called.');
}

$this->wasThereSignal = false;

foreach ($this->signals as $signal) {
/** @var callable $handler */
$handler = pcntl_signal_get_handler($signal);

pcntl_signal($signal, function ($signal) use ($handler) {
var_dump('fuckk!');
$this->wasThereSignal = true;

$handler && $handler($signal);
});

$handler && $this->handlers[$signal] = $handler;
}
}

public function afterSocket()
{
// PHP 7.1 and higher
if (false == function_exists('pcntl_signal_get_handler')) {
return;
}

$this->wasThereSignal = null;

foreach ($this->signals as $signal) {
$handler = isset($this->handlers[$signal]) ? $this->handlers[$signal] : SIG_DFL;

pcntl_signal($signal, $handler);
}

$this->handlers = [];
}

/**
* @return bool
*/
public function wasThereSignal()
{
return (bool) $this->wasThereSignal;
}
}
124 changes: 124 additions & 0 deletions pkg/amqp-tools/Tests/SignalSocketHelperTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
<?php

namespace Enqueue\AmqpTools\Tests;

use Enqueue\AmqpTools\SignalSocketHelper;
use PHPUnit\Framework\TestCase;

class SignalSocketHelperTest extends TestCase
{
/**
* @var SignalSocketHelper
*/
private $signalHelper;

private $backupSigTermHandler;

private $backupSigIntHandler;

public function setUp()
{
parent::setUp();

if (false == function_exists('pcntl_signal_get_handler')) {
$this->markTestSkipped('PHP 7.1 and higher');
}

$this->backupSigTermHandler = pcntl_signal_get_handler(SIGTERM);
$this->backupSigIntHandler = pcntl_signal_get_handler(SIGINT);

pcntl_signal(SIGTERM, SIG_DFL);
pcntl_signal(SIGINT, SIG_DFL);

$this->signalHelper = new SignalSocketHelper();
}

public function tearDown()
{
parent::tearDown();

if ($this->signalHelper) {
$this->signalHelper->afterSocket();
}

if ($this->backupSigTermHandler) {
pcntl_signal(SIGTERM, $this->backupSigTermHandler);
}

if ($this->backupSigIntHandler) {
pcntl_signal(SIGINT, $this->backupSigIntHandler);
}
}

public function testShouldReturnFalseByDefault()
{
$this->assertFalse($this->signalHelper->wasThereSignal());
}

public function testShouldRegisterHandlerOnBeforeSocket()
{
$this->signalHelper->beforeSocket();

$this->assertAttributeSame(false, 'wasThereSignal', $this->signalHelper);
$this->assertAttributeSame([], 'handlers', $this->signalHelper);
}

public function testShouldRegisterHandlerOnBeforeSocketAndBackupCurrentOne()
{
$handler = function () {};

pcntl_signal(SIGTERM, $handler);

$this->signalHelper->beforeSocket();

$this->assertAttributeSame(false, 'wasThereSignal', $this->signalHelper);

$handlers = $this->readAttribute($this->signalHelper, 'handlers');

$this->assertInternalType('array', $handlers);
$this->assertArrayHasKey(SIGTERM, $handlers);
$this->assertSame($handler, $handlers[SIGTERM]);
}

public function testRestoreDefaultPropertiesOnAfterSocket()
{
$this->signalHelper->beforeSocket();
$this->signalHelper->afterSocket();

$this->assertAttributeSame(null, 'wasThereSignal', $this->signalHelper);
$this->assertAttributeSame([], 'handlers', $this->signalHelper);
}

public function testRestorePreviousHandlerOnAfterSocket()
{
$handler = function () {};

pcntl_signal(SIGTERM, $handler);

$this->signalHelper->beforeSocket();
$this->signalHelper->afterSocket();

$this->assertSame($handler, pcntl_signal_get_handler(SIGTERM));
}

public function testThrowsIfBeforeSocketCalledSecondTime()
{
$this->signalHelper->beforeSocket();

$this->expectException(\LogicException::class);
$this->expectExceptionMessage('The wasThereSignal property should be null but it is not. The afterSocket method might not have been called.');
$this->signalHelper->beforeSocket();
}

public function testShouldReturnTrueOnWasThereSignal()
{
$this->signalHelper->beforeSocket();

posix_kill(getmypid(), SIGINT);
pcntl_signal_dispatch();

$this->assertTrue($this->signalHelper->wasThereSignal());

$this->signalHelper->afterSocket();
}
}
17 changes: 14 additions & 3 deletions pkg/enqueue/Consumption/Extension/SignalExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ public function onStart(Context $context)
throw new LogicException('The pcntl extension is required in order to catch signals.');
}

if (function_exists('pcntl_async_signals')) {
pcntl_async_signals(true);
}

pcntl_signal(SIGTERM, [$this, 'handleSignal']);
pcntl_signal(SIGQUIT, [$this, 'handleSignal']);
pcntl_signal(SIGINT, [$this, 'handleSignal']);
Expand All @@ -45,7 +49,7 @@ public function onBeforeReceive(Context $context)
{
$this->logger = $context->getLogger();

pcntl_signal_dispatch();
$this->dispatchSignal();

$this->interruptExecutionIfNeeded($context);
}
Expand All @@ -63,7 +67,7 @@ public function onPreReceived(Context $context)
*/
public function onPostReceived(Context $context)
{
pcntl_signal_dispatch();
$this->dispatchSignal();

$this->interruptExecutionIfNeeded($context);
}
Expand All @@ -73,7 +77,7 @@ public function onPostReceived(Context $context)
*/
public function onIdle(Context $context)
{
pcntl_signal_dispatch();
$this->dispatchSignal();

$this->interruptExecutionIfNeeded($context);
}
Expand Down Expand Up @@ -117,4 +121,11 @@ public function handleSignal($signal)
break;
}
}

private function dispatchSignal()
{
if (false == function_exists('pcntl_async_signals')) {
pcntl_signal_dispatch();
}
}
}
6 changes: 4 additions & 2 deletions pkg/enqueue/Symfony/AmqpTransportFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,11 @@ public function addConfiguration(ArrayNodeDefinition $builder)
throw new \InvalidArgumentException('There is no amqp driver available. Please consider installing one of the packages: enqueue/amqp-ext, enqueue/amqp-lib, enqueue/amqp-bunny.');
}

if (isset($v['driver']) && false == in_array($v['driver'], $drivers, true)) {
throw new \InvalidArgumentException(sprintf('Unexpected driver given "invalidDriver". Available are "%s"', implode('", "', $drivers)));
if ($v && false == in_array($v, $drivers, true)) {
throw new \InvalidArgumentException(sprintf('Unexpected driver given "%s". Available are "%s"', $v, implode('", "', $drivers)));
}

return $v;
})
->end()
->end()
Expand Down
Loading