[AMQ-7131] Add connectionTimeout to avoid starvation

This commit is contained in:
Benjamin Graf 2019-01-10 20:33:10 +01:00 committed by jbonofre
parent cc4a69913b
commit 92dec52e1c
2 changed files with 61 additions and 15 deletions

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.jms.pool;
import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@ -74,6 +75,7 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
private int maximumActiveSessionPerConnection = 500;
private int idleTimeout = 30 * 1000;
private int connectionTimeout = 30 * 1000;
private boolean blockIfSessionPoolIsFull = true;
private long blockIfSessionPoolIsFullTimeout = -1L;
private long expiryTimeout = 0l;
@ -144,6 +146,9 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
}, poolConfig);
// Set max wait time to control borrow from pool.
this.connectionsPool.setMaxWaitMillis(getConnectionTimeout());
// Set max idle (not max active) since our connections always idle in the pool.
this.connectionsPool.setMaxIdlePerKey(1);
this.connectionsPool.setLifo(false);
@ -232,16 +237,24 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
// under lock to prevent another thread from triggering an expiration check and
// pulling the rug out from under us.
while (connection == null) {
connection = connectionsPool.borrowObject(key);
synchronized (connection) {
if (connection.getConnection() != null) {
connection.incrementReferenceCount();
break;
try {
connection = connectionsPool.borrowObject(key);
} catch (NoSuchElementException ex) {
if (!"Timeout waiting for idle object".equals(ex.getMessage())) {
throw ex;
}
}
if (connection != null) {
synchronized (connection) {
if (connection.getConnection() != null) {
connection.incrementReferenceCount();
break;
}
// Return the bad one to the pool and let if get destroyed as normal.
connectionsPool.returnObject(key, connection);
connection = null;
// Return the bad one to the pool and let if get destroyed as normal.
connectionsPool.returnObject(key, connection);
connection = null;
}
}
}
} catch (Exception e) {
@ -420,6 +433,27 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
this.idleTimeout = idleTimeout;
}
/**
* Gets the connection timeout value. The maximum time waited to get a Connection from the pool.
* The default value is 30 seconds.
*
* @return connection timeout value (milliseconds)
*/
public int getConnectionTimeout() {
return connectionTimeout;
}
/**
* Sets the connection timeout value for getting Connections from this pool in Milliseconds,
* defaults to 30 seconds.
*
* @param connectionTimeout
* The maximum time to wait for getting a pooled Connection.
*/
public void setConnectionTimeout(int connectionTimeout) {
this.connectionTimeout = connectionTimeout;
}
/**
* allow connections to expire, irrespective of load or idle time. This is useful with failover
* to force a reconnect from the pool, to reestablish load balancing or use of the master post recovery

View File

@ -16,13 +16,6 @@
*/
package org.apache.activemq.jms.pool;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@ -46,6 +39,8 @@ import org.apache.activemq.util.Wait;
import org.apache.log4j.Logger;
import org.junit.Test;
import static org.junit.Assert.*;
/**
* Checks the behavior of the PooledConnectionFactory when the maximum amount of
* sessions is being reached.
@ -67,6 +62,23 @@ public class PooledConnectionFactoryTest extends JmsPoolTestSupport {
pcf.stop();
}
@Test(timeout = 120000)
public void testConnectionTimeout() throws Exception {
ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false&broker.useJmx=false");
PooledConnectionFactory cf = new PooledConnectionFactory();
cf.setConnectionFactory(amq);
cf.setConnectionTimeout(100);
PooledConnection connection = (PooledConnection) cf.createConnection();
assertEquals(1, cf.getNumConnections());
// wait for the connection timeout
Thread.sleep(300);
connection = (PooledConnection) cf.createConnection();
assertEquals(1, cf.getNumConnections());
}
@Test(timeout = 60000)
public void testClearAllConnections() throws Exception {