git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1163815 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2011-08-31 21:07:28 +00:00
parent c6ed5ff237
commit cf914d16af
3 changed files with 184 additions and 1 deletions

View File

@ -31,6 +31,7 @@ import org.apache.activemq.util.IOExceptionSupport;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.commons.pool.ObjectPoolFactory; import org.apache.commons.pool.ObjectPoolFactory;
import org.apache.commons.pool.impl.GenericObjectPool;
import org.apache.commons.pool.impl.GenericObjectPoolFactory; import org.apache.commons.pool.impl.GenericObjectPoolFactory;
/** /**
@ -65,6 +66,7 @@ public class PooledConnectionFactory implements ConnectionFactory, Service {
private int maximumActive = 500; private int maximumActive = 500;
private int maxConnections = 1; private int maxConnections = 1;
private int idleTimeout = 30 * 1000; private int idleTimeout = 30 * 1000;
private boolean blockIfSessionPoolIsFull = false;
private AtomicBoolean stopped = new AtomicBoolean(false); private AtomicBoolean stopped = new AtomicBoolean(false);
private long expiryTimeout = 0l; private long expiryTimeout = 0l;
@ -194,6 +196,26 @@ public class PooledConnectionFactory implements ConnectionFactory, Service {
this.maximumActive = maximumActive; this.maximumActive = maximumActive;
} }
/**
* Controls the behavior of the internal session pool.
* By default the call to Connection.getSession() will
* return a JMSException if the session pool is full.
* If the argument true is given, it will change the
* default behavior and instead the call to getSession()
* will block until a session is available in the pool, which
* used to be the default behavior in ActiveMQ versions < 5.6.
*
* The size of the session pool is controlled by the @see #maximumActive
* property.
*
* @param block - if true, the call to getSession() blocks if the pool
* is full until a session object is available.
* defaults to false.
*/
public void setBlockIfSessionPoolIsFull(boolean block) {
this.blockIfSessionPoolIsFull = block;
}
/** /**
* @return the maxConnections * @return the maxConnections
*/ */
@ -208,8 +230,21 @@ public class PooledConnectionFactory implements ConnectionFactory, Service {
this.maxConnections = maxConnections; this.maxConnections = maxConnections;
} }
/**
* Creates an ObjectPoolFactory. Its behavior is controlled by the two
* properties @see #maximumActive and @see #blockIfSessionPoolIsFull.
*
* @return the newly created but empty ObjectPoolFactory
*/
protected ObjectPoolFactory createPoolFactory() { protected ObjectPoolFactory createPoolFactory() {
return new GenericObjectPoolFactory(null, maximumActive); if (blockIfSessionPoolIsFull) {
return new GenericObjectPoolFactory(null, maximumActive);
} else {
return new GenericObjectPoolFactory(null,
maximumActive,
GenericObjectPool.WHEN_EXHAUSTED_FAIL,
GenericObjectPool.DEFAULT_MAX_WAIT);
}
} }
public int getIdleTimeout() { public int getIdleTimeout() {

View File

@ -0,0 +1,147 @@
package org.apache.activemq.pool;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import junit.framework.Assert;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Session;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;
/**
* Checks the behavior of the PooledConnectionFactory when the maximum amount
* of sessions is being reached.
* Older versions simply block in the call to Connection.getSession(), which isn't good.
* An exception being returned is the better option, so JMS clients don't block.
* This test succeeds if an exception is returned and fails if the call to getSession()
* blocks.
*
*/
public class PooledConnectionFactoryTest extends TestCase
{
public final static Logger LOG = Logger.getLogger(PooledConnectionFactoryTest.class);
/**
* Create the test case
*
* @param testName name of the test case
*/
public PooledConnectionFactoryTest( String testName )
{
super( testName );
}
/**
* @return the suite of tests being tested
*/
public static Test suite()
{
return new TestSuite( PooledConnectionFactoryTest.class );
}
/**
* Tests the behavior of the sessionPool of the PooledConnectionFactory
* when maximum number of sessions are reached. In older versions the call to
* Connection.createSession() would simply block indefinitely if the maximum
* number of sessions got reached (controled by
* PooledConnectionFactory.setMaximumActive()).
* Rather than blocking the entire thread, it should raise an exception
* instead.
*/
public void testApp() throws Exception
{
// using separate thread for testing so that we can interrupt the test
// if the call to get a new session blocks.
// start test runner thread
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Boolean> result = (Future<Boolean>) executor.submit(new TestRunner());
// test should not take > 5secs, so test fails i
Thread.sleep(5*1000);
if (!result.isDone() || !result.get().booleanValue()) {
PooledConnectionFactoryTest.LOG.error("2nd call to createSession()" +
" is blocking but should have returned an error instead.");
executor.shutdownNow();
Assert.fail("SessionPool inside PooledConnectionFactory is blocking if " +
"limit is exceeded but should return an exception instead.");
}
}
}
class TestRunner implements Callable<Boolean> {
public final static Logger LOG = Logger.getLogger(TestRunner.class);
/**
* @return true if test succeeded, false otherwise
*/
public Boolean call() {
Connection conn = null;
Session one = null;
// wait at most 5 seconds for the call to createSession
try {
ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false");
PooledConnectionFactory cf = new PooledConnectionFactory(amq);
cf.setMaxConnections(3);
cf.setMaximumActive(1);
// default should be false already but lets make sure a change to the default
// setting does not make this test fail.
cf.setBlockIfSessionPoolIsFull(false);
conn = cf.createConnection();
one = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session two = null;
try {
// this should raise an exception as we called setMaximumActive(1)
two = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
two.close();
LOG.error("Expected JMSException wasn't thrown.");
Assert.fail("seconds call to Connection.createSession() was supposed" +
"to raise an JMSException as internal session pool" +
"is exhausted. This did not happen and indiates a problem");
return new Boolean(false);
} catch (JMSException ex) {
if (ex.getCause().getClass() == java.util.NoSuchElementException.class) {
//expected, ignore but log
LOG.info("Caught expected " + ex);
} else {
LOG.error(ex);
return new Boolean(false);
}
} finally {
if (one != null)
one.close();
if (conn != null)
conn.close();
}
} catch (Exception ex) {
LOG.error(ex.getMessage());
return new Boolean(false);
}
// all good, test succeeded
return new Boolean(true);
}
}

View File

@ -23,6 +23,7 @@ log4j.rootLogger=INFO, out, stdout
log4j.logger.org.apache.activemq.spring=WARN log4j.logger.org.apache.activemq.spring=WARN
#log4j.logger.org.apache.activemq.usecases=DEBUG #log4j.logger.org.apache.activemq.usecases=DEBUG
#log4j.logger.org.apache.activemq.broker.region=DEBUG #log4j.logger.org.apache.activemq.broker.region=DEBUG
log4j.logger.org.apache.activemq.pool=DEBUG
# CONSOLE appender not used by default # CONSOLE appender not used by default
log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout=org.apache.log4j.ConsoleAppender