26
26
import org .springframework .amqp .core .MessageDeliveryMode ;
27
27
import org .springframework .amqp .core .MessageProperties ;
28
28
import org .springframework .amqp .support .AmqpHeaders ;
29
+ import org .springframework .amqp .utils .JavaUtils ;
29
30
import org .springframework .integration .IntegrationMessageHeaderAccessor ;
30
31
import org .springframework .integration .mapping .AbstractHeaderMapper ;
31
32
import org .springframework .integration .mapping .support .JsonHeaders ;
@@ -107,86 +108,55 @@ protected DefaultAmqpHeaderMapper(String[] requestHeaderNames, String[] replyHea
107
108
protected Map <String , Object > extractStandardHeaders (MessageProperties amqpMessageProperties ) {
108
109
Map <String , Object > headers = new HashMap <String , Object >();
109
110
try {
110
- String appId = amqpMessageProperties .getAppId ();
111
- if (StringUtils .hasText (appId )) {
112
- headers .put (AmqpHeaders .APP_ID , appId );
113
- }
114
- String clusterId = amqpMessageProperties .getClusterId ();
115
- if (StringUtils .hasText (clusterId )) {
116
- headers .put (AmqpHeaders .CLUSTER_ID , clusterId );
117
- }
118
- String contentEncoding = amqpMessageProperties .getContentEncoding ();
119
- if (StringUtils .hasText (contentEncoding )) {
120
- headers .put (AmqpHeaders .CONTENT_ENCODING , contentEncoding );
121
- }
111
+ JavaUtils .INSTANCE
112
+ .acceptIfNotNull (AmqpHeaders .APP_ID , amqpMessageProperties .getAppId (),
113
+ (key , value ) -> headers .put (key , value ))
114
+ .acceptIfNotNull (AmqpHeaders .CLUSTER_ID , amqpMessageProperties .getClusterId (),
115
+ (key , value ) -> headers .put (key , value ))
116
+ .acceptIfNotNull (AmqpHeaders .CONTENT_ENCODING , amqpMessageProperties .getContentEncoding (),
117
+ (key , value ) -> headers .put (key , value ));
122
118
long contentLength = amqpMessageProperties .getContentLength ();
123
- if (contentLength > 0 ) {
124
- headers .put (AmqpHeaders .CONTENT_LENGTH , contentLength );
125
- }
126
- String contentType = amqpMessageProperties .getContentType ();
127
- if (StringUtils .hasText (contentType )) {
128
- headers .put (AmqpHeaders .CONTENT_TYPE , contentType );
129
- }
130
- String correlationId = amqpMessageProperties .getCorrelationId ();
131
- if (StringUtils .hasText (correlationId )) {
132
- headers .put (AmqpHeaders .CORRELATION_ID , correlationId );
133
- }
134
- MessageDeliveryMode receivedDeliveryMode = amqpMessageProperties .getReceivedDeliveryMode ();
135
- if (receivedDeliveryMode != null ) {
136
- headers .put (AmqpHeaders .RECEIVED_DELIVERY_MODE , receivedDeliveryMode );
137
- }
119
+ JavaUtils .INSTANCE
120
+ .acceptIfCondition (contentLength > 0 , AmqpHeaders .CONTENT_LENGTH , contentLength ,
121
+ (key , value ) -> headers .put (key , value ))
122
+ .acceptIfHasText (AmqpHeaders .CONTENT_TYPE , amqpMessageProperties .getContentType (),
123
+ (key , value ) -> headers .put (key , value ))
124
+ .acceptIfHasText (AmqpHeaders .CORRELATION_ID , amqpMessageProperties .getCorrelationId (),
125
+ (key , value ) -> headers .put (key , value ))
126
+ .acceptIfNotNull (AmqpHeaders .RECEIVED_DELIVERY_MODE , amqpMessageProperties .getReceivedDeliveryMode (),
127
+ (key , value ) -> headers .put (key , value ));
138
128
long deliveryTag = amqpMessageProperties .getDeliveryTag ();
139
- if (deliveryTag > 0 ) {
140
- headers .put (AmqpHeaders .DELIVERY_TAG , deliveryTag );
141
- }
142
- String expiration = amqpMessageProperties .getExpiration ();
143
- if (StringUtils .hasText (expiration )) {
144
- headers .put (AmqpHeaders .EXPIRATION , expiration );
145
- }
129
+ JavaUtils .INSTANCE
130
+ .acceptIfCondition (deliveryTag > 0 , AmqpHeaders .DELIVERY_TAG , deliveryTag ,
131
+ (key , value ) -> headers .put (key , value ))
132
+ .acceptIfHasText (AmqpHeaders .EXPIRATION , amqpMessageProperties .getExpiration (),
133
+ (key , value ) -> headers .put (key , value ));
146
134
Integer messageCount = amqpMessageProperties .getMessageCount ();
147
- if (messageCount != null && messageCount > 0 ) {
148
- headers .put (AmqpHeaders .MESSAGE_COUNT , messageCount );
149
- }
150
- String messageId = amqpMessageProperties .getMessageId ();
151
- if (StringUtils .hasText (messageId )) {
152
- headers .put (AmqpHeaders .MESSAGE_ID , messageId );
153
- }
135
+ JavaUtils .INSTANCE
136
+ .acceptIfCondition (messageCount != null && messageCount > 0 , AmqpHeaders .MESSAGE_COUNT , messageCount ,
137
+ (key , value ) -> headers .put (key , value ))
138
+ .acceptIfHasText (AmqpHeaders .MESSAGE_ID , amqpMessageProperties .getMessageId (),
139
+ (key , value ) -> headers .put (key , value ));
154
140
Integer priority = amqpMessageProperties .getPriority ();
155
- if (priority != null && priority > 0 ) {
156
- headers .put (IntegrationMessageHeaderAccessor .PRIORITY , priority );
157
- }
158
- Integer receivedDelay = amqpMessageProperties .getReceivedDelay ();
159
- if (receivedDelay != null ) {
160
- headers .put (AmqpHeaders .RECEIVED_DELAY , receivedDelay );
161
- }
162
- String receivedExchange = amqpMessageProperties .getReceivedExchange ();
163
- if (StringUtils .hasText (receivedExchange )) {
164
- headers .put (AmqpHeaders .RECEIVED_EXCHANGE , receivedExchange );
165
- }
166
- String receivedRoutingKey = amqpMessageProperties .getReceivedRoutingKey ();
167
- if (StringUtils .hasText (receivedRoutingKey )) {
168
- headers .put (AmqpHeaders .RECEIVED_ROUTING_KEY , receivedRoutingKey );
169
- }
170
- Boolean redelivered = amqpMessageProperties .isRedelivered ();
171
- if (redelivered != null ) {
172
- headers .put (AmqpHeaders .REDELIVERED , redelivered );
173
- }
174
- String replyTo = amqpMessageProperties .getReplyTo ();
175
- if (replyTo != null ) {
176
- headers .put (AmqpHeaders .REPLY_TO , replyTo );
177
- }
178
- Date timestamp = amqpMessageProperties .getTimestamp ();
179
- if (timestamp != null ) {
180
- headers .put (AmqpHeaders .TIMESTAMP , timestamp );
181
- }
182
- String type = amqpMessageProperties .getType ();
183
- if (StringUtils .hasText (type )) {
184
- headers .put (AmqpHeaders .TYPE , type );
185
- }
186
- String userId = amqpMessageProperties .getReceivedUserId ();
187
- if (StringUtils .hasText (userId )) {
188
- headers .put (AmqpHeaders .RECEIVED_USER_ID , userId );
189
- }
141
+ JavaUtils .INSTANCE
142
+ .acceptIfCondition (priority != null && priority > 0 , IntegrationMessageHeaderAccessor .PRIORITY ,
143
+ priority , (key , value ) -> headers .put (key , value ))
144
+ .acceptIfNotNull (AmqpHeaders .RECEIVED_DELAY , amqpMessageProperties .getReceivedDelay (),
145
+ (key , value ) -> headers .put (key , value ))
146
+ .acceptIfNotNull (AmqpHeaders .RECEIVED_EXCHANGE , amqpMessageProperties .getReceivedExchange (),
147
+ (key , value ) -> headers .put (key , value ))
148
+ .acceptIfHasText (AmqpHeaders .RECEIVED_ROUTING_KEY , amqpMessageProperties .getReceivedRoutingKey (),
149
+ (key , value ) -> headers .put (key , value ))
150
+ .acceptIfNotNull (AmqpHeaders .REDELIVERED , amqpMessageProperties .isRedelivered (),
151
+ (key , value ) -> headers .put (key , value ))
152
+ .acceptIfNotNull (AmqpHeaders .REPLY_TO , amqpMessageProperties .getReplyTo (),
153
+ (key , value ) -> headers .put (key , value ))
154
+ .acceptIfNotNull (AmqpHeaders .TIMESTAMP , amqpMessageProperties .getTimestamp (),
155
+ (key , value ) -> headers .put (key , value ))
156
+ .acceptIfHasText (AmqpHeaders .TYPE , amqpMessageProperties .getType (),
157
+ (key , value ) -> headers .put (key , value ))
158
+ .acceptIfHasText (AmqpHeaders .RECEIVED_USER_ID , amqpMessageProperties .getReceivedUserId (),
159
+ (key , value ) -> headers .put (key , value ));
190
160
191
161
for (String jsonHeader : JsonHeaders .HEADERS ) {
192
162
Object value = amqpMessageProperties .getHeaders ().get (jsonHeader .replaceFirst (JsonHeaders .PREFIX , "" ));
@@ -228,54 +198,30 @@ protected void populateStandardHeaders(Map<String, Object> headers, MessagePrope
228
198
@ Override
229
199
protected void populateStandardHeaders (@ Nullable Map <String , Object > allHeaders , Map <String , Object > headers ,
230
200
MessageProperties amqpMessageProperties ) {
231
- String appId = getHeaderIfAvailable (headers , AmqpHeaders .APP_ID , String .class );
232
- if (StringUtils .hasText (appId )) {
233
- amqpMessageProperties .setAppId (appId );
234
- }
235
- String clusterId = getHeaderIfAvailable (headers , AmqpHeaders .CLUSTER_ID , String .class );
236
- if (StringUtils .hasText (clusterId )) {
237
- amqpMessageProperties .setClusterId (clusterId );
238
- }
239
- String contentEncoding = getHeaderIfAvailable (headers , AmqpHeaders .CONTENT_ENCODING , String .class );
240
- if (StringUtils .hasText (contentEncoding )) {
241
- amqpMessageProperties .setContentEncoding (contentEncoding );
242
- }
243
- Long contentLength = getHeaderIfAvailable (headers , AmqpHeaders .CONTENT_LENGTH , Long .class );
244
- if (contentLength != null ) {
245
- amqpMessageProperties .setContentLength (contentLength );
246
- }
247
- String contentType = this .extractContentTypeAsString (headers );
248
201
249
- if (StringUtils .hasText (contentType )) {
250
- amqpMessageProperties .setContentType (contentType );
251
- }
252
-
253
- String correlationId = getHeaderIfAvailable (headers , AmqpHeaders .CORRELATION_ID , String .class );
254
- if (StringUtils .hasText (correlationId )) {
255
- amqpMessageProperties .setCorrelationId (correlationId );
256
- }
257
-
258
- Integer delay = getHeaderIfAvailable (headers , AmqpHeaders .DELAY , Integer .class );
259
- if (delay != null ) {
260
- amqpMessageProperties .setDelay (delay );
261
- }
262
- MessageDeliveryMode deliveryMode = getHeaderIfAvailable (headers , AmqpHeaders .DELIVERY_MODE ,
263
- MessageDeliveryMode .class );
264
- if (deliveryMode != null ) {
265
- amqpMessageProperties .setDeliveryMode (deliveryMode );
266
- }
267
- Long deliveryTag = getHeaderIfAvailable (headers , AmqpHeaders .DELIVERY_TAG , Long .class );
268
- if (deliveryTag != null ) {
269
- amqpMessageProperties .setDeliveryTag (deliveryTag );
270
- }
271
- String expiration = getHeaderIfAvailable (headers , AmqpHeaders .EXPIRATION , String .class );
272
- if (StringUtils .hasText (expiration )) {
273
- amqpMessageProperties .setExpiration (expiration );
274
- }
275
- Integer messageCount = getHeaderIfAvailable (headers , AmqpHeaders .MESSAGE_COUNT , Integer .class );
276
- if (messageCount != null ) {
277
- amqpMessageProperties .setMessageCount (messageCount );
278
- }
202
+ JavaUtils .INSTANCE
203
+ .acceptIfHasText (getHeaderIfAvailable (headers , AmqpHeaders .APP_ID , String .class ),
204
+ appId -> amqpMessageProperties .setAppId (appId ))
205
+ .acceptIfHasText (getHeaderIfAvailable (headers , AmqpHeaders .CLUSTER_ID , String .class ),
206
+ clusterId -> amqpMessageProperties .setClusterId (clusterId ))
207
+ .acceptIfHasText (getHeaderIfAvailable (headers , AmqpHeaders .CONTENT_ENCODING , String .class ),
208
+ contentEncoding -> amqpMessageProperties .setContentEncoding (contentEncoding ))
209
+ .acceptIfNotNull (getHeaderIfAvailable (headers , AmqpHeaders .CONTENT_LENGTH , Long .class ),
210
+ contentLength -> amqpMessageProperties .setContentLength (contentLength ))
211
+ .acceptIfHasText (this .extractContentTypeAsString (headers ),
212
+ contentType -> amqpMessageProperties .setContentType (contentType ))
213
+ .acceptIfHasText (getHeaderIfAvailable (headers , AmqpHeaders .CORRELATION_ID , String .class ),
214
+ correlationId -> amqpMessageProperties .setCorrelationId (correlationId ))
215
+ .acceptIfNotNull (getHeaderIfAvailable (headers , AmqpHeaders .DELAY , Integer .class ),
216
+ delay -> amqpMessageProperties .setDelay (delay ))
217
+ .acceptIfNotNull (getHeaderIfAvailable (headers , AmqpHeaders .DELIVERY_MODE , MessageDeliveryMode .class ),
218
+ deliveryMode -> amqpMessageProperties .setDeliveryMode (deliveryMode ))
219
+ .acceptIfNotNull (getHeaderIfAvailable (headers , AmqpHeaders .DELIVERY_TAG , Long .class ),
220
+ deliveryTag -> amqpMessageProperties .setDeliveryTag (deliveryTag ))
221
+ .acceptIfHasText (getHeaderIfAvailable (headers , AmqpHeaders .EXPIRATION , String .class ),
222
+ expiration -> amqpMessageProperties .setExpiration (expiration ))
223
+ .acceptIfNotNull (getHeaderIfAvailable (headers , AmqpHeaders .MESSAGE_COUNT , Integer .class ),
224
+ messageCount -> amqpMessageProperties .setMessageCount (messageCount ));
279
225
String messageId = getHeaderIfAvailable (headers , AmqpHeaders .MESSAGE_ID , String .class );
280
226
if (StringUtils .hasText (messageId )) {
281
227
amqpMessageProperties .setMessageId (messageId );
@@ -286,26 +232,17 @@ else if (allHeaders != null) {
286
232
amqpMessageProperties .setMessageId (id .toString ());
287
233
}
288
234
}
289
- Integer priority = getHeaderIfAvailable (headers , IntegrationMessageHeaderAccessor .PRIORITY , Integer .class );
290
- if (priority != null ) {
291
- amqpMessageProperties .setPriority (priority );
292
- }
293
- String receivedExchange = getHeaderIfAvailable (headers , AmqpHeaders .RECEIVED_EXCHANGE , String .class );
294
- if (StringUtils .hasText (receivedExchange )) {
295
- amqpMessageProperties .setReceivedExchange (receivedExchange );
296
- }
297
- String receivedRoutingKey = getHeaderIfAvailable (headers , AmqpHeaders .RECEIVED_ROUTING_KEY , String .class );
298
- if (StringUtils .hasText (receivedRoutingKey )) {
299
- amqpMessageProperties .setReceivedRoutingKey (receivedRoutingKey );
300
- }
301
- Boolean redelivered = getHeaderIfAvailable (headers , AmqpHeaders .REDELIVERED , Boolean .class );
302
- if (redelivered != null ) {
303
- amqpMessageProperties .setRedelivered (redelivered );
304
- }
305
- String replyTo = getHeaderIfAvailable (headers , AmqpHeaders .REPLY_TO , String .class );
306
- if (replyTo != null ) {
307
- amqpMessageProperties .setReplyTo (replyTo );
308
- }
235
+ JavaUtils .INSTANCE
236
+ .acceptIfNotNull (getHeaderIfAvailable (headers , IntegrationMessageHeaderAccessor .PRIORITY , Integer .class ),
237
+ priority -> amqpMessageProperties .setPriority (priority ))
238
+ .acceptIfHasText (getHeaderIfAvailable (headers , AmqpHeaders .RECEIVED_EXCHANGE , String .class ),
239
+ receivedExchange -> amqpMessageProperties .setReceivedExchange (receivedExchange ))
240
+ .acceptIfHasText (getHeaderIfAvailable (headers , AmqpHeaders .RECEIVED_ROUTING_KEY , String .class ),
241
+ receivedRoutingKey -> amqpMessageProperties .setReceivedRoutingKey (receivedRoutingKey ))
242
+ .acceptIfNotNull (getHeaderIfAvailable (headers , AmqpHeaders .REDELIVERED , Boolean .class ),
243
+ redelivered -> amqpMessageProperties .setRedelivered (redelivered ))
244
+ .acceptIfNotNull (getHeaderIfAvailable (headers , AmqpHeaders .REPLY_TO , String .class ),
245
+ replyTo -> amqpMessageProperties .setReplyTo (replyTo ));
309
246
Date timestamp = getHeaderIfAvailable (headers , AmqpHeaders .TIMESTAMP , Date .class );
310
247
if (timestamp != null ) {
311
248
amqpMessageProperties .setTimestamp (timestamp );
@@ -316,14 +253,11 @@ else if (allHeaders != null) {
316
253
amqpMessageProperties .setTimestamp (new Date (ts ));
317
254
}
318
255
}
319
- String type = getHeaderIfAvailable (headers , AmqpHeaders .TYPE , String .class );
320
- if (type != null ) {
321
- amqpMessageProperties .setType (type );
322
- }
323
- String userId = getHeaderIfAvailable (headers , AmqpHeaders .USER_ID , String .class );
324
- if (StringUtils .hasText (userId )) {
325
- amqpMessageProperties .setUserId (userId );
326
- }
256
+ JavaUtils .INSTANCE
257
+ .acceptIfNotNull (getHeaderIfAvailable (headers , AmqpHeaders .TYPE , String .class ),
258
+ type -> amqpMessageProperties .setType (type ))
259
+ .acceptIfNotNull (getHeaderIfAvailable (headers , AmqpHeaders .USER_ID , String .class ),
260
+ userId -> amqpMessageProperties .setUserId (userId ));
327
261
328
262
Map <String , String > jsonHeaders = new HashMap <String , String >();
329
263
@@ -346,14 +280,11 @@ else if (allHeaders != null) {
346
280
amqpMessageProperties .getHeaders ().putAll (jsonHeaders );
347
281
}
348
282
349
- String replyCorrelation = getHeaderIfAvailable (headers , AmqpHeaders .SPRING_REPLY_CORRELATION , String .class );
350
- if (StringUtils .hasLength (replyCorrelation )) {
351
- amqpMessageProperties .setHeader ("spring_reply_correlation" , replyCorrelation );
352
- }
353
- String replyToStack = getHeaderIfAvailable (headers , AmqpHeaders .SPRING_REPLY_TO_STACK , String .class );
354
- if (StringUtils .hasLength (replyToStack )) {
355
- amqpMessageProperties .setHeader ("spring_reply_to" , replyToStack );
356
- }
283
+ JavaUtils .INSTANCE
284
+ .acceptIfHasText (getHeaderIfAvailable (headers , AmqpHeaders .SPRING_REPLY_CORRELATION , String .class ),
285
+ replyCorrelation -> amqpMessageProperties .setHeader ("spring_reply_correlation" , replyCorrelation ))
286
+ .acceptIfHasText (getHeaderIfAvailable (headers , AmqpHeaders .SPRING_REPLY_TO_STACK , String .class ),
287
+ replyToStack -> amqpMessageProperties .setHeader ("spring_reply_to" , replyToStack ));
357
288
}
358
289
359
290
@ Override
0 commit comments