diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java index 15bdd33d89..2be4a6f090 100644 --- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java +++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java @@ -67,6 +67,11 @@ public class ConnectionPool implements ExceptionListener { final GenericKeyedObjectPoolConfig poolConfig = new GenericKeyedObjectPoolConfig(); poolConfig.setJmxEnabled(false); this.connection = wrap(connection); + try { + this.connection.setExceptionListener(this); + } catch (JMSException ex) { + LOG.warn("Could not set exception listener on create of ConnectionPool"); + } // Create our internal Pool of session instances. this.sessionPool = new GenericKeyedObjectPool( @@ -79,7 +84,7 @@ public class ConnectionPool implements ExceptionListener { @Override public void destroyObject(SessionKey sessionKey, PooledObject pooledObject) throws Exception { - ((SessionHolder)pooledObject.getObject()).close(); + pooledObject.getObject().close(); } @Override @@ -357,26 +362,21 @@ public class ConnectionPool implements ExceptionListener { */ public void setReconnectOnException(boolean reconnectOnException) { this.reconnectOnException = reconnectOnException; - try { - if (isReconnectOnException()) { - if (connection.getExceptionListener() != null) { - parentExceptionListener = connection.getExceptionListener(); - } - connection.setExceptionListener(this); - } else { - if (parentExceptionListener != null) { - connection.setExceptionListener(parentExceptionListener); - } - parentExceptionListener = null; - } - } catch (JMSException jmse) { - LOG.warn("Cannot set reconnect exception listener", jmse); - } + } + + ExceptionListener getParentExceptionListener() { + return parentExceptionListener; + } + + void setParentExceptionListener(ExceptionListener parentExceptionListener) { + this.parentExceptionListener = parentExceptionListener; } @Override public void onException(JMSException exception) { - close(); + if (isReconnectOnException()) { + close(); + } if (parentExceptionListener != null) { parentExceptionListener.onException(exception); } diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java index b7b56ba14b..111e730dfb 100755 --- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java +++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java @@ -122,7 +122,7 @@ public class PooledConnection implements TopicConnection, QueueConnection, Poole @Override public ExceptionListener getExceptionListener() throws JMSException { - return getConnection().getExceptionListener(); + return pool.getParentExceptionListener(); } @Override @@ -132,7 +132,7 @@ public class PooledConnection implements TopicConnection, QueueConnection, Poole @Override public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException { - getConnection().setExceptionListener(exceptionListener); + pool.setParentExceptionListener(exceptionListener); } @Override diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java index f507cda940..839d66845b 100644 --- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java +++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -63,7 +63,6 @@ import org.slf4j.LoggerFactory; * eviction thread may be configured using the {@link org.apache.activemq.jms.pool.PooledConnectionFactory#setTimeBetweenExpirationCheckMillis} method. By * default the value is -1 which means no eviction thread will be run. Set to a non-negative value to * configure the idle eviction thread to run. - * */ public class PooledConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory { private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnectionFactory.class); @@ -106,9 +105,7 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti connection.setUseAnonymousProducers(isUseAnonymousProducers()); connection.setReconnectOnException(isReconnectOnException()); - if (LOG.isTraceEnabled()) { - LOG.trace("Created new connection: {}", connection); - } + LOG.trace("Created new connection: {}", connection); PooledConnectionFactory.this.mostRecentlyCreated.set(connection); @@ -119,9 +116,7 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti public void destroyObject(ConnectionKey connectionKey, PooledObject pooledObject) throws Exception { ConnectionPool connection = pooledObject.getObject(); try { - if (LOG.isTraceEnabled()) { - LOG.trace("Destroying connection: {}", connection); - } + LOG.trace("Destroying connection: {}", connection); connection.close(); } catch (Exception e) { LOG.warn("Close connection failed for connection: " + connection + ". This exception will be ignored.",e); @@ -132,10 +127,7 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti public boolean validateObject(ConnectionKey connectionKey, PooledObject pooledObject) { ConnectionPool connection = pooledObject.getObject(); if (connection != null && connection.expiredCheck()) { - if (LOG.isTraceEnabled()) { - LOG.trace("Connection has expired: {} and will be destroyed", connection); - } - + LOG.trace("Connection has expired: {} and will be destroyed", connection); return false; } @@ -305,7 +297,7 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti public void stop() { if (stopped.compareAndSet(false, true)) { LOG.debug("Stopping the PooledConnectionFactory, number of connections in cache: {}", - connectionsPool != null ? connectionsPool.getNumActive() : 0); + connectionsPool != null ? connectionsPool.getNumActive() : 0); try { if (connectionsPool != null) { connectionsPool.close(); @@ -322,7 +314,6 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti * are in use be client's will be closed. */ public void clear() { - if (stopped.get()) { return; } diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFailoverTest.java b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFailoverTest.java new file mode 100644 index 0000000000..503cc0b69c --- /dev/null +++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFailoverTest.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.jms.pool; + +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.junit.Before; +import org.junit.Test; + +public class PooledConnectionFailoverTest extends JmsPoolTestSupport { + + protected ActiveMQConnectionFactory directConnFact; + protected PooledConnectionFactory pooledConnFact; + + @Override + @Before + public void setUp() throws java.lang.Exception { + super.setUp(); + + String connectionURI = createBroker(); + + // Create the ActiveMQConnectionFactory and the PooledConnectionFactory. + directConnFact = new ActiveMQConnectionFactory(connectionURI); + pooledConnFact = new PooledConnectionFactory(); + pooledConnFact.setConnectionFactory(directConnFact); + pooledConnFact.setMaxConnections(1); + pooledConnFact.setReconnectOnException(true); + } + + @Test + public void testConnectionFailures() throws Exception { + + final CountDownLatch failed = new CountDownLatch(1); + + Connection connection = pooledConnFact.createConnection(); + LOG.info("Fetched new connection from the pool: {}", connection); + connection.setExceptionListener(new ExceptionListener() { + + @Override + public void onException(JMSException exception) { + LOG.info("Pooled Connection failed"); + failed.countDown(); + } + }); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getTestName()); + MessageProducer producer = session.createProducer(queue); + + brokerService.stop(); + + assertTrue(failed.await(15, TimeUnit.SECONDS)); + + createBroker(); + + try { + producer.send(session.createMessage()); + fail("Should be disconnected"); + } catch (JMSException ex) { + LOG.info("Producer failed as expected: {}", ex.getMessage()); + } + + Connection connection2 = pooledConnFact.createConnection(); + assertNotSame(connection, connection2); + LOG.info("Fetched new connection from the pool: {}", connection2); + session = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + + connection2.close(); + + pooledConnFact.stop(); + } + + private String createBroker() throws Exception { + brokerService = new BrokerService(); + brokerService.setBrokerName("PooledConnectionSessionCleanupTestBroker"); + brokerService.setUseJmx(true); + brokerService.getManagementContext().setCreateConnector(false); + brokerService.setPersistent(false); + brokerService.setSchedulerSupport(false); + brokerService.setAdvisorySupport(false); + TransportConnector connector = brokerService.addConnector("tcp://0.0.0.0:61626"); + brokerService.start(); + brokerService.waitUntilStarted(); + + return "failover:(" + connector.getPublishableConnectString() + ")?maxReconnectAttempts=5"; + } +} diff --git a/activemq-jms-pool/src/test/resources/log4j.properties b/activemq-jms-pool/src/test/resources/log4j.properties index b42af1ad63..2543c16e29 100755 --- a/activemq-jms-pool/src/test/resources/log4j.properties +++ b/activemq-jms-pool/src/test/resources/log4j.properties @@ -5,9 +5,9 @@ ## The ASF licenses this file to You under the Apache License, Version 2.0 ## (the "License"); you may not use this file except in compliance with ## the License. You may obtain a copy of the License at -## +## ## http://www.apache.org/licenses/LICENSE-2.0 -## +## ## Unless required by applicable law or agreed to in writing, software ## distributed under the License is distributed on an "AS IS" BASIS, ## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -23,7 +23,7 @@ log4j.rootLogger=INFO, out, stdout log4j.logger.org.apache.activemq.spring=WARN #log4j.logger.org.apache.activemq.usecases=DEBUG #log4j.logger.org.apache.activemq.broker.region=DEBUG -log4j.logger.org.apache.activemq.pool=DEBUG +log4j.logger.org.apache.activemq.jms.pool=DEBUG # CONSOLE appender not used by default log4j.appender.stdout=org.apache.log4j.ConsoleAppender