From b65c0d1be4b0812229d3f166e50d963766856c53 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Wed, 20 Apr 2016 09:48:06 -0400 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5510 https://issues.apache.org/jira/browse/AMQ-5534 PooledConnectioFactory added reconnection support but can break if the holder of the connection adds their own ExceptionListener as the PooledConnection doesn't protect the internal ExceptionListener from replacement which leads to cases where the loaned Connection is not automatically closed so that the next create returns the same failed connection. --- .../activemq/jms/pool/ConnectionPool.java | 34 ++--- .../activemq/jms/pool/PooledConnection.java | 4 +- .../jms/pool/PooledConnectionFactory.java | 19 +-- .../pool/PooledConnectionFailoverTest.java | 117 ++++++++++++++++++ .../src/test/resources/log4j.properties | 6 +- 5 files changed, 144 insertions(+), 36 deletions(-) create mode 100644 activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFailoverTest.java diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java index 15bdd33d89..2be4a6f090 100644 --- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java +++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java @@ -67,6 +67,11 @@ public class ConnectionPool implements ExceptionListener { final GenericKeyedObjectPoolConfig poolConfig = new GenericKeyedObjectPoolConfig(); poolConfig.setJmxEnabled(false); this.connection = wrap(connection); + try { + this.connection.setExceptionListener(this); + } catch (JMSException ex) { + LOG.warn("Could not set exception listener on create of ConnectionPool"); + } // Create our internal Pool of session instances. this.sessionPool = new GenericKeyedObjectPool( @@ -79,7 +84,7 @@ public class ConnectionPool implements ExceptionListener { @Override public void destroyObject(SessionKey sessionKey, PooledObject pooledObject) throws Exception { - ((SessionHolder)pooledObject.getObject()).close(); + pooledObject.getObject().close(); } @Override @@ -357,26 +362,21 @@ public class ConnectionPool implements ExceptionListener { */ public void setReconnectOnException(boolean reconnectOnException) { this.reconnectOnException = reconnectOnException; - try { - if (isReconnectOnException()) { - if (connection.getExceptionListener() != null) { - parentExceptionListener = connection.getExceptionListener(); - } - connection.setExceptionListener(this); - } else { - if (parentExceptionListener != null) { - connection.setExceptionListener(parentExceptionListener); - } - parentExceptionListener = null; - } - } catch (JMSException jmse) { - LOG.warn("Cannot set reconnect exception listener", jmse); - } + } + + ExceptionListener getParentExceptionListener() { + return parentExceptionListener; + } + + void setParentExceptionListener(ExceptionListener parentExceptionListener) { + this.parentExceptionListener = parentExceptionListener; } @Override public void onException(JMSException exception) { - close(); + if (isReconnectOnException()) { + close(); + } if (parentExceptionListener != null) { parentExceptionListener.onException(exception); } diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java index b7b56ba14b..111e730dfb 100755 --- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java +++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java @@ -122,7 +122,7 @@ public class PooledConnection implements TopicConnection, QueueConnection, Poole @Override public ExceptionListener getExceptionListener() throws JMSException { - return getConnection().getExceptionListener(); + return pool.getParentExceptionListener(); } @Override @@ -132,7 +132,7 @@ public class PooledConnection implements TopicConnection, QueueConnection, Poole @Override public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException { - getConnection().setExceptionListener(exceptionListener); + pool.setParentExceptionListener(exceptionListener); } @Override diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java index f507cda940..839d66845b 100644 --- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java +++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -63,7 +63,6 @@ import org.slf4j.LoggerFactory; * eviction thread may be configured using the {@link org.apache.activemq.jms.pool.PooledConnectionFactory#setTimeBetweenExpirationCheckMillis} method. By * default the value is -1 which means no eviction thread will be run. Set to a non-negative value to * configure the idle eviction thread to run. - * */ public class PooledConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory { private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnectionFactory.class); @@ -106,9 +105,7 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti connection.setUseAnonymousProducers(isUseAnonymousProducers()); connection.setReconnectOnException(isReconnectOnException()); - if (LOG.isTraceEnabled()) { - LOG.trace("Created new connection: {}", connection); - } + LOG.trace("Created new connection: {}", connection); PooledConnectionFactory.this.mostRecentlyCreated.set(connection); @@ -119,9 +116,7 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti public void destroyObject(ConnectionKey connectionKey, PooledObject pooledObject) throws Exception { ConnectionPool connection = pooledObject.getObject(); try { - if (LOG.isTraceEnabled()) { - LOG.trace("Destroying connection: {}", connection); - } + LOG.trace("Destroying connection: {}", connection); connection.close(); } catch (Exception e) { LOG.warn("Close connection failed for connection: " + connection + ". This exception will be ignored.",e); @@ -132,10 +127,7 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti public boolean validateObject(ConnectionKey connectionKey, PooledObject pooledObject) { ConnectionPool connection = pooledObject.getObject(); if (connection != null && connection.expiredCheck()) { - if (LOG.isTraceEnabled()) { - LOG.trace("Connection has expired: {} and will be destroyed", connection); - } - + LOG.trace("Connection has expired: {} and will be destroyed", connection); return false; } @@ -305,7 +297,7 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti public void stop() { if (stopped.compareAndSet(false, true)) { LOG.debug("Stopping the PooledConnectionFactory, number of connections in cache: {}", - connectionsPool != null ? connectionsPool.getNumActive() : 0); + connectionsPool != null ? connectionsPool.getNumActive() : 0); try { if (connectionsPool != null) { connectionsPool.close(); @@ -322,7 +314,6 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti * are in use be client's will be closed. */ public void clear() { - if (stopped.get()) { return; } diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFailoverTest.java b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFailoverTest.java new file mode 100644 index 0000000000..503cc0b69c --- /dev/null +++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFailoverTest.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.jms.pool; + +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.junit.Before; +import org.junit.Test; + +public class PooledConnectionFailoverTest extends JmsPoolTestSupport { + + protected ActiveMQConnectionFactory directConnFact; + protected PooledConnectionFactory pooledConnFact; + + @Override + @Before + public void setUp() throws java.lang.Exception { + super.setUp(); + + String connectionURI = createBroker(); + + // Create the ActiveMQConnectionFactory and the PooledConnectionFactory. + directConnFact = new ActiveMQConnectionFactory(connectionURI); + pooledConnFact = new PooledConnectionFactory(); + pooledConnFact.setConnectionFactory(directConnFact); + pooledConnFact.setMaxConnections(1); + pooledConnFact.setReconnectOnException(true); + } + + @Test + public void testConnectionFailures() throws Exception { + + final CountDownLatch failed = new CountDownLatch(1); + + Connection connection = pooledConnFact.createConnection(); + LOG.info("Fetched new connection from the pool: {}", connection); + connection.setExceptionListener(new ExceptionListener() { + + @Override + public void onException(JMSException exception) { + LOG.info("Pooled Connection failed"); + failed.countDown(); + } + }); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getTestName()); + MessageProducer producer = session.createProducer(queue); + + brokerService.stop(); + + assertTrue(failed.await(15, TimeUnit.SECONDS)); + + createBroker(); + + try { + producer.send(session.createMessage()); + fail("Should be disconnected"); + } catch (JMSException ex) { + LOG.info("Producer failed as expected: {}", ex.getMessage()); + } + + Connection connection2 = pooledConnFact.createConnection(); + assertNotSame(connection, connection2); + LOG.info("Fetched new connection from the pool: {}", connection2); + session = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + + connection2.close(); + + pooledConnFact.stop(); + } + + private String createBroker() throws Exception { + brokerService = new BrokerService(); + brokerService.setBrokerName("PooledConnectionSessionCleanupTestBroker"); + brokerService.setUseJmx(true); + brokerService.getManagementContext().setCreateConnector(false); + brokerService.setPersistent(false); + brokerService.setSchedulerSupport(false); + brokerService.setAdvisorySupport(false); + TransportConnector connector = brokerService.addConnector("tcp://0.0.0.0:61626"); + brokerService.start(); + brokerService.waitUntilStarted(); + + return "failover:(" + connector.getPublishableConnectString() + ")?maxReconnectAttempts=5"; + } +} diff --git a/activemq-jms-pool/src/test/resources/log4j.properties b/activemq-jms-pool/src/test/resources/log4j.properties index b42af1ad63..2543c16e29 100755 --- a/activemq-jms-pool/src/test/resources/log4j.properties +++ b/activemq-jms-pool/src/test/resources/log4j.properties @@ -5,9 +5,9 @@ ## The ASF licenses this file to You under the Apache License, Version 2.0 ## (the "License"); you may not use this file except in compliance with ## the License. You may obtain a copy of the License at -## +## ## http://www.apache.org/licenses/LICENSE-2.0 -## +## ## Unless required by applicable law or agreed to in writing, software ## distributed under the License is distributed on an "AS IS" BASIS, ## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -23,7 +23,7 @@ log4j.rootLogger=INFO, out, stdout log4j.logger.org.apache.activemq.spring=WARN #log4j.logger.org.apache.activemq.usecases=DEBUG #log4j.logger.org.apache.activemq.broker.region=DEBUG -log4j.logger.org.apache.activemq.pool=DEBUG +log4j.logger.org.apache.activemq.jms.pool=DEBUG # CONSOLE appender not used by default log4j.appender.stdout=org.apache.log4j.ConsoleAppender