https://issues.apache.org/jira/browse/AMQ-5534

PooledConnectioFactory added reconnection support but can break if the
holder of the connection adds their own ExceptionListener as the
PooledConnection doesn't protect the internal ExceptionListener from
replacement which leads to cases where the loaned Connection is not
automatically closed so that the next create returns the same failed
connection.
(cherry picked from commit b65c0d1be4)
This commit is contained in:
Timothy Bish 2016-04-20 09:48:06 -04:00
parent ad07ee4612
commit 48f314ef5a
5 changed files with 144 additions and 36 deletions

View File

@ -67,6 +67,11 @@ public class ConnectionPool implements ExceptionListener {
final GenericKeyedObjectPoolConfig poolConfig = new GenericKeyedObjectPoolConfig();
poolConfig.setJmxEnabled(false);
this.connection = wrap(connection);
try {
this.connection.setExceptionListener(this);
} catch (JMSException ex) {
LOG.warn("Could not set exception listener on create of ConnectionPool");
}
// Create our internal Pool of session instances.
this.sessionPool = new GenericKeyedObjectPool<SessionKey, SessionHolder>(
@ -79,7 +84,7 @@ public class ConnectionPool implements ExceptionListener {
@Override
public void destroyObject(SessionKey sessionKey, PooledObject<SessionHolder> pooledObject) throws Exception {
((SessionHolder)pooledObject.getObject()).close();
pooledObject.getObject().close();
}
@Override
@ -357,26 +362,21 @@ public class ConnectionPool implements ExceptionListener {
*/
public void setReconnectOnException(boolean reconnectOnException) {
this.reconnectOnException = reconnectOnException;
try {
if (isReconnectOnException()) {
if (connection.getExceptionListener() != null) {
parentExceptionListener = connection.getExceptionListener();
}
connection.setExceptionListener(this);
} else {
if (parentExceptionListener != null) {
connection.setExceptionListener(parentExceptionListener);
}
parentExceptionListener = null;
}
} catch (JMSException jmse) {
LOG.warn("Cannot set reconnect exception listener", jmse);
}
}
ExceptionListener getParentExceptionListener() {
return parentExceptionListener;
}
void setParentExceptionListener(ExceptionListener parentExceptionListener) {
this.parentExceptionListener = parentExceptionListener;
}
@Override
public void onException(JMSException exception) {
close();
if (isReconnectOnException()) {
close();
}
if (parentExceptionListener != null) {
parentExceptionListener.onException(exception);
}

View File

@ -122,7 +122,7 @@ public class PooledConnection implements TopicConnection, QueueConnection, Poole
@Override
public ExceptionListener getExceptionListener() throws JMSException {
return getConnection().getExceptionListener();
return pool.getParentExceptionListener();
}
@Override
@ -132,7 +132,7 @@ public class PooledConnection implements TopicConnection, QueueConnection, Poole
@Override
public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException {
getConnection().setExceptionListener(exceptionListener);
pool.setParentExceptionListener(exceptionListener);
}
@Override

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -63,7 +63,6 @@ import org.slf4j.LoggerFactory;
* eviction thread may be configured using the {@link org.apache.activemq.jms.pool.PooledConnectionFactory#setTimeBetweenExpirationCheckMillis} method. By
* default the value is -1 which means no eviction thread will be run. Set to a non-negative value to
* configure the idle eviction thread to run.
*
*/
public class PooledConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory {
private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnectionFactory.class);
@ -106,9 +105,7 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
connection.setUseAnonymousProducers(isUseAnonymousProducers());
connection.setReconnectOnException(isReconnectOnException());
if (LOG.isTraceEnabled()) {
LOG.trace("Created new connection: {}", connection);
}
LOG.trace("Created new connection: {}", connection);
PooledConnectionFactory.this.mostRecentlyCreated.set(connection);
@ -119,9 +116,7 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
public void destroyObject(ConnectionKey connectionKey, PooledObject<ConnectionPool> pooledObject) throws Exception {
ConnectionPool connection = pooledObject.getObject();
try {
if (LOG.isTraceEnabled()) {
LOG.trace("Destroying connection: {}", connection);
}
LOG.trace("Destroying connection: {}", connection);
connection.close();
} catch (Exception e) {
LOG.warn("Close connection failed for connection: " + connection + ". This exception will be ignored.",e);
@ -132,10 +127,7 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
public boolean validateObject(ConnectionKey connectionKey, PooledObject<ConnectionPool> pooledObject) {
ConnectionPool connection = pooledObject.getObject();
if (connection != null && connection.expiredCheck()) {
if (LOG.isTraceEnabled()) {
LOG.trace("Connection has expired: {} and will be destroyed", connection);
}
LOG.trace("Connection has expired: {} and will be destroyed", connection);
return false;
}
@ -305,7 +297,7 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
public void stop() {
if (stopped.compareAndSet(false, true)) {
LOG.debug("Stopping the PooledConnectionFactory, number of connections in cache: {}",
connectionsPool != null ? connectionsPool.getNumActive() : 0);
connectionsPool != null ? connectionsPool.getNumActive() : 0);
try {
if (connectionsPool != null) {
connectionsPool.close();
@ -322,7 +314,6 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
* are in use be client's will be closed.
*/
public void clear() {
if (stopped.get()) {
return;
}

View File

@ -0,0 +1,117 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.jms.pool;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.junit.Before;
import org.junit.Test;
public class PooledConnectionFailoverTest extends JmsPoolTestSupport {
protected ActiveMQConnectionFactory directConnFact;
protected PooledConnectionFactory pooledConnFact;
@Override
@Before
public void setUp() throws java.lang.Exception {
super.setUp();
String connectionURI = createBroker();
// Create the ActiveMQConnectionFactory and the PooledConnectionFactory.
directConnFact = new ActiveMQConnectionFactory(connectionURI);
pooledConnFact = new PooledConnectionFactory();
pooledConnFact.setConnectionFactory(directConnFact);
pooledConnFact.setMaxConnections(1);
pooledConnFact.setReconnectOnException(true);
}
@Test
public void testConnectionFailures() throws Exception {
final CountDownLatch failed = new CountDownLatch(1);
Connection connection = pooledConnFact.createConnection();
LOG.info("Fetched new connection from the pool: {}", connection);
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
LOG.info("Pooled Connection failed");
failed.countDown();
}
});
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getTestName());
MessageProducer producer = session.createProducer(queue);
brokerService.stop();
assertTrue(failed.await(15, TimeUnit.SECONDS));
createBroker();
try {
producer.send(session.createMessage());
fail("Should be disconnected");
} catch (JMSException ex) {
LOG.info("Producer failed as expected: {}", ex.getMessage());
}
Connection connection2 = pooledConnFact.createConnection();
assertNotSame(connection, connection2);
LOG.info("Fetched new connection from the pool: {}", connection2);
session = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection2.close();
pooledConnFact.stop();
}
private String createBroker() throws Exception {
brokerService = new BrokerService();
brokerService.setBrokerName("PooledConnectionSessionCleanupTestBroker");
brokerService.setUseJmx(true);
brokerService.getManagementContext().setCreateConnector(false);
brokerService.setPersistent(false);
brokerService.setSchedulerSupport(false);
brokerService.setAdvisorySupport(false);
TransportConnector connector = brokerService.addConnector("tcp://0.0.0.0:61626");
brokerService.start();
brokerService.waitUntilStarted();
return "failover:(" + connector.getPublishableConnectString() + ")?maxReconnectAttempts=5";
}
}

View File

@ -5,9 +5,9 @@
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
##
## http://www.apache.org/licenses/LICENSE-2.0
##
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -23,7 +23,7 @@ log4j.rootLogger=INFO, out, stdout
log4j.logger.org.apache.activemq.spring=WARN
#log4j.logger.org.apache.activemq.usecases=DEBUG
#log4j.logger.org.apache.activemq.broker.region=DEBUG
log4j.logger.org.apache.activemq.pool=DEBUG
log4j.logger.org.apache.activemq.jms.pool=DEBUG
# CONSOLE appender not used by default
log4j.appender.stdout=org.apache.log4j.ConsoleAppender