16
16
17
17
package org .springframework .integration .endpoint ;
18
18
19
- import java .util .Collections ;
19
+ import java .util .HashMap ;
20
20
import java .util .Map ;
21
21
import java .util .concurrent .atomic .AtomicLong ;
22
22
33
33
import org .springframework .integration .util .AbstractExpressionEvaluator ;
34
34
import org .springframework .lang .Nullable ;
35
35
import org .springframework .messaging .Message ;
36
- import org .springframework .messaging .MessagingException ;
37
36
import org .springframework .util .CollectionUtils ;
38
37
39
38
/**
@@ -52,7 +51,7 @@ public abstract class AbstractMessageSource<T> extends AbstractExpressionEvaluat
52
51
53
52
private final ManagementOverrides managementOverrides = new ManagementOverrides ();
54
53
55
- private volatile Map <String , Expression > headerExpressions = Collections . emptyMap () ;
54
+ private Map <String , Expression > headerExpressions ;
56
55
57
56
private String beanName ;
58
57
@@ -68,9 +67,10 @@ public abstract class AbstractMessageSource<T> extends AbstractExpressionEvaluat
68
67
69
68
private CounterFacade receiveCounter ;
70
69
71
- public void setHeaderExpressions (Map <String , Expression > headerExpressions ) {
72
- this .headerExpressions = (headerExpressions != null )
73
- ? headerExpressions : Collections .emptyMap ();
70
+ public void setHeaderExpressions (@ Nullable Map <String , Expression > headerExpressions ) {
71
+ if (!CollectionUtils .isEmpty (headerExpressions )) {
72
+ this .headerExpressions = new HashMap <>(headerExpressions );
73
+ }
74
74
}
75
75
76
76
@ Override
@@ -162,68 +162,61 @@ public final Message<T> receive() {
162
162
163
163
@ SuppressWarnings ("unchecked" )
164
164
protected Message <T > buildMessage (Object result ) {
165
- Message <T > message = null ;
165
+ Message <? > message = null ;
166
166
Map <String , Object > headers = evaluateHeaders ();
167
- if (result instanceof AbstractIntegrationMessageBuilder ) {
167
+ if (result instanceof AbstractIntegrationMessageBuilder <?> ) {
168
168
if (!CollectionUtils .isEmpty (headers )) {
169
- ((AbstractIntegrationMessageBuilder <T >) result ).copyHeaders (headers );
169
+ ((AbstractIntegrationMessageBuilder <? >) result ).copyHeaders (headers );
170
170
}
171
- message = ((AbstractIntegrationMessageBuilder <T >) result ).build ();
171
+ message = ((AbstractIntegrationMessageBuilder <? >) result ).build ();
172
172
}
173
173
else if (result instanceof Message <?>) {
174
- try {
175
- message = (Message <T >) result ;
176
- }
177
- catch (Exception e ) {
178
- throw new MessagingException ("MessageSource returned unexpected type." , e );
179
- }
174
+ message = (Message <?>) result ;
180
175
if (!CollectionUtils .isEmpty (headers )) {
181
176
// create a new Message from this one in order to apply headers
182
- message = getMessageBuilderFactory ()
183
- .fromMessage (message )
184
- .copyHeaders (headers )
185
- .build ();
177
+ message =
178
+ getMessageBuilderFactory ()
179
+ .fromMessage (message )
180
+ .copyHeaders (headers )
181
+ .build ();
186
182
}
187
183
}
188
184
else if (result != null ) {
189
- T payload ;
190
- try {
191
- payload = (T ) result ;
192
- }
193
- catch (Exception e ) {
194
- throw new MessagingException ("MessageSource returned unexpected type." , e );
195
- }
196
- message = getMessageBuilderFactory ()
197
- .withPayload (payload )
198
- .copyHeaders (headers )
199
- .build ();
185
+ message =
186
+ getMessageBuilderFactory ()
187
+ .withPayload (result )
188
+ .copyHeaders (headers )
189
+ .build ();
200
190
}
201
191
if (this .countsEnabled && message != null ) {
202
192
if (this .metricsCaptor != null ) {
203
193
incrementReceiveCounter ();
204
194
}
205
195
this .messageCount .incrementAndGet ();
206
196
}
207
- return message ;
197
+ return ( Message < T >) message ;
208
198
}
209
199
210
200
private void incrementReceiveCounter () {
211
201
if (this .receiveCounter == null ) {
212
202
this .receiveCounter = this .metricsCaptor .counterBuilder (RECEIVE_COUNTER_NAME )
213
- .tag ("name" , getComponentName () == null ? "unknown" : getComponentName ())
214
- .tag ("type" , "source" )
215
- .tag ("result" , "success" )
216
- .tag ("exception" , "none" )
217
- .description ("Messages received" )
218
- .build ();
203
+ .tag ("name" , getComponentName () == null ? "unknown" : getComponentName ())
204
+ .tag ("type" , "source" )
205
+ .tag ("result" , "success" )
206
+ .tag ("exception" , "none" )
207
+ .description ("Messages received" )
208
+ .build ();
219
209
}
220
210
this .receiveCounter .increment ();
221
211
}
222
212
213
+ @ Nullable
223
214
private Map <String , Object > evaluateHeaders () {
224
- return ExpressionEvalMap .from (this .headerExpressions )
225
- .usingEvaluationContext (getEvaluationContext ())
226
- .build ();
215
+ return CollectionUtils .isEmpty (this .headerExpressions )
216
+ ? null
217
+ : ExpressionEvalMap .from (this .headerExpressions )
218
+ .usingEvaluationContext (getEvaluationContext ())
219
+ .build ();
227
220
}
228
221
229
222
/**
0 commit comments