Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"require": {
"php": ">=8.3",
"php-amqplib/php-amqplib": "^3.7",
"utopia-php/di": "0.3.*",
"utopia-php/servers": "0.3.*",
"utopia-php/fetch": "0.5.*",
"utopia-php/pools": "1.*",
Expand Down
19 changes: 11 additions & 8 deletions composer.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

150 changes: 58 additions & 92 deletions src/Queue/Server.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
namespace Utopia\Queue;

use Exception;
use Swoole\Coroutine;
use Throwable;
use Utopia\DI\Container;
use Utopia\Servers\Hook;
use Utopia\Telemetry\Adapter as Telemetry;
use Utopia\Telemetry\Adapter\None as NoTelemetry;
Expand All @@ -12,6 +14,7 @@

class Server
{
public const WORKER_CONTAINER_CONTEXT_KEY = '__utopia_queue_worker_container';
/**
* Queue Adapter
*
Expand Down Expand Up @@ -61,28 +64,22 @@ class Server
*/
protected array $workerStopHooks = [];

/**
* @var array
*/
protected array $resources = [
'error' => null,
];

/**
* @var array
*/
protected static array $resourcesCallbacks = [];
protected bool $coroutines = false;
Comment thread
ChiragAgg5k marked this conversation as resolved.
Outdated
protected Container $container;
protected ?Container $workerContainer = null;

private Histogram $jobWaitTime;
private Histogram $processDuration;

/**
* Creates an instance of a Queue server.
* @param Adapter $adapter
* @param Container|null $container
*/
public function __construct(Adapter $adapter)
public function __construct(Adapter $adapter, ?Container $container = null)
{
$this->adapter = $adapter;
$this->container = $container ?? new Container();
$this->setTelemetry(new NoTelemetry());
}

Expand All @@ -93,75 +90,29 @@ public function job(): Job
}

/**
* If a resource has been created return it, otherwise create it and then return it
*
* @param string $name
* @param bool $fresh
* @return mixed
* @throws Exception
*/
public function getResource(string $name, bool $fresh = false): mixed
{
if (
!\array_key_exists($name, $this->resources) ||
$fresh ||
self::$resourcesCallbacks[$name]['reset']
) {
if (!\array_key_exists($name, self::$resourcesCallbacks)) {
throw new Exception("Failed to find resource: $name");
}

$this->resources[$name] = \call_user_func_array(
self::$resourcesCallbacks[$name]['callback'],
$this->getResources(
self::$resourcesCallbacks[$name]['injections'],
),
);
}

self::$resourcesCallbacks[$name]['reset'] = false;

return $this->resources[$name];
}

/**
* Get Resources By List
*
* @param array $list
* @return array
*/
public function getResources(array $list): array
{
$resources = [];

foreach ($list as $name) {
$resources[$name] = $this->getResource($name);
}

return $resources;
}

/**
* Set a new resource callback
* Set a new resource on the container
*
* @param string $name
* @param callable $callback
* @param array $injections
*
* @throws Exception
*
* @return void
*/
public static function setResource(
public function setResource(
string $name,
callable $callback,
array $injections = [],
): void {
self::$resourcesCallbacks[$name] = [
'callback' => $callback,
'injections' => $injections,
'reset' => true,
];
$this->container->set($name, $callback, $injections);
}

public function getContainer(): Container
{
if ($this->coroutines && \Swoole\Coroutine::getCid() !== -1) {
return \Swoole\Coroutine::getContext()[self::WORKER_CONTAINER_CONTEXT_KEY] ?? $this->container;
}

return $this->workerContainer ?? $this->container;
Comment thread
ChiragAgg5k marked this conversation as resolved.
Outdated
}
Comment thread
ChiragAgg5k marked this conversation as resolved.

public function setTelemetry(Telemetry $telemetry): void
Expand Down Expand Up @@ -237,9 +188,9 @@ public function stop(): self
try {
$this->adapter->stop();
} catch (Throwable $error) {
self::setResource('error', fn () => $error);
$this->getContainer()->set('error', fn () => $error);
foreach ($this->errorHooks as $hook) {
$hook->getAction()(...$this->getArguments($hook));
$hook->getAction()(...$this->getArguments($this->getContainer(), $hook));
}
}
return $this;
Expand All @@ -264,12 +215,16 @@ public function init(): Hook
*/
public function start(): self
{
if ($this->coroutines && Coroutine::getCid() !== -1) {
Coroutine::getContext()[self::WORKER_CONTAINER_CONTEXT_KEY] = new Container($this->container);
}
Comment thread
ChiragAgg5k marked this conversation as resolved.
Outdated

try {
$this->adapter->workerStart(function (string $workerId) {
self::setResource('workerId', fn () => $workerId);
$this->getContainer()->set('workerId', fn () => $workerId);

foreach ($this->workerStartHooks as $hook) {
$hook->getAction()(...$this->getArguments($hook));
$hook->getAction()(...$this->getArguments($this->getContainer(), $hook));
}

$this->adapter->consumer->consume(
Expand All @@ -281,13 +236,14 @@ function (Message $message) {
microtime(true) - $message->getTimestamp();
$this->jobWaitTime->record($waitDuration);

$this->resources = [];
self::setResource('message', fn () => $message);
$this->getContainer()->set('message', fn () => $message);

Comment thread
coderabbitai[bot] marked this conversation as resolved.
if ($this->job->getHook()) {
foreach ($this->initHooks as $hook) {
// Global init hooks
if (\in_array('*', $hook->getGroups())) {
$arguments = $this->getArguments(
$this->getContainer(),
$hook,
$message->getPayload(),
);
Expand All @@ -301,6 +257,7 @@ function (Message $message) {
// Group init hooks
if (\in_array($group, $hook->getGroups())) {
$arguments = $this->getArguments(
$this->getContainer(),
$hook,
$message->getPayload(),
);
Expand All @@ -312,6 +269,7 @@ function (Message $message) {
return \call_user_func_array(
$this->job->getAction(),
$this->getArguments(
$this->getContainer(),
$this->job,
$message->getPayload(),
),
Expand All @@ -323,11 +281,14 @@ function (Message $message) {
}
},
function (Message $message) {
$this->getContainer()->set('message', fn () => $message);

if ($this->job->getHook()) {
foreach ($this->shutdownHooks as $hook) {
// Global init hooks
// Global shutdown hooks
if (\in_array('*', $hook->getGroups())) {
$arguments = $this->getArguments(
$this->getContainer(),
$hook,
$message->getPayload(),
);
Expand All @@ -338,9 +299,10 @@ function (Message $message) {

foreach ($this->job->getGroups() as $group) {
foreach ($this->shutdownHooks as $hook) {
// Group init hooks
// Group shutdown hooks
if (\in_array($group, $hook->getGroups())) {
$arguments = $this->getArguments(
$this->getContainer(),
$hook,
$message->getPayload(),
);
Expand All @@ -350,23 +312,26 @@ function (Message $message) {
}
},
function (?Message $message, Throwable $th) {
self::setResource('error', fn () => $th);
$this->getContainer()->set('error', fn () => $th);
if ($message !== null) {
$this->getContainer()->set('message', fn () => $message);
}

foreach ($this->errorHooks as $hook) {
$hook->getAction()(...$this->getArguments($hook));
$hook->getAction()(...$this->getArguments($this->getContainer(), $hook));
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
},
);
});

$this->adapter->workerStop(function (string $workerId) {
self::setResource('workerId', fn () => $workerId);
$this->getContainer()->set('workerId', fn () => $workerId);

try {
// Call user-defined workerStop hooks
foreach ($this->workerStopHooks as $hook) {
try {
$hook->getAction()(...$this->getArguments($hook));
$hook->getAction()(...$this->getArguments($this->getContainer(), $hook));
} catch (Throwable $e) {
}
}
Expand All @@ -378,9 +343,9 @@ function (?Message $message, Throwable $th) {

$this->adapter->start();
} catch (Throwable $error) {
self::setResource('error', fn () => $error);
$this->getContainer()->set('error', fn () => $error);
foreach ($this->errorHooks as $hook) {
$hook->getAction()(...$this->getArguments($hook));
$hook->getAction()(...$this->getArguments($this->getContainer(), $hook));
}
}
return $this;
Expand Down Expand Up @@ -431,11 +396,12 @@ public function getWorkerStop(): array
/**
* Get Arguments
*
* @param Container $container
* @param Hook $hook
* @param array $payload
* @return array
*/
protected function getArguments(Hook $hook, array $payload = []): array
protected function getArguments(Container $container, Hook $hook, array $payload = []): array
{
$arguments = [];
foreach ($hook->getParams() as $key => $param) {
Expand All @@ -444,13 +410,13 @@ protected function getArguments(Hook $hook, array $payload = []): array
$value =
$value === '' || $value === null ? $param['default'] : $value;

$this->validate($key, $param, $value);
$this->validate($key, $param, $value, $container);
$hook->setParamValue($key, $value);
$arguments[$param['order']] = $value;
}

foreach ($hook->getInjections() as $key => $injection) {
$arguments[$injection['order']] = $this->getResource(
$arguments[$injection['order']] = $container->get(
$injection['name'],
);
}
Expand All @@ -466,21 +432,21 @@ protected function getArguments(Hook $hook, array $payload = []): array
* @param string $key
* @param array $param
* @param mixed $value
* @param Container $container
*
* @throws Exception
*
* @return void
*/
protected function validate(string $key, array $param, mixed $value): void
protected function validate(string $key, array $param, mixed $value, Container $container): void
{
if ('' !== $value && $value !== null) {
$validator = $param['validator']; // checking whether the class exists

if (\is_callable($validator)) {
$validator = \call_user_func_array(
$validator,
$this->getResources($param['injections']),
);
$validatorKey = '_validator:' . $key;
$container->set($validatorKey, $validator, $param['injections']);
$validator = $container->get($validatorKey);
}

if (!$validator instanceof Validator) {
Expand Down
Loading