8
8
use Interop \Queue \PsrConsumer ;
9
9
use Interop \Queue \PsrContext ;
10
10
use Interop \Queue \PsrMessage ;
11
+ use Enqueue \AmqpTools \DelayStrategyAware ;
11
12
12
13
class Job extends BaseJob implements JobContract
13
14
{
@@ -41,7 +42,7 @@ public function __construct(Container $container, PsrContext $psrContext, PsrCon
41
42
$ this ->psrMessage = $ psrMessage ;
42
43
$ this ->connectionName = $ connectionName ;
43
44
}
44
-
45
+
45
46
public function getJobId ()
46
47
{
47
48
return $ this ->psrMessage ->getMessageId ();
@@ -62,16 +63,19 @@ public function delete()
62
63
*/
63
64
public function release ($ delay = 0 )
64
65
{
65
- parent ::release ($ job );
66
+ parent ::release ($ delay );
66
67
67
68
$ requeueMessage = clone $ this ->psrMessage ;
68
69
$ requeueMessage ->setProperty ('x-attempts ' , $ this ->attempts () + 1 );
69
70
70
- $ this ->psrContext ->createProducer ()
71
- ->setDeliveryDelay ($ this ->secondsUntil ($ delay ))
72
- ->send ($ this ->psrConsumer ->getQueue (), $ requeueMessage );
71
+ $ producer = $ this ->psrContext ->createProducer ();
72
+
73
+ if ($ producer instanceof DelayStrategyAware && $ producer ->delayable ()) {
74
+ $ producer ->setDeliveryDelay ($ this ->secondsUntil ($ delay ));
75
+ }
73
76
74
77
$ this ->psrConsumer ->acknowledge ($ this ->psrMessage );
78
+ $ producer ->send ($ this ->psrConsumer ->getQueue (), $ requeueMessage );
75
79
}
76
80
77
81
/**
0 commit comments