70
70
public class AmqpChannelFactoryBean extends AbstractFactoryBean <AbstractAmqpChannel >
71
71
implements SmartLifecycle , BeanNameAware {
72
72
73
- private volatile AbstractAmqpChannel channel ;
74
-
75
- private volatile List <ChannelInterceptor > interceptors ;
73
+ private final AmqpTemplate amqpTemplate = new RabbitTemplate ();
76
74
77
75
private final boolean messageDriven ;
78
76
79
- private final AmqpTemplate amqpTemplate = new RabbitTemplate ();
77
+ private AbstractAmqpChannel channel ;
78
+
79
+ private List <ChannelInterceptor > interceptors ;
80
80
81
- private volatile AmqpAdmin amqpAdmin ;
81
+ private AmqpAdmin amqpAdmin ;
82
82
83
- private volatile FanoutExchange exchange ;
83
+ private FanoutExchange exchange ;
84
84
85
- private volatile String queueName ;
85
+ private String queueName ;
86
86
87
- private volatile boolean autoStartup = true ;
87
+ private boolean autoStartup = true ;
88
88
89
- private volatile Advice [] adviceChain ;
89
+ private Advice [] adviceChain ;
90
90
91
- private volatile Integer concurrentConsumers ;
91
+ private Integer concurrentConsumers ;
92
92
93
- private volatile Integer consumersPerQueue ;
93
+ private Integer consumersPerQueue ;
94
94
95
- private volatile ConnectionFactory connectionFactory ;
95
+ private ConnectionFactory connectionFactory ;
96
96
97
- private volatile MessagePropertiesConverter messagePropertiesConverter ;
97
+ private MessagePropertiesConverter messagePropertiesConverter ;
98
98
99
- private volatile ErrorHandler errorHandler ;
99
+ private ErrorHandler errorHandler ;
100
100
101
- private volatile Boolean exposeListenerChannel ;
101
+ private Boolean exposeListenerChannel ;
102
102
103
- private volatile Integer phase ;
103
+ private Integer phase ;
104
104
105
- private volatile Integer prefetchCount ;
105
+ private Integer prefetchCount ;
106
106
107
- private volatile boolean isPubSub ;
107
+ private boolean isPubSub ;
108
108
109
- private volatile Long receiveTimeout ;
109
+ private Long receiveTimeout ;
110
110
111
- private volatile Long recoveryInterval ;
111
+ private Long recoveryInterval ;
112
112
113
- private volatile Long shutdownTimeout ;
113
+ private Long shutdownTimeout ;
114
114
115
- private volatile String beanName ;
115
+ private String beanName ;
116
116
117
- private volatile AcknowledgeMode acknowledgeMode ;
117
+ private AcknowledgeMode acknowledgeMode ;
118
118
119
- private volatile boolean channelTransacted ;
119
+ private boolean channelTransacted ;
120
120
121
- private volatile Executor taskExecutor ;
121
+ private Executor taskExecutor ;
122
122
123
- private volatile PlatformTransactionManager transactionManager ;
123
+ private PlatformTransactionManager transactionManager ;
124
124
125
- private volatile TransactionAttribute transactionAttribute ;
125
+ private TransactionAttribute transactionAttribute ;
126
126
127
- private volatile Integer txSize ;
127
+ private Integer batchSize ;
128
128
129
- private volatile Integer maxSubscribers ;
129
+ private Integer maxSubscribers ;
130
130
131
- private volatile Boolean missingQueuesFatal ;
131
+ private Boolean missingQueuesFatal ;
132
132
133
- private volatile MessageDeliveryMode defaultDeliveryMode ;
133
+ private MessageDeliveryMode defaultDeliveryMode ;
134
134
135
- private volatile Boolean extractPayload ;
135
+ private Boolean extractPayload ;
136
136
137
- private volatile AmqpHeaderMapper outboundHeaderMapper = DefaultAmqpHeaderMapper .outboundMapper ();
137
+ private AmqpHeaderMapper outboundHeaderMapper = DefaultAmqpHeaderMapper .outboundMapper ();
138
138
139
- private volatile AmqpHeaderMapper inboundHeaderMapper = DefaultAmqpHeaderMapper .inboundMapper ();
139
+ private AmqpHeaderMapper inboundHeaderMapper = DefaultAmqpHeaderMapper .inboundMapper ();
140
140
141
141
private boolean headersLast ;
142
142
@@ -314,8 +314,18 @@ public void setTransactionManager(PlatformTransactionManager transactionManager)
314
314
this .transactionManager = transactionManager ;
315
315
}
316
316
317
+ /**
318
+ * Specify a batch size for consumer.
319
+ * @param txSize the batch size to use
320
+ * @deprecated since 5.2 in favor of {@link #setBatchSize(Integer)}
321
+ */
322
+ @ Deprecated
317
323
public void setTxSize (int txSize ) {
318
- this .txSize = txSize ;
324
+ setBatchSize (txSize );
325
+ }
326
+
327
+ public void setBatchSize (Integer batchSize ) {
328
+ this .batchSize = batchSize ;
319
329
}
320
330
321
331
public void setMaxSubscribers (int maxSubscribers ) {
@@ -360,18 +370,20 @@ protected AbstractAmqpChannel createInstance() {
360
370
}
361
371
if (this .isPubSub ) {
362
372
PublishSubscribeAmqpChannel pubsub = new PublishSubscribeAmqpChannel (
363
- this .beanName , container , this .amqpTemplate , this .outboundHeaderMapper , this .inboundHeaderMapper );
373
+ this .beanName , container , this .amqpTemplate , this .outboundHeaderMapper ,
374
+ this .inboundHeaderMapper );
364
375
JavaUtils .INSTANCE
365
- .acceptIfNotNull (this .exchange , pubsub ::setExchange )
366
- .acceptIfNotNull (this .maxSubscribers , pubsub ::setMaxSubscribers );
376
+ .acceptIfNotNull (this .exchange , pubsub ::setExchange )
377
+ .acceptIfNotNull (this .maxSubscribers , pubsub ::setMaxSubscribers );
367
378
this .channel = pubsub ;
368
379
}
369
380
else {
370
381
PointToPointSubscribableAmqpChannel p2p = new PointToPointSubscribableAmqpChannel (
371
- this .beanName , container , this .amqpTemplate , this .outboundHeaderMapper , this .inboundHeaderMapper );
382
+ this .beanName , container , this .amqpTemplate , this .outboundHeaderMapper ,
383
+ this .inboundHeaderMapper );
372
384
JavaUtils .INSTANCE
373
- .acceptIfHasText (this .queueName , p2p ::setQueueName )
374
- .acceptIfNotNull (this .maxSubscribers , p2p ::setMaxSubscribers );
385
+ .acceptIfHasText (this .queueName , p2p ::setQueueName )
386
+ .acceptIfNotNull (this .maxSubscribers , p2p ::setMaxSubscribers );
375
387
this .channel = p2p ;
376
388
}
377
389
}
@@ -380,17 +392,17 @@ protected AbstractAmqpChannel createInstance() {
380
392
PollableAmqpChannel pollable = new PollableAmqpChannel (this .beanName , this .amqpTemplate ,
381
393
this .outboundHeaderMapper , this .inboundHeaderMapper );
382
394
JavaUtils .INSTANCE
383
- .acceptIfNotNull (this .amqpAdmin , pollable ::setAmqpAdmin )
384
- .acceptIfHasText (this .queueName , pollable ::setQueueName );
395
+ .acceptIfNotNull (this .amqpAdmin , pollable ::setAmqpAdmin )
396
+ .acceptIfHasText (this .queueName , pollable ::setQueueName );
385
397
this .channel = pollable ;
386
398
}
387
399
JavaUtils .INSTANCE
388
- .acceptIfNotEmpty (this .interceptors , this .channel ::setInterceptors );
400
+ .acceptIfNotEmpty (this .interceptors , this .channel ::setInterceptors );
389
401
this .channel .setBeanName (this .beanName );
390
402
JavaUtils .INSTANCE
391
- .acceptIfNotNull (getBeanFactory (), this .channel ::setBeanFactory )
392
- .acceptIfNotNull (this .defaultDeliveryMode , this .channel ::setDefaultDeliveryMode )
393
- .acceptIfNotNull (this .extractPayload , this .channel ::setExtractPayload );
403
+ .acceptIfNotNull (getBeanFactory (), this .channel ::setBeanFactory )
404
+ .acceptIfNotNull (this .defaultDeliveryMode , this .channel ::setDefaultDeliveryMode )
405
+ .acceptIfNotNull (this .extractPayload , this .channel ::setExtractPayload );
394
406
this .channel .setHeadersMappedLast (this .headersLast );
395
407
this .channel .afterPropertiesSet ();
396
408
return this .channel ;
@@ -401,9 +413,9 @@ private AbstractMessageListenerContainer createContainer() {
401
413
if (this .consumersPerQueue == null ) {
402
414
SimpleMessageListenerContainer smlc = new SimpleMessageListenerContainer ();
403
415
JavaUtils .INSTANCE
404
- .acceptIfNotNull (this .concurrentConsumers , smlc ::setConcurrentConsumers )
405
- .acceptIfNotNull (this .receiveTimeout , smlc ::setReceiveTimeout )
406
- .acceptIfNotNull (this .txSize , smlc ::setTxSize );
416
+ .acceptIfNotNull (this .concurrentConsumers , smlc ::setConcurrentConsumers )
417
+ .acceptIfNotNull (this .receiveTimeout , smlc ::setReceiveTimeout )
418
+ .acceptIfNotNull (this .batchSize , smlc ::setBatchSize );
407
419
container = smlc ;
408
420
}
409
421
else {
@@ -412,24 +424,24 @@ private AbstractMessageListenerContainer createContainer() {
412
424
container = dmlc ;
413
425
}
414
426
JavaUtils .INSTANCE
415
- .acceptIfNotNull (this .acknowledgeMode , container ::setAcknowledgeMode )
416
- .acceptIfNotEmpty (this .adviceChain , container ::setAdviceChain );
427
+ .acceptIfNotNull (this .acknowledgeMode , container ::setAcknowledgeMode )
428
+ .acceptIfNotEmpty (this .adviceChain , container ::setAdviceChain );
417
429
container .setAutoStartup (this .autoStartup );
418
430
container .setChannelTransacted (this .channelTransacted );
419
431
container .setConnectionFactory (this .connectionFactory );
420
432
421
433
JavaUtils .INSTANCE
422
- .acceptIfNotNull (this .errorHandler , container ::setErrorHandler )
423
- .acceptIfNotNull (this .exposeListenerChannel , container ::setExposeListenerChannel )
424
- .acceptIfNotNull (this .messagePropertiesConverter , container ::setMessagePropertiesConverter )
425
- .acceptIfNotNull (this .phase , container ::setPhase )
426
- .acceptIfNotNull (this .prefetchCount , container ::setPrefetchCount )
427
- .acceptIfNotNull (this .recoveryInterval , container ::setRecoveryInterval )
428
- .acceptIfNotNull (this .shutdownTimeout , container ::setShutdownTimeout )
429
- .acceptIfNotNull (this .taskExecutor , container ::setTaskExecutor )
430
- .acceptIfNotNull (this .transactionAttribute , container ::setTransactionAttribute )
431
- .acceptIfNotNull (this .transactionManager , container ::setTransactionManager )
432
- .acceptIfNotNull (this .missingQueuesFatal , container ::setMissingQueuesFatal );
434
+ .acceptIfNotNull (this .errorHandler , container ::setErrorHandler )
435
+ .acceptIfNotNull (this .exposeListenerChannel , container ::setExposeListenerChannel )
436
+ .acceptIfNotNull (this .messagePropertiesConverter , container ::setMessagePropertiesConverter )
437
+ .acceptIfNotNull (this .phase , container ::setPhase )
438
+ .acceptIfNotNull (this .prefetchCount , container ::setPrefetchCount )
439
+ .acceptIfNotNull (this .recoveryInterval , container ::setRecoveryInterval )
440
+ .acceptIfNotNull (this .shutdownTimeout , container ::setShutdownTimeout )
441
+ .acceptIfNotNull (this .taskExecutor , container ::setTaskExecutor )
442
+ .acceptIfNotNull (this .transactionAttribute , container ::setTransactionAttribute )
443
+ .acceptIfNotNull (this .transactionManager , container ::setTransactionManager )
444
+ .acceptIfNotNull (this .missingQueuesFatal , container ::setMissingQueuesFatal );
433
445
return container ;
434
446
}
435
447
0 commit comments