@@ -456,13 +456,10 @@ else if (returnType.isAssignableFrom(this.asyncSubmitListenableType)) {
456
456
else if (CompletableFuture .class .equals (returnType )) { // exact
457
457
return CompletableFuture .supplyAsync (invoker , this .asyncExecutor );
458
458
}
459
- else if (Future .class .isAssignableFrom (returnType )) {
460
- if (logger .isDebugEnabled ()) {
461
- logger .debug ("AsyncTaskExecutor submit*() return types are incompatible with the method return " +
462
- "type; "
463
- + "running on calling thread; the downstream flow must return the required Future: "
464
- + returnType .getSimpleName ());
465
- }
459
+ else if (Future .class .isAssignableFrom (returnType ) && logger .isDebugEnabled ()) {
460
+ logger .debug ("AsyncTaskExecutor submit*() return types are incompatible with the method return " +
461
+ "type; running on calling thread; the downstream flow must return the required Future: "
462
+ + returnType .getSimpleName ());
466
463
}
467
464
}
468
465
if (Mono .class .isAssignableFrom (returnType )) {
@@ -494,37 +491,18 @@ private Object invokeGatewayMethod(MethodInvocation invocation, boolean runningO
494
491
Method method = invocation .getMethod ();
495
492
MethodInvocationGateway gateway = this .gatewayMap .get (method );
496
493
Class <?> returnType = method .getReturnType ();
497
- if (gateway .isReturnTypeMessage == null ) {
498
- gateway .isReturnTypeMessage =
499
- Message . class . isAssignableFrom ( returnType ) || hasReturnMessageTypeOnFunction (method );
494
+ if (gateway .getReturnTypeMessage () == null ) {
495
+ gateway .setReturnTypeMessage ( Message . class . isAssignableFrom ( returnType )
496
+ || hasReturnMessageTypeOnFunction (method ) );
500
497
}
501
498
boolean shouldReturnMessage =
502
499
gateway .isReturnTypeMessage || hasReturnParameterizedWithMessage (method , runningOnCallerThread );
503
500
boolean shouldReply = returnType != void .class ;
504
501
int paramCount = method .getParameterTypes ().length ;
505
- Object response = null ;
502
+ Object response ;
506
503
boolean hasPayloadExpression = findPayloadExpression (method );
507
504
if (paramCount == 0 && !hasPayloadExpression ) {
508
- Long receiveTimeout = null ;
509
- if (gateway .getReceiveTimeoutExpression () != null ) {
510
- receiveTimeout = gateway .getReceiveTimeoutExpression ().getValue (this .evaluationContext , Long .class );
511
- }
512
- if (shouldReply ) {
513
- if (shouldReturnMessage ) {
514
- if (receiveTimeout != null ) {
515
- return gateway .receiveMessage (receiveTimeout );
516
- }
517
- else {
518
- return gateway .receiveMessage ();
519
- }
520
- }
521
- if (receiveTimeout != null ) {
522
- response = gateway .receive (receiveTimeout );
523
- }
524
- else {
525
- response = gateway .receive ();
526
- }
527
- }
505
+ response = receive (gateway , method , shouldReply , shouldReturnMessage );
528
506
}
529
507
else {
530
508
response = sendOrSendAndReceive (invocation , gateway , shouldReturnMessage , shouldReply );
@@ -557,6 +535,35 @@ else if (this.globalMethodMetadata != null) {
557
535
return hasPayloadExpression ;
558
536
}
559
537
538
+ @ Nullable
539
+ private Object receive (MethodInvocationGateway gateway , Method method , boolean shouldReply ,
540
+ boolean shouldReturnMessage ) {
541
+
542
+ Long receiveTimeout = null ;
543
+ Expression receiveTimeoutExpression = gateway .getReceiveTimeoutExpression ();
544
+ if (receiveTimeoutExpression != null ) {
545
+ receiveTimeout = receiveTimeoutExpression .getValue (this .evaluationContext , Long .class );
546
+ }
547
+ if (shouldReply ) {
548
+ if (shouldReturnMessage ) {
549
+ if (receiveTimeout != null ) {
550
+ return gateway .receiveMessage (receiveTimeout );
551
+ }
552
+ else {
553
+ return gateway .receiveMessage ();
554
+ }
555
+ }
556
+ if (receiveTimeout != null ) {
557
+ return gateway .receive (receiveTimeout );
558
+ }
559
+ else {
560
+ return gateway .receive ();
561
+ }
562
+ }
563
+ throw new IllegalArgumentException ("The 'void' method without arguments '" + method + "' is not eligible for" +
564
+ " gateway invocation. Consider to use different signature or 'payloadExpression'." );
565
+ }
566
+
560
567
@ Nullable
561
568
private Object sendOrSendAndReceive (MethodInvocation invocation , MethodInvocationGateway gateway ,
562
569
boolean shouldReturnMessage , boolean shouldReply ) {
@@ -595,17 +602,18 @@ private void rethrowExceptionCauseIfPossible(Throwable originalException, Method
595
602
596
603
private MethodInvocationGateway createGatewayForMethod (Method method ) {
597
604
Gateway gatewayAnnotation = method .getAnnotation (Gateway .class );
598
- String requestChannelName = null ;
599
- String replyChannelName = null ;
605
+ GatewayMethodMetadata methodMetadata = null ;
606
+ if (!CollectionUtils .isEmpty (this .methodMetadataMap )) {
607
+ methodMetadata = this .methodMetadataMap .get (method .getName ());
608
+ }
609
+ Map <String , Expression > headerExpressions = new HashMap <>();
600
610
Expression requestTimeout = this .defaultRequestTimeout ;
601
611
Expression replyTimeout = this .defaultReplyTimeout ;
602
- Expression payloadExpression = this . globalMethodMetadata != null
603
- ? this . globalMethodMetadata . getPayloadExpression ()
604
- : null ;
605
- Map < String , Expression > headerExpressions = new HashMap <>( );
612
+ Expression payloadExpression =
613
+ extractPayloadExpressionFromAnnotationOrMetadata ( gatewayAnnotation , methodMetadata );
614
+ String requestChannelName = extractRequestChannelFromAnnotationOrMetadata ( gatewayAnnotation , methodMetadata ) ;
615
+ String replyChannelName = extractReplyChannelFromAnnotationOrMetadata ( gatewayAnnotation , methodMetadata );
606
616
if (gatewayAnnotation != null ) {
607
- requestChannelName = gatewayAnnotation .requestChannel ();
608
- replyChannelName = gatewayAnnotation .replyChannel ();
609
617
/*
610
618
* INT-2636 Unspecified annotation attributes should not
611
619
* override the default values supplied by explicit configuration.
@@ -625,62 +633,74 @@ private MethodInvocationGateway createGatewayForMethod(Method method) {
625
633
if (StringUtils .hasText (gatewayAnnotation .replyTimeoutExpression ())) {
626
634
replyTimeout = ExpressionUtils .longExpression (gatewayAnnotation .replyTimeoutExpression ());
627
635
}
636
+
637
+ annotationHeaders (gatewayAnnotation , headerExpressions );
638
+ }
639
+ else if (methodMetadata != null ) {
640
+ if (!CollectionUtils .isEmpty (methodMetadata .getHeaderExpressions ())) {
641
+ headerExpressions .putAll (methodMetadata .getHeaderExpressions ());
642
+ }
643
+ String reqTimeout = methodMetadata .getRequestTimeout ();
644
+ if (StringUtils .hasText (reqTimeout )) {
645
+ requestTimeout = ExpressionUtils .longExpression (reqTimeout );
646
+ }
647
+ String repTimeout = methodMetadata .getReplyTimeout ();
648
+ if (StringUtils .hasText (repTimeout )) {
649
+ replyTimeout = ExpressionUtils .longExpression (repTimeout );
650
+ }
651
+ }
652
+
653
+ return doCreateMethodInvocationGateway (method , payloadExpression , headerExpressions ,
654
+ requestChannelName , replyChannelName , requestTimeout , replyTimeout );
655
+ }
656
+
657
+ @ Nullable
658
+ private Expression extractPayloadExpressionFromAnnotationOrMetadata (@ Nullable Gateway gatewayAnnotation ,
659
+ @ Nullable GatewayMethodMetadata methodMetadata ) {
660
+
661
+ Expression payloadExpression =
662
+ this .globalMethodMetadata != null
663
+ ? this .globalMethodMetadata .getPayloadExpression ()
664
+ : null ;
665
+
666
+ if (gatewayAnnotation != null ) {
628
667
if (payloadExpression == null && StringUtils .hasText (gatewayAnnotation .payloadExpression ())) {
629
668
payloadExpression = PARSER .parseExpression (gatewayAnnotation .payloadExpression ());
630
669
}
631
-
632
- annotationHeaders (gatewayAnnotation , headerExpressions );
633
670
}
634
- else if (this .methodMetadataMap != null && this .methodMetadataMap .size () > 0 ) {
635
- GatewayMethodMetadata methodMetadata = this .methodMetadataMap .get (method .getName ());
636
- if (methodMetadata != null ) {
637
- if (methodMetadata .getPayloadExpression () != null ) {
638
- payloadExpression = methodMetadata .getPayloadExpression ();
639
- }
640
- if (!CollectionUtils .isEmpty (methodMetadata .getHeaderExpressions ())) {
641
- headerExpressions .putAll (methodMetadata .getHeaderExpressions ());
642
- }
643
- requestChannelName = methodMetadata .getRequestChannelName ();
644
- replyChannelName = methodMetadata .getReplyChannelName ();
645
- String reqTimeout = methodMetadata .getRequestTimeout ();
646
- if (StringUtils .hasText (reqTimeout )) {
647
- requestTimeout = ExpressionUtils .longExpression (reqTimeout );
648
- }
649
- String repTimeout = methodMetadata .getReplyTimeout ();
650
- if (StringUtils .hasText (repTimeout )) {
651
- replyTimeout = ExpressionUtils .longExpression (repTimeout );
652
- }
671
+ else if (methodMetadata != null ) {
672
+ if (methodMetadata .getPayloadExpression () != null ) {
673
+ payloadExpression = methodMetadata .getPayloadExpression ();
653
674
}
654
675
}
655
- Map <String , Object > headers = headers (method , headerExpressions );
656
676
657
- GatewayMethodInboundMessageMapper messageMapper = new GatewayMethodInboundMessageMapper (method ,
658
- headerExpressions ,
659
- this .globalMethodMetadata != null ? this .globalMethodMetadata .getHeaderExpressions () : null ,
660
- headers , this .argsMapper , getMessageBuilderFactory ());
677
+ return payloadExpression ;
678
+ }
661
679
662
- MethodInvocationGateway gateway = new MethodInvocationGateway (messageMapper );
680
+ @ Nullable
681
+ private String extractRequestChannelFromAnnotationOrMetadata (@ Nullable Gateway gatewayAnnotation ,
682
+ @ Nullable GatewayMethodMetadata methodMetadata ) {
663
683
664
- JavaUtils .INSTANCE
665
- .acceptIfNotNull (payloadExpression , messageMapper ::setPayloadExpression )
666
- .acceptIfNotNull (getTaskScheduler (), gateway ::setTaskScheduler );
667
- gateway .setBeanName (getComponentName ());
684
+ if (gatewayAnnotation != null ) {
685
+ return gatewayAnnotation .requestChannel ();
686
+ }
687
+ else if (methodMetadata != null ) {
688
+ return methodMetadata .getRequestChannelName ();
689
+ }
690
+ return null ;
691
+ }
668
692
669
- setChannel (this .errorChannel , gateway ::setErrorChannel , this .errorChannelName , gateway ::setErrorChannelName );
670
- setChannel (requestChannelName , this .defaultRequestChannelName , gateway ::setRequestChannelName ,
671
- this .defaultRequestChannel , gateway ::setRequestChannel );
672
- setChannel (replyChannelName , this .defaultReplyChannelName , gateway ::setReplyChannelName ,
673
- this .defaultReplyChannel , gateway ::setReplyChannel );
693
+ @ Nullable
694
+ private String extractReplyChannelFromAnnotationOrMetadata (@ Nullable Gateway gatewayAnnotation ,
695
+ @ Nullable GatewayMethodMetadata methodMetadata ) {
674
696
675
- timeouts (requestTimeout , replyTimeout , messageMapper , gateway );
676
- BeanFactory beanFactory = getBeanFactory ();
677
- if (beanFactory != null ) {
678
- gateway .setBeanFactory (beanFactory );
679
- messageMapper .setBeanFactory (beanFactory );
697
+ if (gatewayAnnotation != null ) {
698
+ return gatewayAnnotation .replyChannel ();
680
699
}
681
- gateway .setShouldTrack (this .shouldTrack );
682
- gateway .afterPropertiesSet ();
683
- return gateway ;
700
+ else if (methodMetadata != null ) {
701
+ return methodMetadata .getReplyChannelName ();
702
+ }
703
+ return null ;
684
704
}
685
705
686
706
private void annotationHeaders (Gateway gatewayAnnotation , Map <String , Expression > headerExpressions ) {
@@ -702,6 +722,41 @@ private void annotationHeaders(Gateway gatewayAnnotation, Map<String, Expression
702
722
}
703
723
}
704
724
725
+ private MethodInvocationGateway doCreateMethodInvocationGateway (Method method ,
726
+ @ Nullable Expression payloadExpression , Map <String , Expression > headerExpressions ,
727
+ @ Nullable String requestChannelName , @ Nullable String replyChannelName ,
728
+ Expression requestTimeout , Expression replyTimeout ) {
729
+
730
+ GatewayMethodInboundMessageMapper messageMapper = createGatewayMessageMapper (method , headerExpressions );
731
+ MethodInvocationGateway gateway = new MethodInvocationGateway (messageMapper );
732
+
733
+ JavaUtils .INSTANCE
734
+ .acceptIfNotNull (payloadExpression , messageMapper ::setPayloadExpression )
735
+ .acceptIfNotNull (getTaskScheduler (), gateway ::setTaskScheduler );
736
+
737
+ channels (requestChannelName , replyChannelName , gateway );
738
+
739
+ timeouts (requestTimeout , replyTimeout , messageMapper , gateway );
740
+
741
+ gateway .setBeanName (getComponentName ());
742
+ gateway .setBeanFactory (getBeanFactory ());
743
+ gateway .setShouldTrack (this .shouldTrack );
744
+ gateway .afterPropertiesSet ();
745
+
746
+ return gateway ;
747
+ }
748
+
749
+ private GatewayMethodInboundMessageMapper createGatewayMessageMapper (Method method , Map <String ,
750
+ Expression > headerExpressions ) {
751
+
752
+ Map <String , Object > headers = headers (method , headerExpressions );
753
+
754
+ return new GatewayMethodInboundMessageMapper (method ,
755
+ headerExpressions ,
756
+ this .globalMethodMetadata != null ? this .globalMethodMetadata .getHeaderExpressions () : null ,
757
+ headers , this .argsMapper , getMessageBuilderFactory ());
758
+ }
759
+
705
760
@ Nullable
706
761
private Map <String , Object > headers (Method method , Map <String , Expression > headerExpressions ) {
707
762
Map <String , Object > headers = null ;
@@ -744,7 +799,17 @@ private void validateHeaders(Set<String> headerNames) {
744
799
}
745
800
}
746
801
747
- private void timeouts (Expression requestTimeout , Expression replyTimeout ,
802
+ private void channels (@ Nullable String requestChannelName , @ Nullable String replyChannelName ,
803
+ MethodInvocationGateway gateway ) {
804
+
805
+ setChannel (this .errorChannel , gateway ::setErrorChannel , this .errorChannelName , gateway ::setErrorChannelName );
806
+ setChannel (requestChannelName , this .defaultRequestChannelName , gateway ::setRequestChannelName ,
807
+ this .defaultRequestChannel , gateway ::setRequestChannel );
808
+ setChannel (replyChannelName , this .defaultReplyChannelName , gateway ::setReplyChannelName ,
809
+ this .defaultReplyChannel , gateway ::setReplyChannel );
810
+ }
811
+
812
+ private void timeouts (@ Nullable Expression requestTimeout , @ Nullable Expression replyTimeout ,
748
813
GatewayMethodInboundMessageMapper messageMapper , MethodInvocationGateway gateway ) {
749
814
if (requestTimeout == null ) {
750
815
gateway .setRequestTimeout (-1 );
@@ -869,12 +934,13 @@ private static final class MethodInvocationGateway extends MessagingGatewaySuppo
869
934
870
935
private Expression receiveTimeoutExpression ;
871
936
872
- volatile Boolean isReturnTypeMessage ;
937
+ private volatile Boolean isReturnTypeMessage ;
873
938
874
939
MethodInvocationGateway (GatewayMethodInboundMessageMapper messageMapper ) {
875
940
setRequestMapper (messageMapper );
876
941
}
877
942
943
+ @ Nullable
878
944
Expression getReceiveTimeoutExpression () {
879
945
return this .receiveTimeoutExpression ;
880
946
}
@@ -883,6 +949,15 @@ void setReceiveTimeoutExpression(Expression receiveTimeoutExpression) {
883
949
this .receiveTimeoutExpression = receiveTimeoutExpression ;
884
950
}
885
951
952
+ @ Nullable
953
+ Boolean getReturnTypeMessage () {
954
+ return this .isReturnTypeMessage ;
955
+ }
956
+
957
+ void setReturnTypeMessage (Boolean returnTypeMessage ) {
958
+ this .isReturnTypeMessage = returnTypeMessage ;
959
+ }
960
+
886
961
}
887
962
888
963
private final class Invoker implements Supplier <Object > {
@@ -902,7 +977,7 @@ public Object get() {
902
977
throw e ;
903
978
}
904
979
catch (Throwable t ) { //NOSONAR
905
- if (t instanceof RuntimeException ) {
980
+ if (t instanceof RuntimeException ) { //NOSONAR
906
981
throw (RuntimeException ) t ;
907
982
}
908
983
throw new MessagingException ("Asynchronous gateway invocation failed" , t );
0 commit comments