Ensure that connections are returned when created and then handed out as
FIFO after that.
This commit is contained in:
Timothy Bish 2014-06-13 10:30:39 -04:00
parent f395c70608
commit 878e3a16ca
2 changed files with 32 additions and 31 deletions

View File

@ -17,6 +17,7 @@
package org.apache.activemq.jms.pool; package org.apache.activemq.jms.pool;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.ConnectionFactory; import javax.jms.ConnectionFactory;
@ -73,6 +74,9 @@ public class PooledConnectionFactory implements ConnectionFactory {
private boolean createConnectionOnStartup = true; private boolean createConnectionOnStartup = true;
private boolean useAnonymousProducers = true; private boolean useAnonymousProducers = true;
// Temporary value used to always fetch the result of makeObject.
private final AtomicReference<ConnectionPool> mostRecentlyCreated = new AtomicReference<ConnectionPool>(null);
public void initConnectionsPool() { public void initConnectionsPool() {
if (this.connectionsPool == null) { if (this.connectionsPool == null) {
this.connectionsPool = new GenericKeyedObjectPool<ConnectionKey, ConnectionPool>( this.connectionsPool = new GenericKeyedObjectPool<ConnectionKey, ConnectionPool>(
@ -112,6 +116,8 @@ public class PooledConnectionFactory implements ConnectionFactory {
LOG.trace("Created new connection: {}", connection); LOG.trace("Created new connection: {}", connection);
} }
PooledConnectionFactory.this.mostRecentlyCreated.set(connection);
return connection; return connection;
} }
@ -135,6 +141,7 @@ public class PooledConnectionFactory implements ConnectionFactory {
// Set max idle (not max active) since our connections always idle in the pool. // Set max idle (not max active) since our connections always idle in the pool.
this.connectionsPool.setMaxIdle(1); this.connectionsPool.setMaxIdle(1);
this.connectionsPool.setLifo(false);
// We always want our validate method to control when idle objects are evicted. // We always want our validate method to control when idle objects are evicted.
this.connectionsPool.setTestOnBorrow(true); this.connectionsPool.setTestOnBorrow(true);
@ -195,45 +202,41 @@ public class PooledConnectionFactory implements ConnectionFactory {
// will create a new one to meet the demand. // will create a new one to meet the demand.
if (getConnectionsPool().getNumIdle(key) < getMaxConnections()) { if (getConnectionsPool().getNumIdle(key) < getMaxConnections()) {
try { try {
// we want borrowObject to return the one we added.
connectionsPool.setLifo(true);
connectionsPool.addObject(key); connectionsPool.addObject(key);
connection = mostRecentlyCreated.getAndSet(null);
connection.incrementReferenceCount();
} catch (Exception e) { } catch (Exception e) {
throw createJmsException("Error while attempting to add new Connection to the pool", e); throw createJmsException("Error while attempting to add new Connection to the pool", e);
} }
} else { } else {
// now we want the oldest one in the pool. try {
connectionsPool.setLifo(false); // We can race against other threads returning the connection when there is an
} // expiration or idle timeout. We keep pulling out ConnectionPool instances until
// we win and get a non-closed instance and then increment the reference count
// under lock to prevent another thread from triggering an expiration check and
// pulling the rug out from under us.
while (connection == null) {
connection = connectionsPool.borrowObject(key);
synchronized (connection) {
if (connection.getConnection() != null) {
connection.incrementReferenceCount();
break;
}
try { // Return the bad one to the pool and let if get destroyed as normal.
connectionsPool.returnObject(key, connection);
// We can race against other threads returning the connection when there is an connection = null;
// expiration or idle timeout. We keep pulling out ConnectionPool instances until
// we win and get a non-closed instance and then increment the reference count
// under lock to prevent another thread from triggering an expiration check and
// pulling the rug out from under us.
while (connection == null) {
connection = connectionsPool.borrowObject(key);
synchronized (connection) {
if (connection.getConnection() != null) {
connection.incrementReferenceCount();
break;
} }
// Return the bad one to the pool and let if get destroyed as normal.
connectionsPool.returnObject(key, connection);
connection = null;
} }
} catch (Exception e) {
throw createJmsException("Error while attempting to retrieve a connection from the pool", e);
} }
} catch (Exception e) {
throw createJmsException("Error while attempting to retrieve a connection from the pool", e);
}
try { try {
connectionsPool.returnObject(key, connection); connectionsPool.returnObject(key, connection);
} catch (Exception e) { } catch (Exception e) {
throw createJmsException("Error when returning connection to the pool", e); throw createJmsException("Error when returning connection to the pool", e);
}
} }
return newPooledConnection(connection); return newPooledConnection(connection);

View File

@ -40,7 +40,6 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
/** /**
@ -197,7 +196,6 @@ public class PooledConnectionFactoryTest {
doTestConcurrentCreateGetsUniqueConnection(false); doTestConcurrentCreateGetsUniqueConnection(false);
} }
@Ignore("something up - don't know why the start call to createConnection does not cause close - but that does not fix it either!")
@Test @Test
public void testConcurrentCreateGetsUniqueConnectionCreateOnStart() throws Exception { public void testConcurrentCreateGetsUniqueConnectionCreateOnStart() throws Exception {
doTestConcurrentCreateGetsUniqueConnection(true); doTestConcurrentCreateGetsUniqueConnection(true);