mirror of https://github.com/apache/activemq.git
apply fix for: https://issues.apache.org/jira/browse/AMQ-3482 to return default behavior to blocking.
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1164058 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
588a3c3594
commit
5ac1540c63
|
@ -40,23 +40,23 @@ import org.apache.commons.pool.impl.GenericObjectPoolFactory;
|
|||
* href="http://activemq.apache.org/spring-support.html">JmsTemplate and MessagListenerContainer</a>.
|
||||
* Connections, sessions and producers are returned to a pool after use so that they can be reused later
|
||||
* without having to undergo the cost of creating them again.
|
||||
*
|
||||
*
|
||||
* b>NOTE:</b> while this implementation does allow the creation of a collection of active consumers,
|
||||
* it does not 'pool' consumers. Pooling makes sense for connections, sessions and producers, which
|
||||
* it does not 'pool' consumers. Pooling makes sense for connections, sessions and producers, which
|
||||
* are expensive to create and can remain idle a minimal cost. Consumers, on the other hand, are usually
|
||||
* just created at startup and left active, handling incoming messages as they come. When a consumer is
|
||||
* complete, it is best to close it rather than return it to a pool for later reuse: this is because,
|
||||
* complete, it is best to close it rather than return it to a pool for later reuse: this is because,
|
||||
* even if a consumer is idle, ActiveMQ will keep delivering messages to the consumer's prefetch buffer,
|
||||
* where they'll get held until the consumer is active again.
|
||||
*
|
||||
*
|
||||
* If you are creating a collection of consumers (for example, for multi-threaded message consumption), you
|
||||
* might want to consider using a lower prefetch value for each consumer (e.g. 10 or 20), to ensure that
|
||||
* all messages don't end up going to just one of the consumers. See this FAQ entry for more detail:
|
||||
* might want to consider using a lower prefetch value for each consumer (e.g. 10 or 20), to ensure that
|
||||
* all messages don't end up going to just one of the consumers. See this FAQ entry for more detail:
|
||||
* http://activemq.apache.org/i-do-not-receive-messages-in-my-second-consumer.html
|
||||
*
|
||||
*
|
||||
* @org.apache.xbean.XBean element="pooledConnectionFactory"
|
||||
*
|
||||
*
|
||||
*
|
||||
*
|
||||
*/
|
||||
public class PooledConnectionFactory implements ConnectionFactory, Service {
|
||||
private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnectionFactory.class);
|
||||
|
@ -66,7 +66,7 @@ public class PooledConnectionFactory implements ConnectionFactory, Service {
|
|||
private int maximumActive = 500;
|
||||
private int maxConnections = 1;
|
||||
private int idleTimeout = 30 * 1000;
|
||||
private boolean blockIfSessionPoolIsFull = false;
|
||||
private boolean blockIfSessionPoolIsFull = true;
|
||||
private AtomicBoolean stopped = new AtomicBoolean(false);
|
||||
private long expiryTimeout = 0l;
|
||||
|
||||
|
@ -99,7 +99,7 @@ public class PooledConnectionFactory implements ConnectionFactory, Service {
|
|||
LOG.debug("PooledConnectionFactory is stopped, skip create new connection.");
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
ConnectionKey key = new ConnectionKey(userName, password);
|
||||
LinkedList<ConnectionPool> pools = cache.get(key);
|
||||
|
||||
|
@ -195,25 +195,21 @@ public class PooledConnectionFactory implements ConnectionFactory, Service {
|
|||
public void setMaximumActive(int maximumActive) {
|
||||
this.maximumActive = maximumActive;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Controls the behavior of the internal session pool.
|
||||
* By default the call to Connection.getSession() will
|
||||
* return a JMSException if the session pool is full.
|
||||
* If the argument true is given, it will change the
|
||||
* default behavior and instead the call to getSession()
|
||||
* will block until a session is available in the pool, which
|
||||
* used to be the default behavior in ActiveMQ versions < 5.6.
|
||||
*
|
||||
* The size of the session pool is controlled by the @see #maximumActive
|
||||
* Controls the behavior of the internal session pool. By default the call to
|
||||
* Connection.getSession() will block if the session pool is full. If the
|
||||
* argument false is given, it will change the default behavior and instead the
|
||||
* call to getSession() will throw a JMSException.
|
||||
*
|
||||
* The size of the session pool is controlled by the @see #maximumActive
|
||||
* property.
|
||||
*
|
||||
* @param block - if true, the call to getSession() blocks if the pool
|
||||
* is full until a session object is available.
|
||||
* defaults to false.
|
||||
*
|
||||
* @param block - if true, the call to getSession() blocks if the pool is full
|
||||
* until a session object is available. defaults to true.
|
||||
*/
|
||||
public void setBlockIfSessionPoolIsFull(boolean block) {
|
||||
this.blockIfSessionPoolIsFull = block;
|
||||
this.blockIfSessionPoolIsFull = block;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -233,18 +229,18 @@ public class PooledConnectionFactory implements ConnectionFactory, Service {
|
|||
/**
|
||||
* Creates an ObjectPoolFactory. Its behavior is controlled by the two
|
||||
* properties @see #maximumActive and @see #blockIfSessionPoolIsFull.
|
||||
*
|
||||
*
|
||||
* @return the newly created but empty ObjectPoolFactory
|
||||
*/
|
||||
protected ObjectPoolFactory createPoolFactory() {
|
||||
if (blockIfSessionPoolIsFull) {
|
||||
return new GenericObjectPoolFactory(null, maximumActive);
|
||||
} else {
|
||||
return new GenericObjectPoolFactory(null,
|
||||
maximumActive,
|
||||
GenericObjectPool.WHEN_EXHAUSTED_FAIL,
|
||||
GenericObjectPool.DEFAULT_MAX_WAIT);
|
||||
}
|
||||
if (blockIfSessionPoolIsFull) {
|
||||
return new GenericObjectPoolFactory(null, maximumActive);
|
||||
} else {
|
||||
return new GenericObjectPoolFactory(null,
|
||||
maximumActive,
|
||||
GenericObjectPool.WHEN_EXHAUSTED_FAIL,
|
||||
GenericObjectPool.DEFAULT_MAX_WAIT);
|
||||
}
|
||||
}
|
||||
|
||||
public int getIdleTimeout() {
|
||||
|
@ -258,13 +254,13 @@ public class PooledConnectionFactory implements ConnectionFactory, Service {
|
|||
/**
|
||||
* allow connections to expire, irrespective of load or idle time. This is useful with failover
|
||||
* to force a reconnect from the pool, to reestablish load balancing or use of the master post recovery
|
||||
*
|
||||
*
|
||||
* @param expiryTimeout non zero in milliseconds
|
||||
*/
|
||||
public void setExpiryTimeout(long expiryTimeout) {
|
||||
this.expiryTimeout = expiryTimeout;
|
||||
this.expiryTimeout = expiryTimeout;
|
||||
}
|
||||
|
||||
|
||||
public long getExpiryTimeout() {
|
||||
return expiryTimeout;
|
||||
}
|
||||
|
|
|
@ -22,18 +22,18 @@ import org.apache.log4j.Logger;
|
|||
|
||||
/**
|
||||
* Checks the behavior of the PooledConnectionFactory when the maximum amount
|
||||
* of sessions is being reached.
|
||||
* of sessions is being reached.
|
||||
* Older versions simply block in the call to Connection.getSession(), which isn't good.
|
||||
* An exception being returned is the better option, so JMS clients don't block.
|
||||
* This test succeeds if an exception is returned and fails if the call to getSession()
|
||||
* This test succeeds if an exception is returned and fails if the call to getSession()
|
||||
* blocks.
|
||||
*
|
||||
*
|
||||
*/
|
||||
public class PooledConnectionFactoryTest extends TestCase
|
||||
{
|
||||
public final static Logger LOG = Logger.getLogger(PooledConnectionFactoryTest.class);
|
||||
|
||||
|
||||
public final static Logger LOG = Logger.getLogger(PooledConnectionFactoryTest.class);
|
||||
|
||||
|
||||
/**
|
||||
* Create the test case
|
||||
*
|
||||
|
@ -53,95 +53,87 @@ public class PooledConnectionFactoryTest extends TestCase
|
|||
}
|
||||
|
||||
/**
|
||||
* Tests the behavior of the sessionPool of the PooledConnectionFactory
|
||||
* when maximum number of sessions are reached. In older versions the call to
|
||||
* Connection.createSession() would simply block indefinitely if the maximum
|
||||
* number of sessions got reached (controled by
|
||||
* PooledConnectionFactory.setMaximumActive()).
|
||||
* Rather than blocking the entire thread, it should raise an exception
|
||||
* instead.
|
||||
* Tests the behavior of the sessionPool of the PooledConnectionFactory
|
||||
* when maximum number of sessions are reached.
|
||||
*/
|
||||
public void testApp() throws Exception
|
||||
{
|
||||
// using separate thread for testing so that we can interrupt the test
|
||||
// if the call to get a new session blocks.
|
||||
|
||||
// start test runner thread
|
||||
ExecutorService executor = Executors.newSingleThreadExecutor();
|
||||
Future<Boolean> result = (Future<Boolean>) executor.submit(new TestRunner());
|
||||
|
||||
// test should not take > 5secs, so test fails i
|
||||
Thread.sleep(5*1000);
|
||||
|
||||
if (!result.isDone() || !result.get().booleanValue()) {
|
||||
PooledConnectionFactoryTest.LOG.error("2nd call to createSession()" +
|
||||
" is blocking but should have returned an error instead.");
|
||||
{
|
||||
// using separate thread for testing so that we can interrupt the test
|
||||
// if the call to get a new session blocks.
|
||||
|
||||
executor.shutdownNow();
|
||||
// start test runner thread
|
||||
ExecutorService executor = Executors.newSingleThreadExecutor();
|
||||
Future<Boolean> result = (Future<Boolean>) executor.submit(new TestRunner());
|
||||
|
||||
Assert.fail("SessionPool inside PooledConnectionFactory is blocking if " +
|
||||
"limit is exceeded but should return an exception instead.");
|
||||
}
|
||||
// test should not take > 5secs, so test fails i
|
||||
Thread.sleep(5*1000);
|
||||
|
||||
if (!result.isDone() || !result.get().booleanValue()) {
|
||||
PooledConnectionFactoryTest.LOG.error("2nd call to createSession()" +
|
||||
" is blocking but should have returned an error instead.");
|
||||
|
||||
executor.shutdownNow();
|
||||
|
||||
Assert.fail("SessionPool inside PooledConnectionFactory is blocking if " +
|
||||
"limit is exceeded but should return an exception instead.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class TestRunner implements Callable<Boolean> {
|
||||
|
||||
public final static Logger LOG = Logger.getLogger(TestRunner.class);
|
||||
|
||||
/**
|
||||
* @return true if test succeeded, false otherwise
|
||||
*/
|
||||
public Boolean call() {
|
||||
|
||||
Connection conn = null;
|
||||
Session one = null;
|
||||
|
||||
// wait at most 5 seconds for the call to createSession
|
||||
try {
|
||||
ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false");
|
||||
PooledConnectionFactory cf = new PooledConnectionFactory(amq);
|
||||
cf.setMaxConnections(3);
|
||||
cf.setMaximumActive(1);
|
||||
|
||||
// default should be false already but lets make sure a change to the default
|
||||
// setting does not make this test fail.
|
||||
cf.setBlockIfSessionPoolIsFull(false);
|
||||
|
||||
conn = cf.createConnection();
|
||||
one = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
Session two = null;
|
||||
try {
|
||||
// this should raise an exception as we called setMaximumActive(1)
|
||||
two = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
two.close();
|
||||
|
||||
LOG.error("Expected JMSException wasn't thrown.");
|
||||
Assert.fail("seconds call to Connection.createSession() was supposed" +
|
||||
"to raise an JMSException as internal session pool" +
|
||||
"is exhausted. This did not happen and indiates a problem");
|
||||
return new Boolean(false);
|
||||
} catch (JMSException ex) {
|
||||
if (ex.getCause().getClass() == java.util.NoSuchElementException.class) {
|
||||
//expected, ignore but log
|
||||
LOG.info("Caught expected " + ex);
|
||||
} else {
|
||||
LOG.error(ex);
|
||||
return new Boolean(false);
|
||||
}
|
||||
} finally {
|
||||
if (one != null)
|
||||
one.close();
|
||||
if (conn != null)
|
||||
conn.close();
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
LOG.error(ex.getMessage());
|
||||
return new Boolean(false);
|
||||
}
|
||||
|
||||
// all good, test succeeded
|
||||
return new Boolean(true);
|
||||
}
|
||||
|
||||
public final static Logger LOG = Logger.getLogger(TestRunner.class);
|
||||
|
||||
/**
|
||||
* @return true if test succeeded, false otherwise
|
||||
*/
|
||||
public Boolean call() {
|
||||
|
||||
Connection conn = null;
|
||||
Session one = null;
|
||||
|
||||
// wait at most 5 seconds for the call to createSession
|
||||
try {
|
||||
ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false");
|
||||
PooledConnectionFactory cf = new PooledConnectionFactory(amq);
|
||||
cf.setMaxConnections(3);
|
||||
cf.setMaximumActive(1);
|
||||
cf.setBlockIfSessionPoolIsFull(false);
|
||||
|
||||
conn = cf.createConnection();
|
||||
one = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
Session two = null;
|
||||
try {
|
||||
// this should raise an exception as we called setMaximumActive(1)
|
||||
two = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
two.close();
|
||||
|
||||
LOG.error("Expected JMSException wasn't thrown.");
|
||||
Assert.fail("seconds call to Connection.createSession() was supposed" +
|
||||
"to raise an JMSException as internal session pool" +
|
||||
"is exhausted. This did not happen and indiates a problem");
|
||||
return new Boolean(false);
|
||||
} catch (JMSException ex) {
|
||||
if (ex.getCause().getClass() == java.util.NoSuchElementException.class) {
|
||||
//expected, ignore but log
|
||||
LOG.info("Caught expected " + ex);
|
||||
} else {
|
||||
LOG.error(ex);
|
||||
return new Boolean(false);
|
||||
}
|
||||
} finally {
|
||||
if (one != null)
|
||||
one.close();
|
||||
if (conn != null)
|
||||
conn.close();
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
LOG.error(ex.getMessage());
|
||||
return new Boolean(false);
|
||||
}
|
||||
|
||||
// all good, test succeeded
|
||||
return new Boolean(true);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue