File tree Expand file tree Collapse file tree 3 files changed +25
-8
lines changed Expand file tree Collapse file tree 3 files changed +25
-8
lines changed Original file line number Diff line number Diff line change 14
14
final class Adapter implements AdapterInterface
15
15
{
16
16
public function __construct (
17
- private QueueProviderInterface $ provider ,
17
+ private QueueProviderInterface $ provider ,
18
18
private MessageSerializerInterface $ serializer ,
19
- private LoopInterface $ loop ,
20
- private int $ timeout = 3
21
- ) {
19
+ private LoopInterface $ loop ,
20
+ private int $ timeout = 3
21
+ )
22
+ {
22
23
}
23
24
24
25
public function runExisting (callable $ handlerCallback ): void
@@ -66,15 +67,18 @@ public function push(MessageInterface $message): MessageInterface
66
67
67
68
public function subscribe (callable $ handlerCallback ): void
68
69
{
69
- while ($ this ->loop ->canContinue ()) {
70
+ $ continue = true ;
71
+ while ($ continue ) {
70
72
$ message = $ this ->reserve ();
71
73
if (null === $ message ) {
74
+ $ continue = $ this ->loop ->canContinue ();
72
75
continue ;
73
76
}
74
77
75
78
$ result = $ handlerCallback ($ message );
76
- if ($ result ) {
77
- $ this ->provider ->delete ((string ) $ message ->getId ());
79
+ $ this ->provider ->delete ((string ) $ message ->getId ());
80
+ if (!$ result ) {
81
+ $ continue = false ;
78
82
}
79
83
}
80
84
}
@@ -99,4 +103,9 @@ private function reserve(): ?IdEnvelope
99
103
100
104
return $ envelope ;
101
105
}
106
+
107
+ public function getChannelName (): string
108
+ {
109
+ return $ this ->provider ->getChannelName ();
110
+ }
102
111
}
Original file line number Diff line number Diff line change @@ -17,7 +17,8 @@ class QueueProvider implements QueueProviderInterface
17
17
public function __construct (
18
18
private \Redis $ redis , //redis connection,
19
19
private string $ channelName = self ::DEFAULT_CHANNEL_NAME
20
- ) {
20
+ )
21
+ {
21
22
}
22
23
23
24
/**
@@ -145,4 +146,9 @@ private function checkConnection(): void
145
146
throw new NotConnectedRedisException ('Redis is not connected. ' );
146
147
}
147
148
}
149
+
150
+ public function getChannelName (): string
151
+ {
152
+ return $ this ->channelName ;
153
+ }
148
154
}
Original file line number Diff line number Diff line change @@ -20,4 +20,6 @@ public function existInWaiting(int $id): bool;
20
20
public function existInReserved (int $ id ): bool ;
21
21
22
22
public function withChannelName (string $ channelName ): self ;
23
+
24
+ public function getChannelName (): string ;
23
25
}
You can’t perform that action at this time.
0 commit comments