18
18
19
19
import java .io .ByteArrayOutputStream ;
20
20
import java .io .IOException ;
21
+ import java .lang .reflect .Method ;
21
22
import java .net .URI ;
22
23
import java .nio .ByteBuffer ;
23
24
import java .util .List ;
30
31
import io .undertow .client .ClientRequest ;
31
32
import io .undertow .client .ClientResponse ;
32
33
import io .undertow .client .UndertowClient ;
34
+ import io .undertow .connector .ByteBufferPool ;
35
+ import io .undertow .connector .PooledByteBuffer ;
36
+ import io .undertow .server .DefaultByteBufferPool ;
33
37
import io .undertow .util .AttachmentKey ;
34
38
import io .undertow .util .HeaderMap ;
35
39
import io .undertow .util .HttpString ;
36
40
import io .undertow .util .Methods ;
37
41
import io .undertow .util .StringReadChannelListener ;
38
42
import org .xnio .ChannelListener ;
39
43
import org .xnio .ChannelListeners ;
44
+ import org .xnio .IoFuture ;
40
45
import org .xnio .IoUtils ;
41
46
import org .xnio .OptionMap ;
42
47
import org .xnio .Options ;
48
+ import org .xnio .Pool ;
43
49
import org .xnio .Xnio ;
44
50
import org .xnio .XnioWorker ;
45
51
import org .xnio .channels .StreamSinkChannel ;
49
55
import org .springframework .http .HttpStatus ;
50
56
import org .springframework .http .ResponseEntity ;
51
57
import org .springframework .util .Assert ;
58
+ import org .springframework .util .ClassUtils ;
59
+ import org .springframework .util .ReflectionUtils ;
52
60
import org .springframework .util .concurrent .SettableListenableFuture ;
53
61
import org .springframework .web .client .HttpServerErrorException ;
54
62
import org .springframework .web .socket .CloseStatus ;
61
69
62
70
/**
63
71
* An XHR transport based on Undertow's {@link io.undertow.client.UndertowClient}.
64
- * Compatible with Undertow 1.0, 1.1, 1.2 .
72
+ * Compatible with Undertow from version 1.0 to 1.3 .
65
73
*
66
74
* <p>When used for testing purposes (e.g. load testing) or for specific use cases
67
75
* (like HTTPS configuration), a custom OptionMap should be provided:
84
92
*/
85
93
public class UndertowXhrTransport extends AbstractXhrTransport {
86
94
95
+ private static final boolean undertow13Present = ClassUtils .isPresent ("io.undertow.connector.ByteBufferPool" ,
96
+ UndertowXhrTransport .class .getClassLoader ());
97
+
87
98
private static final AttachmentKey <String > RESPONSE_BODY = AttachmentKey .create (String .class );
88
99
89
100
@@ -93,21 +104,24 @@ public class UndertowXhrTransport extends AbstractXhrTransport {
93
104
94
105
private final XnioWorker worker ;
95
106
96
- @ SuppressWarnings ("deprecation" )
97
- private final org .xnio .Pool <ByteBuffer > bufferPool ;
107
+ private final UndertowBufferSupport undertowBufferSupport ;
98
108
99
109
100
110
public UndertowXhrTransport () throws IOException {
101
111
this (OptionMap .builder ().parse (Options .WORKER_NAME , "SockJSClient" ).getMap ());
102
112
}
103
113
104
- @ SuppressWarnings ("deprecation" )
105
114
public UndertowXhrTransport (OptionMap optionMap ) throws IOException {
106
115
Assert .notNull (optionMap , "OptionMap is required" );
107
116
this .optionMap = optionMap ;
108
117
this .httpClient = UndertowClient .getInstance ();
109
118
this .worker = Xnio .getInstance ().createWorker (optionMap );
110
- this .bufferPool = new org .xnio .ByteBufferSlicePool (1048 , 1048 );
119
+ if (undertow13Present ) {
120
+ this .undertowBufferSupport = new Undertow13BufferSupport ();
121
+ }
122
+ else {
123
+ this .undertowBufferSupport = new Undertow10BufferSupport ();
124
+ }
111
125
}
112
126
113
127
@@ -144,24 +158,25 @@ private void executeReceiveRequest(final TransportRequest transportRequest,
144
158
logger .trace ("Starting XHR receive request for " + url );
145
159
}
146
160
147
- this .httpClient .connect (
148
- new ClientCallback <ClientConnection >() {
149
- @ Override
150
- public void completed (ClientConnection connection ) {
151
- ClientRequest request = new ClientRequest ().setMethod (Methods .POST ).setPath (url .getPath ());
152
- HttpString headerName = HttpString .tryFromString (HttpHeaders .HOST );
153
- request .getRequestHeaders ().add (headerName , url .getHost ());
154
- addHttpHeaders (request , headers );
155
- HttpHeaders httpHeaders = transportRequest .getHttpRequestHeaders ();
156
- connection .sendRequest (request , createReceiveCallback (transportRequest ,
157
- url , httpHeaders , session , connectFuture ));
158
- }
159
- @ Override
160
- public void failed (IOException ex ) {
161
- throw new SockJsTransportFailureException ("Failed to execute request to " + url , ex );
162
- }
163
- },
164
- url , this .worker , this .bufferPool , this .optionMap );
161
+ ClientCallback <ClientConnection > clientCallback = new ClientCallback <ClientConnection >() {
162
+ @ Override
163
+ public void completed (ClientConnection connection ) {
164
+ ClientRequest request = new ClientRequest ().setMethod (Methods .POST ).setPath (url .getPath ());
165
+ HttpString headerName = HttpString .tryFromString (HttpHeaders .HOST );
166
+ request .getRequestHeaders ().add (headerName , url .getHost ());
167
+ addHttpHeaders (request , headers );
168
+ HttpHeaders httpHeaders = transportRequest .getHttpRequestHeaders ();
169
+ connection .sendRequest (request , createReceiveCallback (transportRequest ,
170
+ url , httpHeaders , session , connectFuture ));
171
+ }
172
+
173
+ @ Override
174
+ public void failed (IOException ex ) {
175
+ throw new SockJsTransportFailureException ("Failed to execute request to " + url , ex );
176
+ }
177
+ };
178
+
179
+ this .undertowBufferSupport .httpClientConnect (this .httpClient , clientCallback , url , worker , this .optionMap );
165
180
}
166
181
167
182
private static void addHttpHeaders (ClientRequest request , HttpHeaders headers ) {
@@ -211,6 +226,7 @@ public void completed(ClientExchange result) {
211
226
onFailure (exc );
212
227
}
213
228
}
229
+
214
230
@ Override
215
231
public void failed (IOException exc ) {
216
232
IoUtils .safeClose (exchange .getConnection ());
@@ -264,8 +280,8 @@ protected ResponseEntity<String> executeRequest(URI url, HttpString method, Http
264
280
List <ClientResponse > responses = new CopyOnWriteArrayList <ClientResponse >();
265
281
266
282
try {
267
- ClientConnection connection = this .httpClient . connect ( url , this . worker ,
268
- this .bufferPool , this .optionMap ).get ();
283
+ ClientConnection connection = this .undertowBufferSupport
284
+ . httpClientConnect ( this .httpClient , url , this . worker , this .optionMap ).get ();
269
285
try {
270
286
ClientRequest request = new ClientRequest ().setMethod (method ).setPath (url .getPath ());
271
287
request .getRequestHeaders ().add (HttpString .tryFromString (HttpHeaders .HOST ), url .getHost ());
@@ -314,12 +330,14 @@ protected void stringDone(String string) {
314
330
result .getResponse ().putAttachment (RESPONSE_BODY , string );
315
331
latch .countDown ();
316
332
}
333
+
317
334
@ Override
318
335
protected void error (IOException ex ) {
319
336
onFailure (latch , ex );
320
337
}
321
338
}.setup (result .getResponseChannel ());
322
339
}
340
+
323
341
@ Override
324
342
public void failed (IOException ex ) {
325
343
onFailure (latch , ex );
@@ -389,7 +407,6 @@ public void setup(StreamSourceChannel channel) {
389
407
}
390
408
391
409
@ Override
392
- @ SuppressWarnings ("deprecation" )
393
410
public void handleEvent (StreamSourceChannel channel ) {
394
411
if (this .session .isDisconnected ()) {
395
412
if (logger .isDebugEnabled ()) {
@@ -399,11 +416,11 @@ public void handleEvent(StreamSourceChannel channel) {
399
416
throw new SockJsException ("Session closed." , this .session .getId (), null );
400
417
}
401
418
402
- org . xnio . Pooled < ByteBuffer > pooled = this . connection . getBufferPool (). allocate ();
419
+ Object pooled = undertowBufferSupport . allocatePooledResource ();
403
420
try {
404
421
int r ;
405
422
do {
406
- ByteBuffer buffer = pooled . getResource ( );
423
+ ByteBuffer buffer = undertowBufferSupport . getByteBuffer ( pooled );
407
424
buffer .clear ();
408
425
r = channel .read (buffer );
409
426
buffer .flip ();
@@ -431,7 +448,7 @@ else if (r == -1) {
431
448
onFailure (exc );
432
449
}
433
450
finally {
434
- pooled . free ( );
451
+ undertowBufferSupport . closePooledResource ( pooled );
435
452
}
436
453
}
437
454
@@ -473,4 +490,116 @@ public void onFailure(Throwable failure) {
473
490
}
474
491
}
475
492
493
+ private interface UndertowBufferSupport {
494
+
495
+ Object allocatePooledResource ();
496
+
497
+ ByteBuffer getByteBuffer (Object pooled );
498
+
499
+ void closePooledResource (Object pooled );
500
+
501
+ void httpClientConnect (UndertowClient httpClient , final ClientCallback <ClientConnection > listener ,
502
+ final URI uri , final XnioWorker worker , OptionMap options );
503
+
504
+ IoFuture <ClientConnection > httpClientConnect (UndertowClient httpClient , final URI uri ,
505
+ final XnioWorker worker , OptionMap options );
506
+ }
507
+
508
+ private class Undertow10BufferSupport implements UndertowBufferSupport {
509
+
510
+ private final org .xnio .Pool <ByteBuffer > xnioBufferPool ;
511
+
512
+ private final Method httpClientConnectCallbackMethod ;
513
+
514
+ private final Method httpClientConnectMethod ;
515
+
516
+ public Undertow10BufferSupport () {
517
+ this .xnioBufferPool = new org .xnio .ByteBufferSlicePool (1048 , 1048 );
518
+ this .httpClientConnectCallbackMethod = ReflectionUtils .findMethod (UndertowClient .class , "connect" ,
519
+ ClientCallback .class , URI .class , XnioWorker .class , Pool .class , OptionMap .class );
520
+ this .httpClientConnectMethod = ReflectionUtils .findMethod (UndertowClient .class , "connect" ,
521
+ URI .class , XnioWorker .class , Pool .class , OptionMap .class );
522
+ }
523
+
524
+ @ Override
525
+ public Object allocatePooledResource () {
526
+ return this .xnioBufferPool .allocate ();
527
+ }
528
+
529
+ @ Override
530
+ @ SuppressWarnings ("unchecked" )
531
+ public ByteBuffer getByteBuffer (Object pooled ) {
532
+ return ((org .xnio .Pooled <ByteBuffer >) pooled ).getResource ();
533
+ }
534
+
535
+ @ Override
536
+ @ SuppressWarnings ("unchecked" )
537
+ public void closePooledResource (Object pooled ) {
538
+ ((org .xnio .Pooled <ByteBuffer >) pooled ).close ();
539
+ }
540
+
541
+ @ Override
542
+ public void httpClientConnect (UndertowClient httpClient , ClientCallback <ClientConnection > listener , URI uri ,
543
+ XnioWorker worker , OptionMap options ) {
544
+ ReflectionUtils .invokeMethod (httpClientConnectCallbackMethod , httpClient , listener , uri , worker ,
545
+ this .xnioBufferPool , options );
546
+
547
+ }
548
+
549
+ @ Override
550
+ @ SuppressWarnings ("unchecked" )
551
+ public IoFuture <ClientConnection > httpClientConnect (UndertowClient httpClient , URI uri ,
552
+ XnioWorker worker , OptionMap options ) {
553
+ return (IoFuture <ClientConnection >) ReflectionUtils .invokeMethod (httpClientConnectMethod , httpClient , uri ,
554
+ worker , this .xnioBufferPool , options );
555
+ }
556
+ }
557
+
558
+ private class Undertow13BufferSupport implements UndertowBufferSupport {
559
+
560
+ private final ByteBufferPool undertowBufferPool ;
561
+
562
+ private final Method httpClientConnectCallbackMethod ;
563
+
564
+ private final Method httpClientConnectMethod ;
565
+
566
+ public Undertow13BufferSupport () {
567
+ this .undertowBufferPool = new DefaultByteBufferPool (false , 1024 , -1 , 2 );
568
+ this .httpClientConnectCallbackMethod = ReflectionUtils .findMethod (UndertowClient .class , "connect" ,
569
+ ClientCallback .class , URI .class , XnioWorker .class , ByteBufferPool .class , OptionMap .class );
570
+ this .httpClientConnectMethod = ReflectionUtils .findMethod (UndertowClient .class , "connect" ,
571
+ URI .class , XnioWorker .class , ByteBufferPool .class , OptionMap .class );
572
+ }
573
+
574
+ @ Override
575
+ public Object allocatePooledResource () {
576
+ return this .undertowBufferPool .allocate ();
577
+ }
578
+
579
+ @ Override
580
+ public ByteBuffer getByteBuffer (Object pooled ) {
581
+ return ((PooledByteBuffer ) pooled ).getBuffer ();
582
+ }
583
+
584
+ @ Override
585
+ public void closePooledResource (Object pooled ) {
586
+ ((PooledByteBuffer ) pooled ).close ();
587
+ }
588
+
589
+ @ Override
590
+ public void httpClientConnect (UndertowClient httpClient , ClientCallback <ClientConnection > listener , URI uri ,
591
+ XnioWorker worker , OptionMap options ) {
592
+ ReflectionUtils .invokeMethod (httpClientConnectCallbackMethod , httpClient , listener , uri ,
593
+ this .undertowBufferPool , worker , options );
594
+ }
595
+
596
+ @ Override
597
+ @ SuppressWarnings ("unchecked" )
598
+ public IoFuture <ClientConnection > httpClientConnect (UndertowClient httpClient , URI uri ,
599
+ XnioWorker worker , OptionMap options ) {
600
+ return (IoFuture <ClientConnection >) ReflectionUtils .invokeMethod (httpClientConnectMethod , httpClient , uri ,
601
+ worker , this .undertowBufferPool , options );
602
+ }
603
+ }
604
+
476
605
}
0 commit comments