diff --git a/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java b/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java index 61ab6b4a68..bea94daa9b 100644 --- a/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java +++ b/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java @@ -40,23 +40,23 @@ import org.apache.commons.pool.impl.GenericObjectPoolFactory; * href="http://activemq.apache.org/spring-support.html">JmsTemplate and MessagListenerContainer. * Connections, sessions and producers are returned to a pool after use so that they can be reused later * without having to undergo the cost of creating them again. - * + * * b>NOTE: while this implementation does allow the creation of a collection of active consumers, - * it does not 'pool' consumers. Pooling makes sense for connections, sessions and producers, which + * it does not 'pool' consumers. Pooling makes sense for connections, sessions and producers, which * are expensive to create and can remain idle a minimal cost. Consumers, on the other hand, are usually * just created at startup and left active, handling incoming messages as they come. When a consumer is - * complete, it is best to close it rather than return it to a pool for later reuse: this is because, + * complete, it is best to close it rather than return it to a pool for later reuse: this is because, * even if a consumer is idle, ActiveMQ will keep delivering messages to the consumer's prefetch buffer, * where they'll get held until the consumer is active again. - * + * * If you are creating a collection of consumers (for example, for multi-threaded message consumption), you - * might want to consider using a lower prefetch value for each consumer (e.g. 10 or 20), to ensure that - * all messages don't end up going to just one of the consumers. See this FAQ entry for more detail: + * might want to consider using a lower prefetch value for each consumer (e.g. 10 or 20), to ensure that + * all messages don't end up going to just one of the consumers. See this FAQ entry for more detail: * http://activemq.apache.org/i-do-not-receive-messages-in-my-second-consumer.html - * + * * @org.apache.xbean.XBean element="pooledConnectionFactory" - * - * + * + * */ public class PooledConnectionFactory implements ConnectionFactory, Service { private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnectionFactory.class); @@ -66,7 +66,7 @@ public class PooledConnectionFactory implements ConnectionFactory, Service { private int maximumActive = 500; private int maxConnections = 1; private int idleTimeout = 30 * 1000; - private boolean blockIfSessionPoolIsFull = false; + private boolean blockIfSessionPoolIsFull = true; private AtomicBoolean stopped = new AtomicBoolean(false); private long expiryTimeout = 0l; @@ -99,7 +99,7 @@ public class PooledConnectionFactory implements ConnectionFactory, Service { LOG.debug("PooledConnectionFactory is stopped, skip create new connection."); return null; } - + ConnectionKey key = new ConnectionKey(userName, password); LinkedList pools = cache.get(key); @@ -195,25 +195,21 @@ public class PooledConnectionFactory implements ConnectionFactory, Service { public void setMaximumActive(int maximumActive) { this.maximumActive = maximumActive; } - + /** - * Controls the behavior of the internal session pool. - * By default the call to Connection.getSession() will - * return a JMSException if the session pool is full. - * If the argument true is given, it will change the - * default behavior and instead the call to getSession() - * will block until a session is available in the pool, which - * used to be the default behavior in ActiveMQ versions < 5.6. - * - * The size of the session pool is controlled by the @see #maximumActive + * Controls the behavior of the internal session pool. By default the call to + * Connection.getSession() will block if the session pool is full. If the + * argument false is given, it will change the default behavior and instead the + * call to getSession() will throw a JMSException. + * + * The size of the session pool is controlled by the @see #maximumActive * property. - * - * @param block - if true, the call to getSession() blocks if the pool - * is full until a session object is available. - * defaults to false. + * + * @param block - if true, the call to getSession() blocks if the pool is full + * until a session object is available. defaults to true. */ public void setBlockIfSessionPoolIsFull(boolean block) { - this.blockIfSessionPoolIsFull = block; + this.blockIfSessionPoolIsFull = block; } /** @@ -233,18 +229,18 @@ public class PooledConnectionFactory implements ConnectionFactory, Service { /** * Creates an ObjectPoolFactory. Its behavior is controlled by the two * properties @see #maximumActive and @see #blockIfSessionPoolIsFull. - * + * * @return the newly created but empty ObjectPoolFactory */ protected ObjectPoolFactory createPoolFactory() { - if (blockIfSessionPoolIsFull) { - return new GenericObjectPoolFactory(null, maximumActive); - } else { - return new GenericObjectPoolFactory(null, - maximumActive, - GenericObjectPool.WHEN_EXHAUSTED_FAIL, - GenericObjectPool.DEFAULT_MAX_WAIT); - } + if (blockIfSessionPoolIsFull) { + return new GenericObjectPoolFactory(null, maximumActive); + } else { + return new GenericObjectPoolFactory(null, + maximumActive, + GenericObjectPool.WHEN_EXHAUSTED_FAIL, + GenericObjectPool.DEFAULT_MAX_WAIT); + } } public int getIdleTimeout() { @@ -258,13 +254,13 @@ public class PooledConnectionFactory implements ConnectionFactory, Service { /** * allow connections to expire, irrespective of load or idle time. This is useful with failover * to force a reconnect from the pool, to reestablish load balancing or use of the master post recovery - * + * * @param expiryTimeout non zero in milliseconds */ public void setExpiryTimeout(long expiryTimeout) { - this.expiryTimeout = expiryTimeout; + this.expiryTimeout = expiryTimeout; } - + public long getExpiryTimeout() { return expiryTimeout; } diff --git a/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryTest.java b/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryTest.java index 3fa744c092..6853d0d595 100644 --- a/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryTest.java +++ b/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryTest.java @@ -22,18 +22,18 @@ import org.apache.log4j.Logger; /** * Checks the behavior of the PooledConnectionFactory when the maximum amount - * of sessions is being reached. + * of sessions is being reached. * Older versions simply block in the call to Connection.getSession(), which isn't good. * An exception being returned is the better option, so JMS clients don't block. - * This test succeeds if an exception is returned and fails if the call to getSession() + * This test succeeds if an exception is returned and fails if the call to getSession() * blocks. - * + * */ public class PooledConnectionFactoryTest extends TestCase { - public final static Logger LOG = Logger.getLogger(PooledConnectionFactoryTest.class); - - + public final static Logger LOG = Logger.getLogger(PooledConnectionFactoryTest.class); + + /** * Create the test case * @@ -53,95 +53,87 @@ public class PooledConnectionFactoryTest extends TestCase } /** - * Tests the behavior of the sessionPool of the PooledConnectionFactory - * when maximum number of sessions are reached. In older versions the call to - * Connection.createSession() would simply block indefinitely if the maximum - * number of sessions got reached (controled by - * PooledConnectionFactory.setMaximumActive()). - * Rather than blocking the entire thread, it should raise an exception - * instead. + * Tests the behavior of the sessionPool of the PooledConnectionFactory + * when maximum number of sessions are reached. */ public void testApp() throws Exception - { - // using separate thread for testing so that we can interrupt the test - // if the call to get a new session blocks. - - // start test runner thread - ExecutorService executor = Executors.newSingleThreadExecutor(); - Future result = (Future) executor.submit(new TestRunner()); - - // test should not take > 5secs, so test fails i - Thread.sleep(5*1000); - - if (!result.isDone() || !result.get().booleanValue()) { - PooledConnectionFactoryTest.LOG.error("2nd call to createSession()" + - " is blocking but should have returned an error instead."); + { + // using separate thread for testing so that we can interrupt the test + // if the call to get a new session blocks. - executor.shutdownNow(); + // start test runner thread + ExecutorService executor = Executors.newSingleThreadExecutor(); + Future result = (Future) executor.submit(new TestRunner()); - Assert.fail("SessionPool inside PooledConnectionFactory is blocking if " + - "limit is exceeded but should return an exception instead."); - } + // test should not take > 5secs, so test fails i + Thread.sleep(5*1000); + + if (!result.isDone() || !result.get().booleanValue()) { + PooledConnectionFactoryTest.LOG.error("2nd call to createSession()" + + " is blocking but should have returned an error instead."); + + executor.shutdownNow(); + + Assert.fail("SessionPool inside PooledConnectionFactory is blocking if " + + "limit is exceeded but should return an exception instead."); + } } } class TestRunner implements Callable { - - public final static Logger LOG = Logger.getLogger(TestRunner.class); - - /** - * @return true if test succeeded, false otherwise - */ - public Boolean call() { - - Connection conn = null; - Session one = null; - - // wait at most 5 seconds for the call to createSession - try { - ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false"); - PooledConnectionFactory cf = new PooledConnectionFactory(amq); - cf.setMaxConnections(3); - cf.setMaximumActive(1); - - // default should be false already but lets make sure a change to the default - // setting does not make this test fail. - cf.setBlockIfSessionPoolIsFull(false); - - conn = cf.createConnection(); - one = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - - Session two = null; - try { - // this should raise an exception as we called setMaximumActive(1) - two = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - two.close(); - - LOG.error("Expected JMSException wasn't thrown."); - Assert.fail("seconds call to Connection.createSession() was supposed" + - "to raise an JMSException as internal session pool" + - "is exhausted. This did not happen and indiates a problem"); - return new Boolean(false); - } catch (JMSException ex) { - if (ex.getCause().getClass() == java.util.NoSuchElementException.class) { - //expected, ignore but log - LOG.info("Caught expected " + ex); - } else { - LOG.error(ex); - return new Boolean(false); - } - } finally { - if (one != null) - one.close(); - if (conn != null) - conn.close(); - } - } catch (Exception ex) { - LOG.error(ex.getMessage()); - return new Boolean(false); - } - - // all good, test succeeded - return new Boolean(true); - } + + public final static Logger LOG = Logger.getLogger(TestRunner.class); + + /** + * @return true if test succeeded, false otherwise + */ + public Boolean call() { + + Connection conn = null; + Session one = null; + + // wait at most 5 seconds for the call to createSession + try { + ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false"); + PooledConnectionFactory cf = new PooledConnectionFactory(amq); + cf.setMaxConnections(3); + cf.setMaximumActive(1); + cf.setBlockIfSessionPoolIsFull(false); + + conn = cf.createConnection(); + one = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Session two = null; + try { + // this should raise an exception as we called setMaximumActive(1) + two = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + two.close(); + + LOG.error("Expected JMSException wasn't thrown."); + Assert.fail("seconds call to Connection.createSession() was supposed" + + "to raise an JMSException as internal session pool" + + "is exhausted. This did not happen and indiates a problem"); + return new Boolean(false); + } catch (JMSException ex) { + if (ex.getCause().getClass() == java.util.NoSuchElementException.class) { + //expected, ignore but log + LOG.info("Caught expected " + ex); + } else { + LOG.error(ex); + return new Boolean(false); + } + } finally { + if (one != null) + one.close(); + if (conn != null) + conn.close(); + } + } catch (Exception ex) { + LOG.error(ex.getMessage()); + return new Boolean(false); + } + + // all good, test succeeded + return new Boolean(true); + } }