mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-5534 PooledConnectioFactory added reconnection support but can break if the holder of the connection adds their own ExceptionListener as the PooledConnection doesn't protect the internal ExceptionListener from replacement which leads to cases where the loaned Connection is not automatically closed so that the next create returns the same failed connection.
This commit is contained in:
parent
8a30026e82
commit
b65c0d1be4
|
@ -67,6 +67,11 @@ public class ConnectionPool implements ExceptionListener {
|
|||
final GenericKeyedObjectPoolConfig poolConfig = new GenericKeyedObjectPoolConfig();
|
||||
poolConfig.setJmxEnabled(false);
|
||||
this.connection = wrap(connection);
|
||||
try {
|
||||
this.connection.setExceptionListener(this);
|
||||
} catch (JMSException ex) {
|
||||
LOG.warn("Could not set exception listener on create of ConnectionPool");
|
||||
}
|
||||
|
||||
// Create our internal Pool of session instances.
|
||||
this.sessionPool = new GenericKeyedObjectPool<SessionKey, SessionHolder>(
|
||||
|
@ -79,7 +84,7 @@ public class ConnectionPool implements ExceptionListener {
|
|||
|
||||
@Override
|
||||
public void destroyObject(SessionKey sessionKey, PooledObject<SessionHolder> pooledObject) throws Exception {
|
||||
((SessionHolder)pooledObject.getObject()).close();
|
||||
pooledObject.getObject().close();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -357,26 +362,21 @@ public class ConnectionPool implements ExceptionListener {
|
|||
*/
|
||||
public void setReconnectOnException(boolean reconnectOnException) {
|
||||
this.reconnectOnException = reconnectOnException;
|
||||
try {
|
||||
if (isReconnectOnException()) {
|
||||
if (connection.getExceptionListener() != null) {
|
||||
parentExceptionListener = connection.getExceptionListener();
|
||||
}
|
||||
connection.setExceptionListener(this);
|
||||
} else {
|
||||
if (parentExceptionListener != null) {
|
||||
connection.setExceptionListener(parentExceptionListener);
|
||||
}
|
||||
parentExceptionListener = null;
|
||||
}
|
||||
} catch (JMSException jmse) {
|
||||
LOG.warn("Cannot set reconnect exception listener", jmse);
|
||||
}
|
||||
}
|
||||
|
||||
ExceptionListener getParentExceptionListener() {
|
||||
return parentExceptionListener;
|
||||
}
|
||||
|
||||
void setParentExceptionListener(ExceptionListener parentExceptionListener) {
|
||||
this.parentExceptionListener = parentExceptionListener;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onException(JMSException exception) {
|
||||
close();
|
||||
if (isReconnectOnException()) {
|
||||
close();
|
||||
}
|
||||
if (parentExceptionListener != null) {
|
||||
parentExceptionListener.onException(exception);
|
||||
}
|
||||
|
|
|
@ -122,7 +122,7 @@ public class PooledConnection implements TopicConnection, QueueConnection, Poole
|
|||
|
||||
@Override
|
||||
public ExceptionListener getExceptionListener() throws JMSException {
|
||||
return getConnection().getExceptionListener();
|
||||
return pool.getParentExceptionListener();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -132,7 +132,7 @@ public class PooledConnection implements TopicConnection, QueueConnection, Poole
|
|||
|
||||
@Override
|
||||
public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException {
|
||||
getConnection().setExceptionListener(exceptionListener);
|
||||
pool.setParentExceptionListener(exceptionListener);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
|
@ -63,7 +63,6 @@ import org.slf4j.LoggerFactory;
|
|||
* eviction thread may be configured using the {@link org.apache.activemq.jms.pool.PooledConnectionFactory#setTimeBetweenExpirationCheckMillis} method. By
|
||||
* default the value is -1 which means no eviction thread will be run. Set to a non-negative value to
|
||||
* configure the idle eviction thread to run.
|
||||
*
|
||||
*/
|
||||
public class PooledConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory {
|
||||
private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnectionFactory.class);
|
||||
|
@ -106,9 +105,7 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
|
|||
connection.setUseAnonymousProducers(isUseAnonymousProducers());
|
||||
connection.setReconnectOnException(isReconnectOnException());
|
||||
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Created new connection: {}", connection);
|
||||
}
|
||||
LOG.trace("Created new connection: {}", connection);
|
||||
|
||||
PooledConnectionFactory.this.mostRecentlyCreated.set(connection);
|
||||
|
||||
|
@ -119,9 +116,7 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
|
|||
public void destroyObject(ConnectionKey connectionKey, PooledObject<ConnectionPool> pooledObject) throws Exception {
|
||||
ConnectionPool connection = pooledObject.getObject();
|
||||
try {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Destroying connection: {}", connection);
|
||||
}
|
||||
LOG.trace("Destroying connection: {}", connection);
|
||||
connection.close();
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Close connection failed for connection: " + connection + ". This exception will be ignored.",e);
|
||||
|
@ -132,10 +127,7 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
|
|||
public boolean validateObject(ConnectionKey connectionKey, PooledObject<ConnectionPool> pooledObject) {
|
||||
ConnectionPool connection = pooledObject.getObject();
|
||||
if (connection != null && connection.expiredCheck()) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Connection has expired: {} and will be destroyed", connection);
|
||||
}
|
||||
|
||||
LOG.trace("Connection has expired: {} and will be destroyed", connection);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -305,7 +297,7 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
|
|||
public void stop() {
|
||||
if (stopped.compareAndSet(false, true)) {
|
||||
LOG.debug("Stopping the PooledConnectionFactory, number of connections in cache: {}",
|
||||
connectionsPool != null ? connectionsPool.getNumActive() : 0);
|
||||
connectionsPool != null ? connectionsPool.getNumActive() : 0);
|
||||
try {
|
||||
if (connectionsPool != null) {
|
||||
connectionsPool.close();
|
||||
|
@ -322,7 +314,6 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
|
|||
* are in use be client's will be closed.
|
||||
*/
|
||||
public void clear() {
|
||||
|
||||
if (stopped.get()) {
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,117 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.jms.pool;
|
||||
|
||||
import static org.junit.Assert.assertNotSame;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ExceptionListener;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.Session;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.TransportConnector;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class PooledConnectionFailoverTest extends JmsPoolTestSupport {
|
||||
|
||||
protected ActiveMQConnectionFactory directConnFact;
|
||||
protected PooledConnectionFactory pooledConnFact;
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws java.lang.Exception {
|
||||
super.setUp();
|
||||
|
||||
String connectionURI = createBroker();
|
||||
|
||||
// Create the ActiveMQConnectionFactory and the PooledConnectionFactory.
|
||||
directConnFact = new ActiveMQConnectionFactory(connectionURI);
|
||||
pooledConnFact = new PooledConnectionFactory();
|
||||
pooledConnFact.setConnectionFactory(directConnFact);
|
||||
pooledConnFact.setMaxConnections(1);
|
||||
pooledConnFact.setReconnectOnException(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConnectionFailures() throws Exception {
|
||||
|
||||
final CountDownLatch failed = new CountDownLatch(1);
|
||||
|
||||
Connection connection = pooledConnFact.createConnection();
|
||||
LOG.info("Fetched new connection from the pool: {}", connection);
|
||||
connection.setExceptionListener(new ExceptionListener() {
|
||||
|
||||
@Override
|
||||
public void onException(JMSException exception) {
|
||||
LOG.info("Pooled Connection failed");
|
||||
failed.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Queue queue = session.createQueue(getTestName());
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
|
||||
brokerService.stop();
|
||||
|
||||
assertTrue(failed.await(15, TimeUnit.SECONDS));
|
||||
|
||||
createBroker();
|
||||
|
||||
try {
|
||||
producer.send(session.createMessage());
|
||||
fail("Should be disconnected");
|
||||
} catch (JMSException ex) {
|
||||
LOG.info("Producer failed as expected: {}", ex.getMessage());
|
||||
}
|
||||
|
||||
Connection connection2 = pooledConnFact.createConnection();
|
||||
assertNotSame(connection, connection2);
|
||||
LOG.info("Fetched new connection from the pool: {}", connection2);
|
||||
session = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
connection2.close();
|
||||
|
||||
pooledConnFact.stop();
|
||||
}
|
||||
|
||||
private String createBroker() throws Exception {
|
||||
brokerService = new BrokerService();
|
||||
brokerService.setBrokerName("PooledConnectionSessionCleanupTestBroker");
|
||||
brokerService.setUseJmx(true);
|
||||
brokerService.getManagementContext().setCreateConnector(false);
|
||||
brokerService.setPersistent(false);
|
||||
brokerService.setSchedulerSupport(false);
|
||||
brokerService.setAdvisorySupport(false);
|
||||
TransportConnector connector = brokerService.addConnector("tcp://0.0.0.0:61626");
|
||||
brokerService.start();
|
||||
brokerService.waitUntilStarted();
|
||||
|
||||
return "failover:(" + connector.getPublishableConnectString() + ")?maxReconnectAttempts=5";
|
||||
}
|
||||
}
|
|
@ -5,9 +5,9 @@
|
|||
## The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
## (the "License"); you may not use this file except in compliance with
|
||||
## the License. You may obtain a copy of the License at
|
||||
##
|
||||
##
|
||||
## http://www.apache.org/licenses/LICENSE-2.0
|
||||
##
|
||||
##
|
||||
## Unless required by applicable law or agreed to in writing, software
|
||||
## distributed under the License is distributed on an "AS IS" BASIS,
|
||||
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
|
@ -23,7 +23,7 @@ log4j.rootLogger=INFO, out, stdout
|
|||
log4j.logger.org.apache.activemq.spring=WARN
|
||||
#log4j.logger.org.apache.activemq.usecases=DEBUG
|
||||
#log4j.logger.org.apache.activemq.broker.region=DEBUG
|
||||
log4j.logger.org.apache.activemq.pool=DEBUG
|
||||
log4j.logger.org.apache.activemq.jms.pool=DEBUG
|
||||
|
||||
# CONSOLE appender not used by default
|
||||
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
|
||||
|
|
Loading…
Reference in New Issue