From 92dec52e1c1ecd26014fa62f5aad7072ecdbb265 Mon Sep 17 00:00:00 2001 From: Benjamin Graf Date: Thu, 10 Jan 2019 20:33:10 +0100 Subject: [PATCH] [AMQ-7131] Add connectionTimeout to avoid starvation --- .../jms/pool/PooledConnectionFactory.java | 50 ++++++++++++++++--- .../jms/pool/PooledConnectionFactoryTest.java | 26 +++++++--- 2 files changed, 61 insertions(+), 15 deletions(-) diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java index 967f46e5aa..11fbbbb328 100644 --- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java +++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.jms.pool; +import java.util.NoSuchElementException; import java.util.Properties; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -74,6 +75,7 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti private int maximumActiveSessionPerConnection = 500; private int idleTimeout = 30 * 1000; + private int connectionTimeout = 30 * 1000; private boolean blockIfSessionPoolIsFull = true; private long blockIfSessionPoolIsFullTimeout = -1L; private long expiryTimeout = 0l; @@ -144,6 +146,9 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti }, poolConfig); + // Set max wait time to control borrow from pool. + this.connectionsPool.setMaxWaitMillis(getConnectionTimeout()); + // Set max idle (not max active) since our connections always idle in the pool. this.connectionsPool.setMaxIdlePerKey(1); this.connectionsPool.setLifo(false); @@ -232,16 +237,24 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti // under lock to prevent another thread from triggering an expiration check and // pulling the rug out from under us. while (connection == null) { - connection = connectionsPool.borrowObject(key); - synchronized (connection) { - if (connection.getConnection() != null) { - connection.incrementReferenceCount(); - break; + try { + connection = connectionsPool.borrowObject(key); + } catch (NoSuchElementException ex) { + if (!"Timeout waiting for idle object".equals(ex.getMessage())) { + throw ex; } + } + if (connection != null) { + synchronized (connection) { + if (connection.getConnection() != null) { + connection.incrementReferenceCount(); + break; + } - // Return the bad one to the pool and let if get destroyed as normal. - connectionsPool.returnObject(key, connection); - connection = null; + // Return the bad one to the pool and let if get destroyed as normal. + connectionsPool.returnObject(key, connection); + connection = null; + } } } } catch (Exception e) { @@ -420,6 +433,27 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti this.idleTimeout = idleTimeout; } + /** + * Gets the connection timeout value. The maximum time waited to get a Connection from the pool. + * The default value is 30 seconds. + * + * @return connection timeout value (milliseconds) + */ + public int getConnectionTimeout() { + return connectionTimeout; + } + + /** + * Sets the connection timeout value for getting Connections from this pool in Milliseconds, + * defaults to 30 seconds. + * + * @param connectionTimeout + * The maximum time to wait for getting a pooled Connection. + */ + public void setConnectionTimeout(int connectionTimeout) { + this.connectionTimeout = connectionTimeout; + } + /** * allow connections to expire, irrespective of load or idle time. This is useful with failover * to force a reconnect from the pool, to reestablish load balancing or use of the master post recovery diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFactoryTest.java b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFactoryTest.java index 0be8108a61..a13f3129d8 100644 --- a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFactoryTest.java +++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFactoryTest.java @@ -16,13 +16,6 @@ */ package org.apache.activemq.jms.pool; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotSame; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; @@ -46,6 +39,8 @@ import org.apache.activemq.util.Wait; import org.apache.log4j.Logger; import org.junit.Test; +import static org.junit.Assert.*; + /** * Checks the behavior of the PooledConnectionFactory when the maximum amount of * sessions is being reached. @@ -67,6 +62,23 @@ public class PooledConnectionFactoryTest extends JmsPoolTestSupport { pcf.stop(); } + @Test(timeout = 120000) + public void testConnectionTimeout() throws Exception { + ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false&broker.useJmx=false"); + PooledConnectionFactory cf = new PooledConnectionFactory(); + cf.setConnectionFactory(amq); + cf.setConnectionTimeout(100); + + PooledConnection connection = (PooledConnection) cf.createConnection(); + assertEquals(1, cf.getNumConnections()); + + // wait for the connection timeout + Thread.sleep(300); + + connection = (PooledConnection) cf.createConnection(); + assertEquals(1, cf.getNumConnections()); + } + @Test(timeout = 60000) public void testClearAllConnections() throws Exception {