mirror of https://github.com/apache/activemq.git
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1173648 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ad6b5e29db
commit
f0e6f26623
|
@ -18,9 +18,8 @@
|
|||
package org.apache.activemq.pool;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
|
@ -32,13 +31,13 @@ import org.apache.commons.pool.ObjectPoolFactory;
|
|||
|
||||
/**
|
||||
* Holds a real JMS connection along with the session pools associated with it.
|
||||
*
|
||||
*
|
||||
*
|
||||
*
|
||||
*/
|
||||
public class ConnectionPool {
|
||||
|
||||
private ActiveMQConnection connection;
|
||||
private Map<SessionKey, SessionPool> cache;
|
||||
private ConcurrentHashMap<SessionKey, SessionPool> cache;
|
||||
private AtomicBoolean started = new AtomicBoolean(false);
|
||||
private int referenceCount;
|
||||
private ObjectPoolFactory poolFactory;
|
||||
|
@ -50,7 +49,7 @@ public class ConnectionPool {
|
|||
private long expiryTimeout = 0l;
|
||||
|
||||
public ConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory) {
|
||||
this(connection, new HashMap<SessionKey, SessionPool>(), poolFactory);
|
||||
this(connection, new ConcurrentHashMap<SessionKey, SessionPool>(), poolFactory);
|
||||
// Add a transport Listener so that we can notice if this connection
|
||||
// should be expired due to
|
||||
// a connection failure.
|
||||
|
@ -69,7 +68,7 @@ public class ConnectionPool {
|
|||
|
||||
public void transportResumed() {
|
||||
}
|
||||
});
|
||||
});
|
||||
//
|
||||
// make sure that we set the hasFailed flag, in case the transport already failed
|
||||
// prior to the addition of our new TransportListener
|
||||
|
@ -79,7 +78,7 @@ public class ConnectionPool {
|
|||
}
|
||||
}
|
||||
|
||||
public ConnectionPool(ActiveMQConnection connection, Map<SessionKey, SessionPool> cache, ObjectPoolFactory poolFactory) {
|
||||
public ConnectionPool(ActiveMQConnection connection, ConcurrentHashMap<SessionKey, SessionPool> cache, ObjectPoolFactory poolFactory) {
|
||||
this.connection = connection;
|
||||
this.cache = cache;
|
||||
this.poolFactory = poolFactory;
|
||||
|
@ -87,12 +86,12 @@ public class ConnectionPool {
|
|||
|
||||
public void start() throws JMSException {
|
||||
if (started.compareAndSet(false, true)) {
|
||||
try {
|
||||
connection.start();
|
||||
} catch (JMSException e) {
|
||||
started.set(false);
|
||||
throw(e);
|
||||
}
|
||||
try {
|
||||
connection.start();
|
||||
} catch (JMSException e) {
|
||||
started.set(false);
|
||||
throw(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -102,10 +101,21 @@ public class ConnectionPool {
|
|||
|
||||
public Session createSession(boolean transacted, int ackMode) throws JMSException {
|
||||
SessionKey key = new SessionKey(transacted, ackMode);
|
||||
SessionPool pool = cache.get(key);
|
||||
SessionPool pool = null;
|
||||
pool = cache.get(key);
|
||||
if (pool == null) {
|
||||
pool = createSessionPool(key);
|
||||
cache.put(key, pool);
|
||||
SessionPool newPool = createSessionPool(key);
|
||||
SessionPool prevPool = cache.putIfAbsent(key, newPool);
|
||||
if (prevPool != null && prevPool != newPool) {
|
||||
// newPool was not the first one to be associated with this
|
||||
// key... close created session pool
|
||||
try {
|
||||
newPool.close();
|
||||
} catch (Exception e) {
|
||||
throw new JMSException(e.getMessage());
|
||||
}
|
||||
}
|
||||
pool = cache.get(key); // this will return a non-null value...
|
||||
}
|
||||
PooledSession session = pool.borrowSession();
|
||||
return session;
|
||||
|
@ -144,8 +154,8 @@ public class ConnectionPool {
|
|||
lastUsed = System.currentTimeMillis();
|
||||
if (referenceCount == 0) {
|
||||
expiredCheck();
|
||||
|
||||
// only clean up temp destinations when all users
|
||||
|
||||
// only clean up temp destinations when all users
|
||||
// of this connection have called close
|
||||
if (getConnection() != null) {
|
||||
getConnection().cleanUpTempDestinations();
|
||||
|
@ -166,7 +176,7 @@ public class ConnectionPool {
|
|||
}
|
||||
return true;
|
||||
}
|
||||
if (hasFailed
|
||||
if (hasFailed
|
||||
|| (idleTimeout > 0 && System.currentTimeMillis() > lastUsed + idleTimeout)
|
||||
|| expiryTimeout > 0 && System.currentTimeMillis() > firstUsed + expiryTimeout) {
|
||||
hasExpired = true;
|
||||
|
@ -193,7 +203,7 @@ public class ConnectionPool {
|
|||
public void setExpiryTimeout(long expiryTimeout) {
|
||||
this.expiryTimeout = expiryTimeout;
|
||||
}
|
||||
|
||||
|
||||
public long getExpiryTimeout() {
|
||||
return expiryTimeout;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,151 @@
|
|||
package org.apache.activemq.pool;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import junit.framework.Assert;
|
||||
import junit.framework.Test;
|
||||
import junit.framework.TestCase;
|
||||
import junit.framework.TestSuite;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Session;
|
||||
|
||||
import org.apache.activemq.pool.PooledConnectionFactory;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
|
||||
/**
|
||||
* Checks the behavior of the PooledConnectionFactory when the maximum amount
|
||||
* of sessions is being reached (maximumActive).
|
||||
* When using setBlockIfSessionPoolIsFull(true) on the ConnectionFactory,
|
||||
* further requests for sessions should block.
|
||||
* If it does not block, its a bug.
|
||||
*
|
||||
* @author: tmielke
|
||||
*/
|
||||
public class PooledConnectionFactoryMaximumActiveTest extends TestCase
|
||||
{
|
||||
public final static Logger LOG = Logger.getLogger(PooledConnectionFactoryMaximumActiveTest.class);
|
||||
public static Connection conn = null;
|
||||
public static int sleepTimeout = 5000;
|
||||
|
||||
private static ConcurrentHashMap<Integer, Session> sessions = new ConcurrentHashMap<Integer,Session>();
|
||||
|
||||
|
||||
/**
|
||||
* Create the test case
|
||||
*
|
||||
* @param testName name of the test case
|
||||
*/
|
||||
public PooledConnectionFactoryMaximumActiveTest( String testName )
|
||||
{
|
||||
super( testName );
|
||||
}
|
||||
|
||||
public static void addSession(Session s) {
|
||||
sessions.put(s.hashCode(), s);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the suite of tests being tested
|
||||
*/
|
||||
public static Test suite()
|
||||
{
|
||||
return new TestSuite( PooledConnectionFactoryMaximumActiveTest.class );
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests the behavior of the sessionPool of the PooledConnectionFactory
|
||||
* when maximum number of sessions are reached. This test uses
|
||||
* maximumActive=1.
|
||||
* When creating two threads that both
|
||||
* try to create a JMS session from the same JMS connection,
|
||||
* the thread that is second to call createSession()
|
||||
* should block (as only 1 session is allowed) until the
|
||||
* session is returned to pool.
|
||||
* If it does not block, its a bug.
|
||||
*
|
||||
*/
|
||||
public void testApp() throws Exception
|
||||
{
|
||||
// Initialize JMS connection
|
||||
ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false");
|
||||
PooledConnectionFactory cf = new PooledConnectionFactory(amq);
|
||||
cf.setMaxConnections(3);
|
||||
cf.setMaximumActive(1);
|
||||
cf.setBlockIfSessionPoolIsFull(true);
|
||||
conn = cf.createConnection();
|
||||
|
||||
// start test runner threads. It is expected that the second thread
|
||||
// blocks on the call to createSession()
|
||||
|
||||
ExecutorService executor = Executors.newFixedThreadPool(2);
|
||||
executor.submit(new TestRunner2());
|
||||
// Thread.sleep(100);
|
||||
Future<Boolean> result2 = (Future<Boolean>) executor.submit(new TestRunner2());
|
||||
|
||||
|
||||
// sleep to allow threads to run
|
||||
Thread.sleep(sleepTimeout);
|
||||
|
||||
// second task should not have finished, instead wait on getting a
|
||||
// JMS Session
|
||||
Assert.assertEquals(false, result2.isDone());
|
||||
|
||||
//Only 1 session should have been created
|
||||
Assert.assertEquals(1, sessions.size());
|
||||
|
||||
// Take all threads down
|
||||
executor.shutdownNow();
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
class TestRunner2 implements Callable<Boolean> {
|
||||
|
||||
public final static Logger LOG = Logger.getLogger(TestRunner2.class);
|
||||
|
||||
/**
|
||||
* @return true if test succeeded, false otherwise
|
||||
*/
|
||||
public Boolean call() {
|
||||
|
||||
Session one = null;
|
||||
|
||||
// wait at most 5 seconds for the call to createSession
|
||||
try {
|
||||
|
||||
if (PooledConnectionFactoryMaximumActiveTest.conn == null) {
|
||||
LOG.error("Connection not yet initialized. Aborting test.");
|
||||
return new Boolean(false);
|
||||
}
|
||||
|
||||
one = PooledConnectionFactoryMaximumActiveTest.conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
LOG.info("Created new Session with id" + one);
|
||||
PooledConnectionFactoryMaximumActiveTest.addSession(one);
|
||||
Thread.sleep(2 * PooledConnectionFactoryMaximumActiveTest.sleepTimeout);
|
||||
|
||||
} catch (Exception ex) {
|
||||
LOG.error(ex.getMessage());
|
||||
return new Boolean(false);
|
||||
|
||||
} finally {
|
||||
if (one != null)
|
||||
try {
|
||||
one.close();
|
||||
} catch (JMSException e) {
|
||||
LOG.error(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
// all good, test succeeded
|
||||
return new Boolean(true);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue