|
19 | 19 | import net.lecousin.framework.exception.NoException;
|
20 | 20 | import net.lecousin.framework.network.SocketOptionValue;
|
21 | 21 | import net.lecousin.framework.network.ssl.SSLLayer;
|
| 22 | +import net.lecousin.framework.util.Triple; |
22 | 23 |
|
23 | 24 | /**
|
24 | 25 | * SSL client adding SSL layer to a TCPClient.
|
@@ -105,16 +106,26 @@ public void dataReceived(LinkedList<ByteBuffer> data) {
|
105 | 106 | if (data.isEmpty()) return;
|
106 | 107 | AsyncWork<ByteBuffer, IOException> waiting;
|
107 | 108 | ByteBuffer buffer;
|
| 109 | + Triple<AsyncWork<ByteBuffer, IOException>, Integer, Integer> waitAgain = null; |
108 | 110 | synchronized (this) {
|
109 |
| - waiting = (AsyncWork<ByteBuffer, IOException>)removeAttribute(WAITING_DATA_ATTRIBUTE); |
110 |
| - if (waiting == null) { |
| 111 | + LinkedList<Triple<AsyncWork<ByteBuffer, IOException>, Integer, Integer>> list = |
| 112 | + (LinkedList<Triple<AsyncWork<ByteBuffer, IOException>, Integer, Integer>>) |
| 113 | + getAttribute(WAITING_DATA_ATTRIBUTE); |
| 114 | + if (list == null) { |
111 | 115 | receivedData.addAll(data);
|
112 | 116 | return;
|
113 | 117 | }
|
| 118 | + waiting = list.removeFirst().getValue1(); |
| 119 | + if (list.isEmpty()) |
| 120 | + removeAttribute(WAITING_DATA_ATTRIBUTE); |
| 121 | + else |
| 122 | + waitAgain = list.getFirst(); |
114 | 123 | buffer = data.removeFirst();
|
115 | 124 | if (!data.isEmpty())
|
116 | 125 | receivedData.addAll(data);
|
117 | 126 | }
|
| 127 | + if (waitAgain != null) |
| 128 | + waitForSSLData(waitAgain.getValue2().intValue(), waitAgain.getValue3().intValue()); |
118 | 129 | waiting.unblockSuccess(buffer);
|
119 | 130 | }
|
120 | 131 |
|
@@ -197,40 +208,60 @@ public Void run() {
|
197 | 208 | @Override
|
198 | 209 | public AsyncWork<ByteBuffer, IOException> receiveData(int expectedBytes, int timeout) {
|
199 | 210 | AsyncWork<ByteBuffer, IOException> result = new AsyncWork<>();
|
| 211 | + boolean firstWait = false; |
200 | 212 | synchronized (sslClient) {
|
201 | 213 | if (!receivedData.isEmpty()) {
|
202 | 214 | result.unblockSuccess(receivedData.removeFirst());
|
203 |
| - } else |
204 |
| - setAttribute(WAITING_DATA_ATTRIBUTE, result); |
| 215 | + return result; |
| 216 | + } |
| 217 | + @SuppressWarnings("unchecked") |
| 218 | + LinkedList<Triple<AsyncWork<ByteBuffer, IOException>, Integer, Integer>> list = |
| 219 | + (LinkedList<Triple<AsyncWork<ByteBuffer, IOException>,Integer,Integer>>)getAttribute(WAITING_DATA_ATTRIBUTE); |
| 220 | + if (list == null) { |
| 221 | + list = new LinkedList<>(); |
| 222 | + setAttribute(WAITING_DATA_ATTRIBUTE, list); |
| 223 | + firstWait = true; |
| 224 | + } |
| 225 | + list.add(new Triple<>(result, Integer.valueOf(expectedBytes), Integer.valueOf(timeout))); |
205 | 226 | }
|
206 | 227 | if (result.isUnblocked())
|
207 | 228 | return result;
|
208 |
| - waitForSSLData(expectedBytes, timeout); |
| 229 | + if (firstWait) |
| 230 | + waitForSSLData(expectedBytes, timeout); |
209 | 231 | return result;
|
210 | 232 | }
|
211 | 233 |
|
| 234 | + private AsyncWork<ByteBuffer, IOException> lastReceive = null; |
| 235 | + |
212 | 236 | private void waitForSSLData(int expectedBytes, int timeout) {
|
| 237 | + if (lastReceive != null && !lastReceive.isUnblocked()) |
| 238 | + return; |
213 | 239 | AsyncWork<ByteBuffer, IOException> receive = super.receiveData(expectedBytes, timeout);
|
| 240 | + lastReceive = receive; |
214 | 241 | receive.listenAsync(new Task.Cpu<Void, NoException>("Receive SSL data from server", Task.PRIORITY_NORMAL) {
|
215 | 242 | @SuppressWarnings("unchecked")
|
216 | 243 | @Override
|
217 | 244 | public Void run() {
|
218 | 245 | if (receive.hasError()) {
|
219 |
| - AsyncWork<ByteBuffer, IOException> waiting; |
| 246 | + LinkedList<Triple<AsyncWork<ByteBuffer, IOException>, Integer, Integer>> list; |
220 | 247 | synchronized (sslClient) {
|
221 |
| - waiting = (AsyncWork<ByteBuffer, IOException>)removeAttribute(WAITING_DATA_ATTRIBUTE); |
| 248 | + list = (LinkedList<Triple<AsyncWork<ByteBuffer, IOException>, Integer, Integer>>) |
| 249 | + removeAttribute(WAITING_DATA_ATTRIBUTE); |
222 | 250 | }
|
223 |
| - if (waiting != null) |
224 |
| - waiting.error(receive.getError()); |
| 251 | + if (list != null) |
| 252 | + for (Triple<AsyncWork<ByteBuffer, IOException>, Integer, Integer> t : list) |
| 253 | + t.getValue1().error(receive.getError()); |
225 | 254 | return null;
|
226 | 255 | }
|
227 | 256 | if (receive.isCancelled()) {
|
228 |
| - AsyncWork<ByteBuffer, IOException> waiting; |
| 257 | + LinkedList<Triple<AsyncWork<ByteBuffer, IOException>, Integer, Integer>> list; |
229 | 258 | synchronized (sslClient) {
|
230 |
| - waiting = (AsyncWork<ByteBuffer, IOException>)removeAttribute(WAITING_DATA_ATTRIBUTE); |
| 259 | + list = (LinkedList<Triple<AsyncWork<ByteBuffer, IOException>, Integer, Integer>>) |
| 260 | + removeAttribute(WAITING_DATA_ATTRIBUTE); |
231 | 261 | }
|
232 |
| - if (waiting != null) |
233 |
| - waiting.cancel(receive.getCancelEvent()); |
| 262 | + if (list != null) |
| 263 | + for (Triple<AsyncWork<ByteBuffer, IOException>, Integer, Integer> t : list) |
| 264 | + t.getValue1().cancel(receive.getCancelEvent()); |
234 | 265 | return null;
|
235 | 266 | }
|
236 | 267 | ByteBuffer b = receive.getResult();
|
|
0 commit comments