git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1465723 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-04-08 18:58:11 +00:00
parent 585a966a9b
commit 858ab263d4
4 changed files with 116 additions and 6 deletions

View File

@ -32,6 +32,8 @@ import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.apache.commons.pool.impl.GenericObjectPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Holds a real JMS connection along with the session pools associated with it.
@ -43,6 +45,8 @@ import org.apache.commons.pool.impl.GenericObjectPool;
*/
public class ConnectionPool {
private static final transient Logger LOG = LoggerFactory.getLogger(ConnectionPool.class);
private ActiveMQConnection connection;
private int referenceCount;
private long lastUsed = System.currentTimeMillis();
@ -207,6 +211,9 @@ public class ConnectionPool {
* @return true if this connection has expired.
*/
public synchronized boolean expiredCheck() {
boolean expired = false;
if (connection == null) {
return true;
}
@ -214,25 +221,27 @@ public class ConnectionPool {
if (hasExpired || hasFailed) {
if (referenceCount == 0) {
close();
expired = true;
}
return true;
}
if (expiryTimeout > 0 && System.currentTimeMillis() > firstUsed + expiryTimeout) {
hasExpired = true;
if (referenceCount == 0) {
close();
expired = true;
}
return true;
}
// Only set hasExpired here is no references, as a Connection with references is by
// definition not idle at this time.
if (referenceCount == 0 && idleTimeout > 0 && System.currentTimeMillis() > lastUsed + idleTimeout) {
hasExpired = true;
close();
return true;
expired = true;
}
return false;
return expired;
}
public int getIdleTimeout() {

View File

@ -74,7 +74,6 @@ public class PooledConnection implements TopicConnection, QueueConnection, Enhan
*/
public PooledConnection(ConnectionPool pool) {
this.pool = pool;
this.pool.incrementReferenceCount();
}
/**

View File

@ -220,7 +220,25 @@ public class PooledConnectionFactory implements ConnectionFactory, Service {
}
try {
connection = connectionsPool.borrowObject(key);
// We can race against other threads returning the connection when there is an
// expiration or idle timeout. We keep pulling out ConnectionPool instances until
// we win and get a non-closed instance and then increment the reference count
// under lock to prevent another thread from triggering an expiration check and
// pulling the rug out from under us.
while (connection == null) {
connection = connectionsPool.borrowObject(key);
synchronized (connection) {
if (connection.getConnection() != null) {
connection.incrementReferenceCount();
break;
}
// Return the bad one to the pool and let if get destroyed as normal.
connectionsPool.returnObject(key, connection);
connection = null;
}
}
} catch (Exception e) {
throw JMSExceptionSupport.create("Error while attempting to retrieve a connection from the pool", e);
}

View File

@ -0,0 +1,84 @@
package org.apache.activemq.pool.bugs;
import static org.junit.Assert.fail;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.JMSException;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.pool.PooledConnection;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AMQ4441Test {
private static final Logger LOG = LoggerFactory.getLogger(AMQ4441Test.class);
private BrokerService broker;
@Before
public void setUp() throws Exception {
broker = new BrokerService();
broker.setDeleteAllMessagesOnStartup(true);
broker.setPersistent(false);
broker.setUseJmx(false);
broker.start();
broker.waitUntilStarted();
}
@After
public void stopBroker() throws Exception {
broker.stop();
broker.waitUntilStopped();
}
@Test(timeout=120000)
public void demo() throws JMSException, InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean done = new AtomicBoolean(false);
final PooledConnectionFactory pooled = new PooledConnectionFactory("vm://localhost?create=false");
pooled.setMaxConnections(2);
pooled.setExpiryTimeout(10L);
pooled.start();
Thread[] threads = new Thread[10];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(new Runnable() {
@Override
public void run() {
while (!done.get() && latch.getCount() > 0) {
try {
final PooledConnection pooledConnection = (PooledConnection) pooled.createConnection();
if (pooledConnection.getConnection() == null) {
LOG.info("Found broken connection.");
latch.countDown();
}
pooledConnection.close();
} catch (JMSException e) {
LOG.warn("Caught Exception", e);
}
}
}
});
}
for (Thread thread : threads) {
thread.start();
}
if (latch.await(1, TimeUnit.MINUTES)) {
fail("A thread obtained broken connection");
}
done.set(true);
for (Thread thread : threads) {
thread.join();
}
}
}