Fixed session in the pool losing their reference to the anonymous
producer created when useAnonymousProducers is true.  The anonymous
producer stays live for the life of the pooled session.

Also added some synchronization safety to some methods that could get
into NPE trouble.
This commit is contained in:
Timothy Bish 2015-02-05 17:50:43 -05:00
parent 9f0ab46e29
commit f91abd3d46
5 changed files with 193 additions and 70 deletions

View File

@ -21,8 +21,13 @@ import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.*;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
@ -51,7 +56,7 @@ public class ConnectionPool implements ExceptionListener {
private boolean useAnonymousProducers = true;
private final AtomicBoolean started = new AtomicBoolean(false);
private final GenericKeyedObjectPool<SessionKey, Session> sessionPool;
private final GenericKeyedObjectPool<SessionKey, SessionHolder> sessionPool;
private final List<PooledSession> loanedSessions = new CopyOnWriteArrayList<PooledSession>();
private boolean reconnectOnException;
private ExceptionListener parentExceptionListener;
@ -61,29 +66,29 @@ public class ConnectionPool implements ExceptionListener {
this.connection = wrap(connection);
// Create our internal Pool of session instances.
this.sessionPool = new GenericKeyedObjectPool<SessionKey, Session>(
new KeyedPoolableObjectFactory<SessionKey, Session>() {
this.sessionPool = new GenericKeyedObjectPool<SessionKey, SessionHolder>(
new KeyedPoolableObjectFactory<SessionKey, SessionHolder>() {
@Override
public void activateObject(SessionKey key, Session session) throws Exception {
public void activateObject(SessionKey key, SessionHolder session) throws Exception {
}
@Override
public void destroyObject(SessionKey key, Session session) throws Exception {
public void destroyObject(SessionKey key, SessionHolder session) throws Exception {
session.close();
}
@Override
public Session makeObject(SessionKey key) throws Exception {
return makeSession(key);
public SessionHolder makeObject(SessionKey key) throws Exception {
return new SessionHolder(makeSession(key));
}
@Override
public void passivateObject(SessionKey key, Session session) throws Exception {
public void passivateObject(SessionKey key, SessionHolder session) throws Exception {
}
@Override
public boolean validateObject(SessionKey key, Session session) {
public boolean validateObject(SessionKey key, SessionHolder session) {
return true;
}
}

View File

@ -24,6 +24,7 @@ import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
@ -35,7 +36,7 @@ import javax.jms.TemporaryTopic;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.jms.IllegalStateException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -163,8 +164,7 @@ public class PooledConnection implements TopicConnection, QueueConnection, Poole
@Override
public Session createSession(boolean transacted, int ackMode) throws JMSException {
PooledSession result;
result = (PooledSession) pool.createSession(transacted, ackMode);
PooledSession result = (PooledSession) pool.createSession(transacted, ackMode);
// Store the session so we can close the sessions that this PooledConnection
// created in order to ensure that consumers etc are closed per the JMS contract.

View File

@ -55,25 +55,21 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
private static final transient Logger LOG = LoggerFactory.getLogger(PooledSession.class);
private final SessionKey key;
private final KeyedObjectPool<SessionKey, Session> sessionPool;
private final KeyedObjectPool<SessionKey, SessionHolder> sessionPool;
private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>();
private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>();
private final CopyOnWriteArrayList<PooledSessionEventListener> sessionEventListeners = new CopyOnWriteArrayList<PooledSessionEventListener>();
private final AtomicBoolean closed = new AtomicBoolean();
private MessageProducer producer;
private TopicPublisher publisher;
private QueueSender sender;
private Session session;
private SessionHolder sessionHolder;
private boolean transactional = true;
private boolean ignoreClose;
private boolean isXa;
private boolean useAnonymousProducers = true;
public PooledSession(SessionKey key, Session session, KeyedObjectPool<SessionKey, Session> sessionPool, boolean transactional, boolean anonymous) {
public PooledSession(SessionKey key, SessionHolder sessionHolder, KeyedObjectPool<SessionKey, SessionHolder> sessionPool, boolean transactional, boolean anonymous) {
this.key = key;
this.session = session;
this.sessionHolder = sessionHolder;
this.sessionPool = sessionPool;
this.transactional = transactional;
this.useAnonymousProducers = anonymous;
@ -140,21 +136,21 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
if (invalidate) {
// lets close the session and not put the session back into the pool
// instead invalidate it so the pool can create a new one on demand.
if (session != null) {
if (sessionHolder != null) {
try {
session.close();
sessionHolder.close();
} catch (JMSException e1) {
LOG.trace("Ignoring exception on close as discarding session: " + e1, e1);
}
}
try {
sessionPool.invalidateObject(key, session);
sessionPool.invalidateObject(key, sessionHolder);
} catch (Exception e) {
LOG.trace("Ignoring exception on invalidateObject as discarding session: " + e, e);
}
} else {
try {
sessionPool.returnObject(key, session);
sessionPool.returnObject(key, sessionHolder);
} catch (Exception e) {
javax.jms.IllegalStateException illegalStateException = new javax.jms.IllegalStateException(e.toString());
illegalStateException.initCause(e);
@ -162,7 +158,7 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
}
}
session = null;
sessionHolder = null;
}
}
@ -276,9 +272,12 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
@Override
public XAResource getXAResource() {
if (session instanceof XASession) {
return ((XASession) session).getXAResource();
SessionHolder session = safeGetSessionHolder();
if (session.getSession() instanceof XASession) {
return ((XASession) session.getSession()).getXAResource();
}
return null;
}
@ -289,8 +288,9 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
@Override
public void run() {
SessionHolder session = safeGetSessionHolder();
if (session != null) {
session.run();
session.getSession().run();
}
}
@ -379,10 +379,7 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
}
public Session getInternalSession() throws IllegalStateException {
if (session == null) {
throw new IllegalStateException("The session has already been closed");
}
return session;
return safeGetSessionHolder().getSession();
}
public MessageProducer getMessageProducer() throws JMSException {
@ -393,16 +390,7 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
MessageProducer result = null;
if (useAnonymousProducers) {
if (producer == null) {
// Don't allow for duplicate anonymous producers.
synchronized (this) {
if (producer == null) {
producer = getInternalSession().createProducer(null);
}
}
}
result = producer;
result = safeGetSessionHolder().getOrCreateProducer();
} else {
result = getInternalSession().createProducer(destination);
}
@ -418,16 +406,7 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
QueueSender result = null;
if (useAnonymousProducers) {
if (sender == null) {
// Don't allow for duplicate anonymous producers.
synchronized (this) {
if (sender == null) {
sender = ((QueueSession) getInternalSession()).createSender(null);
}
}
}
result = sender;
result = safeGetSessionHolder().getOrCreateSender();
} else {
result = ((QueueSession) getInternalSession()).createSender(destination);
}
@ -443,16 +422,7 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
TopicPublisher result = null;
if (useAnonymousProducers) {
if (publisher == null) {
// Don't allow for duplicate anonymous producers.
synchronized (this) {
if (publisher == null) {
publisher = ((TopicSession) getInternalSession()).createPublisher(null);
}
}
}
result = publisher;
result = safeGetSessionHolder().getOrCreatePublisher();
} else {
result = ((TopicSession) getInternalSession()).createPublisher(destination);
}
@ -489,7 +459,7 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
@Override
public String toString() {
return "PooledSession { " + session + " }";
return "PooledSession { " + safeGetSessionHolder() + " }";
}
/**
@ -505,4 +475,13 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
protected void onConsumerClose(MessageConsumer consumer) {
consumers.remove(consumer);
}
private SessionHolder safeGetSessionHolder() {
SessionHolder sessionHolder = this.sessionHolder;
if (sessionHolder == null) {
throw new IllegalStateException("The session has already been closed");
}
return sessionHolder;
}
}

View File

@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.jms.pool;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
/**
* Used to store a pooled session instance and any resources that can
* be left open and carried along with the pooled instance such as the
* anonymous producer used for all MessageProducer instances created
* from this pooled session when enabled.
*/
public class SessionHolder {
private final Session session;
private MessageProducer producer;
private TopicPublisher publisher;
private QueueSender sender;
public SessionHolder(Session session) {
this.session = session;
}
public void close() throws JMSException {
try {
session.close();
} finally {
producer = null;
publisher = null;
sender = null;
}
}
public Session getSession() {
return session;
}
public MessageProducer getOrCreateProducer() throws JMSException {
if (producer == null) {
synchronized (this) {
if (producer == null) {
producer = session.createProducer(null);
}
}
}
return producer;
}
public TopicPublisher getOrCreatePublisher() throws JMSException {
if (publisher == null) {
synchronized (this) {
if (publisher == null) {
publisher = ((TopicSession) session).createPublisher(null);
}
}
}
return publisher;
}
public QueueSender getOrCreateSender() throws JMSException {
if (sender == null) {
synchronized (this) {
if (sender == null) {
sender = ((QueueSession) session).createSender(null);
}
}
}
return sender;
}
@Override
public String toString() {
return session.toString();
}
}

View File

@ -17,9 +17,13 @@
package org.apache.activemq.jms.pool;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueSession;
import javax.jms.Session;
@ -44,7 +48,8 @@ public class PooledSessionTest {
public void setUp() throws Exception {
broker = new BrokerService();
broker.setPersistent(false);
broker.setUseJmx(false);
broker.setUseJmx(true);
broker.getManagementContext().setCreateMBeanServer(false);
TransportConnector connector = broker.addConnector("tcp://localhost:0");
broker.start();
connectionUri = connector.getPublishableConnectString();
@ -62,7 +67,7 @@ public class PooledSessionTest {
broker = null;
}
@Test
@Test(timeout = 60000)
public void testPooledSessionStats() throws Exception {
PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
@ -73,9 +78,11 @@ public class PooledSessionTest {
assertEquals(0, connection.getNumActiveSessions());
assertEquals(1, connection.getNumtIdleSessions());
assertEquals(1, connection.getNumSessions());
connection.close();
}
@Test
@Test(timeout = 60000)
public void testMessageProducersAreAllTheSame() throws Exception {
PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -87,9 +94,11 @@ public class PooledSessionTest {
PooledProducer producer2 = (PooledProducer) session.createProducer(queue2);
assertSame(producer1.getMessageProducer(), producer2.getMessageProducer());
connection.close();
}
@Test
@Test(timeout = 60000)
public void testThrowsWhenDifferentDestinationGiven() throws Exception {
PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -110,9 +119,11 @@ public class PooledSessionTest {
fail("Should only be able to send to queue 1");
} catch (Exception ex) {
}
connection.close();
}
@Test
@Test(timeout = 60000)
public void testCreateTopicPublisher() throws Exception {
PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
@ -124,9 +135,10 @@ public class PooledSessionTest {
PooledTopicPublisher publisher2 = (PooledTopicPublisher) session.createPublisher(topic2);
assertSame(publisher1.getMessageProducer(), publisher2.getMessageProducer());
connection.close();
}
@Test
@Test(timeout = 60000)
public void testQueueSender() throws Exception {
PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
@ -138,5 +150,34 @@ public class PooledSessionTest {
PooledQueueSender sender2 = (PooledQueueSender) session.createSender(queue2);
assertSame(sender1.getMessageProducer(), sender2.getMessageProducer());
connection.close();
}
@Test(timeout = 60000)
public void testRepeatedCreateSessionProducerResultsInSame() throws Exception {
PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
assertTrue(pooledFactory.isUseAnonymousProducers());
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic("test-topic");
PooledProducer producer = (PooledProducer) session.createProducer(destination);
MessageProducer original = producer.getMessageProducer();
assertNotNull(original);
session.close();
assertEquals(1, broker.getAdminView().getDynamicDestinationProducers().length);
for (int i = 0; i < 20; ++i) {
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = (PooledProducer) session.createProducer(destination);
assertSame(original, producer.getMessageProducer());
session.close();
}
assertEquals(1, broker.getAdminView().getDynamicDestinationProducers().length);
connection.close();
pooledFactory.clear();
}
}