Skip to content

Commit f62f47b

Browse files
tabish121davsclaus
authored andcommitted
https://issues.apache.org/jira/browse/AMQ-5564
Fixed session in the pool losing their reference to the anonymous producer created when useAnonymousProducers is true. The anonymous producer stays live for the life of the pooled session. Also added some synchronization safety to some methods that could get into NPE trouble. Conflicts: activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java
1 parent ede604e commit f62f47b

5 files changed

Lines changed: 187 additions & 69 deletions

File tree

activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -52,37 +52,37 @@ public class ConnectionPool {
5252
private boolean useAnonymousProducers = true;
5353

5454
private final AtomicBoolean started = new AtomicBoolean(false);
55-
private final GenericKeyedObjectPool<SessionKey, Session> sessionPool;
55+
private final GenericKeyedObjectPool<SessionKey, SessionHolder> sessionPool;
5656
private final List<PooledSession> loanedSessions = new CopyOnWriteArrayList<PooledSession>();
5757

5858
public ConnectionPool(Connection connection) {
5959

6060
this.connection = wrap(connection);
6161

6262
// Create our internal Pool of session instances.
63-
this.sessionPool = new GenericKeyedObjectPool<SessionKey, Session>(
64-
new KeyedPoolableObjectFactory<SessionKey, Session>() {
63+
this.sessionPool = new GenericKeyedObjectPool<SessionKey, SessionHolder>(
64+
new KeyedPoolableObjectFactory<SessionKey, SessionHolder>() {
6565

6666
@Override
67-
public void activateObject(SessionKey key, Session session) throws Exception {
67+
public void activateObject(SessionKey key, SessionHolder session) throws Exception {
6868
}
6969

7070
@Override
71-
public void destroyObject(SessionKey key, Session session) throws Exception {
71+
public void destroyObject(SessionKey key, SessionHolder session) throws Exception {
7272
session.close();
7373
}
7474

7575
@Override
76-
public Session makeObject(SessionKey key) throws Exception {
77-
return makeSession(key);
76+
public SessionHolder makeObject(SessionKey key) throws Exception {
77+
return new SessionHolder(makeSession(key));
7878
}
7979

8080
@Override
81-
public void passivateObject(SessionKey key, Session session) throws Exception {
81+
public void passivateObject(SessionKey key, SessionHolder session) throws Exception {
8282
}
8383

8484
@Override
85-
public boolean validateObject(SessionKey key, Session session) {
85+
public boolean validateObject(SessionKey key, SessionHolder session) {
8686
return true;
8787
}
8888
}

activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import javax.jms.ConnectionMetaData;
2525
import javax.jms.Destination;
2626
import javax.jms.ExceptionListener;
27+
import javax.jms.IllegalStateException;
2728
import javax.jms.JMSException;
2829
import javax.jms.Queue;
2930
import javax.jms.QueueConnection;
@@ -35,7 +36,7 @@
3536
import javax.jms.Topic;
3637
import javax.jms.TopicConnection;
3738
import javax.jms.TopicSession;
38-
import javax.jms.IllegalStateException;
39+
3940
import org.slf4j.Logger;
4041
import org.slf4j.LoggerFactory;
4142

@@ -163,8 +164,7 @@ public TopicSession createTopicSession(boolean transacted, int ackMode) throws J
163164

164165
@Override
165166
public Session createSession(boolean transacted, int ackMode) throws JMSException {
166-
PooledSession result;
167-
result = (PooledSession) pool.createSession(transacted, ackMode);
167+
PooledSession result = (PooledSession) pool.createSession(transacted, ackMode);
168168

169169
// Store the session so we can close the sessions that this PooledConnection
170170
// created in order to ensure that consumers etc are closed per the JMS contract.

activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java

Lines changed: 30 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -55,25 +55,21 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
5555
private static final transient Logger LOG = LoggerFactory.getLogger(PooledSession.class);
5656

5757
private final SessionKey key;
58-
private final KeyedObjectPool<SessionKey, Session> sessionPool;
58+
private final KeyedObjectPool<SessionKey, SessionHolder> sessionPool;
5959
private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>();
6060
private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>();
6161
private final CopyOnWriteArrayList<PooledSessionEventListener> sessionEventListeners = new CopyOnWriteArrayList<PooledSessionEventListener>();
6262
private final AtomicBoolean closed = new AtomicBoolean();
6363

64-
private MessageProducer producer;
65-
private TopicPublisher publisher;
66-
private QueueSender sender;
67-
68-
private Session session;
64+
private SessionHolder sessionHolder;
6965
private boolean transactional = true;
7066
private boolean ignoreClose;
7167
private boolean isXa;
7268
private boolean useAnonymousProducers = true;
7369

74-
public PooledSession(SessionKey key, Session session, KeyedObjectPool<SessionKey, Session> sessionPool, boolean transactional, boolean anonymous) {
70+
public PooledSession(SessionKey key, SessionHolder sessionHolder, KeyedObjectPool<SessionKey, SessionHolder> sessionPool, boolean transactional, boolean anonymous) {
7571
this.key = key;
76-
this.session = session;
72+
this.sessionHolder = sessionHolder;
7773
this.sessionPool = sessionPool;
7874
this.transactional = transactional;
7975
this.useAnonymousProducers = anonymous;
@@ -140,29 +136,29 @@ public void close() throws JMSException {
140136
if (invalidate) {
141137
// lets close the session and not put the session back into the pool
142138
// instead invalidate it so the pool can create a new one on demand.
143-
if (session != null) {
139+
if (sessionHolder != null) {
144140
try {
145-
session.close();
141+
sessionHolder.close();
146142
} catch (JMSException e1) {
147143
LOG.trace("Ignoring exception on close as discarding session: " + e1, e1);
148144
}
149145
}
150146
try {
151-
sessionPool.invalidateObject(key, session);
147+
sessionPool.invalidateObject(key, sessionHolder);
152148
} catch (Exception e) {
153149
LOG.trace("Ignoring exception on invalidateObject as discarding session: " + e, e);
154150
}
155151
} else {
156152
try {
157-
sessionPool.returnObject(key, session);
153+
sessionPool.returnObject(key, sessionHolder);
158154
} catch (Exception e) {
159155
javax.jms.IllegalStateException illegalStateException = new javax.jms.IllegalStateException(e.toString());
160156
illegalStateException.initCause(e);
161157
throw illegalStateException;
162158
}
163159
}
164160

165-
session = null;
161+
sessionHolder = null;
166162
}
167163
}
168164

@@ -276,9 +272,12 @@ public void rollback() throws JMSException {
276272

277273
@Override
278274
public XAResource getXAResource() {
279-
if (session instanceof XASession) {
280-
return ((XASession) session).getXAResource();
275+
SessionHolder session = safeGetSessionHolder();
276+
277+
if (session.getSession() instanceof XASession) {
278+
return ((XASession) session.getSession()).getXAResource();
281279
}
280+
282281
return null;
283282
}
284283

@@ -289,8 +288,9 @@ public Session getSession() {
289288

290289
@Override
291290
public void run() {
291+
SessionHolder session = safeGetSessionHolder();
292292
if (session != null) {
293-
session.run();
293+
session.getSession().run();
294294
}
295295
}
296296

@@ -379,10 +379,7 @@ public TopicPublisher createPublisher(Topic topic) throws JMSException {
379379
}
380380

381381
public Session getInternalSession() throws IllegalStateException {
382-
if (session == null) {
383-
throw new IllegalStateException("The session has already been closed");
384-
}
385-
return session;
382+
return safeGetSessionHolder().getSession();
386383
}
387384

388385
public MessageProducer getMessageProducer() throws JMSException {
@@ -393,16 +390,7 @@ public MessageProducer getMessageProducer(Destination destination) throws JMSExc
393390
MessageProducer result = null;
394391

395392
if (useAnonymousProducers) {
396-
if (producer == null) {
397-
// Don't allow for duplicate anonymous producers.
398-
synchronized (this) {
399-
if (producer == null) {
400-
producer = getInternalSession().createProducer(null);
401-
}
402-
}
403-
}
404-
405-
result = producer;
393+
result = safeGetSessionHolder().getOrCreateProducer();
406394
} else {
407395
result = getInternalSession().createProducer(destination);
408396
}
@@ -418,16 +406,7 @@ public QueueSender getQueueSender(Queue destination) throws JMSException {
418406
QueueSender result = null;
419407

420408
if (useAnonymousProducers) {
421-
if (sender == null) {
422-
// Don't allow for duplicate anonymous producers.
423-
synchronized (this) {
424-
if (sender == null) {
425-
sender = ((QueueSession) getInternalSession()).createSender(null);
426-
}
427-
}
428-
}
429-
430-
result = sender;
409+
result = safeGetSessionHolder().getOrCreateSender();
431410
} else {
432411
result = ((QueueSession) getInternalSession()).createSender(destination);
433412
}
@@ -443,16 +422,7 @@ public TopicPublisher getTopicPublisher(Topic destination) throws JMSException {
443422
TopicPublisher result = null;
444423

445424
if (useAnonymousProducers) {
446-
if (publisher == null) {
447-
// Don't allow for duplicate anonymous producers.
448-
synchronized (this) {
449-
if (publisher == null) {
450-
publisher = ((TopicSession) getInternalSession()).createPublisher(null);
451-
}
452-
}
453-
}
454-
455-
result = publisher;
425+
result = safeGetSessionHolder().getOrCreatePublisher();
456426
} else {
457427
result = ((TopicSession) getInternalSession()).createPublisher(destination);
458428
}
@@ -489,7 +459,7 @@ public void setIsXa(boolean isXa) {
489459

490460
@Override
491461
public String toString() {
492-
return "PooledSession { " + session + " }";
462+
return "PooledSession { " + safeGetSessionHolder() + " }";
493463
}
494464

495465
/**
@@ -505,4 +475,13 @@ public String toString() {
505475
protected void onConsumerClose(MessageConsumer consumer) {
506476
consumers.remove(consumer);
507477
}
478+
479+
private SessionHolder safeGetSessionHolder() {
480+
SessionHolder sessionHolder = this.sessionHolder;
481+
if (sessionHolder == null) {
482+
throw new IllegalStateException("The session has already been closed");
483+
}
484+
485+
return sessionHolder;
486+
}
508487
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.activemq.jms.pool;
18+
19+
import javax.jms.JMSException;
20+
import javax.jms.MessageProducer;
21+
import javax.jms.QueueSender;
22+
import javax.jms.QueueSession;
23+
import javax.jms.Session;
24+
import javax.jms.TopicPublisher;
25+
import javax.jms.TopicSession;
26+
27+
/**
28+
* Used to store a pooled session instance and any resources that can
29+
* be left open and carried along with the pooled instance such as the
30+
* anonymous producer used for all MessageProducer instances created
31+
* from this pooled session when enabled.
32+
*/
33+
public class SessionHolder {
34+
35+
private final Session session;
36+
private MessageProducer producer;
37+
private TopicPublisher publisher;
38+
private QueueSender sender;
39+
40+
public SessionHolder(Session session) {
41+
this.session = session;
42+
}
43+
44+
public void close() throws JMSException {
45+
try {
46+
session.close();
47+
} finally {
48+
producer = null;
49+
publisher = null;
50+
sender = null;
51+
}
52+
}
53+
54+
public Session getSession() {
55+
return session;
56+
}
57+
58+
public MessageProducer getOrCreateProducer() throws JMSException {
59+
if (producer == null) {
60+
synchronized (this) {
61+
if (producer == null) {
62+
producer = session.createProducer(null);
63+
}
64+
}
65+
}
66+
67+
return producer;
68+
}
69+
70+
public TopicPublisher getOrCreatePublisher() throws JMSException {
71+
if (publisher == null) {
72+
synchronized (this) {
73+
if (publisher == null) {
74+
publisher = ((TopicSession) session).createPublisher(null);
75+
}
76+
}
77+
}
78+
79+
return publisher;
80+
}
81+
82+
public QueueSender getOrCreateSender() throws JMSException {
83+
if (sender == null) {
84+
synchronized (this) {
85+
if (sender == null) {
86+
sender = ((QueueSession) session).createSender(null);
87+
}
88+
}
89+
}
90+
91+
return sender;
92+
}
93+
94+
@Override
95+
public String toString() {
96+
return session.toString();
97+
}
98+
}

0 commit comments

Comments
 (0)