5
5
namespace Enqueue \Dbal ;
6
6
7
7
use Doctrine \DBAL \Connection ;
8
+ use Doctrine \DBAL \Exception \DeadlockException ;
8
9
use Doctrine \DBAL \ParameterType ;
9
10
use Doctrine \DBAL \Types \Type ;
10
11
use Ramsey \Uuid \Uuid ;
@@ -73,8 +74,9 @@ protected function fetchMessage(array $queues, int $redeliveryDelay): ?DbalMessa
73
74
->fetch ()
74
75
;
75
76
77
+ // the message has been removed by a 3rd party, such as truncate operation.
76
78
if (false == $ deliveredMessage ) {
77
- throw new \ LogicException ( ' There must be a message at all times at this stage but there is no a message. ' ) ;
79
+ continue ;
78
80
}
79
81
80
82
if ($ deliveredMessage ['redelivered ' ] || empty ($ deliveredMessage ['time_to_live ' ]) || $ deliveredMessage ['time_to_live ' ] > time ()) {
@@ -105,9 +107,13 @@ protected function redeliverMessages(): void
105
107
->setParameter ('redelivered ' , true , Type::BOOLEAN )
106
108
;
107
109
108
- $ update ->execute ();
110
+ try {
111
+ $ update ->execute ();
109
112
110
- $ this ->redeliverMessagesLastExecutedAt = microtime (true );
113
+ $ this ->redeliverMessagesLastExecutedAt = microtime (true );
114
+ } catch (DeadlockException $ e ) {
115
+ // maybe next time we'll get more luck
116
+ }
111
117
}
112
118
113
119
protected function removeExpiredMessages (): void
@@ -127,7 +133,11 @@ protected function removeExpiredMessages(): void
127
133
->setParameter (':now ' , time (), Type::BIGINT )
128
134
;
129
135
130
- $ delete ->execute ();
136
+ try {
137
+ $ delete ->execute ();
138
+ } catch (DeadlockException $ e ) {
139
+ // maybe next time we'll get more luck
140
+ }
131
141
132
142
$ this ->removeExpiredMessagesLastExecutedAt = microtime (true );
133
143
}
0 commit comments