Add a couple tests for concurrent create an unique connections being

created.
This commit is contained in:
Timothy Bish 2014-06-04 12:19:34 -04:00
parent c2cf78542f
commit be0311bea0
1 changed files with 54 additions and 0 deletions

View File

@ -17,6 +17,7 @@
package org.apache.activemq.jms.pool; package org.apache.activemq.jms.pool;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -31,7 +32,9 @@ import junit.framework.Test;
import junit.framework.TestCase; import junit.framework.TestCase;
import junit.framework.TestSuite; import junit.framework.TestSuite;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
@ -193,6 +196,57 @@ public class PooledConnectionFactoryTest extends TestCase {
} }
connections.clear(); connections.clear();
cf.stop();
}
public void testConcurrentCreateGetsUniqueConnectionCreateOnDemand() throws Exception {
doTestConcurrentCreateGetsUniqueConnection(false);
}
public void testConcurrentCreateGetsUniqueConnectionCreateOnStart() throws Exception {
doTestConcurrentCreateGetsUniqueConnection(true);
}
private void doTestConcurrentCreateGetsUniqueConnection(boolean createOnStart) throws Exception {
final int numConnections = 50;
final ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false");
final PooledConnectionFactory cf = new PooledConnectionFactory();
cf.setConnectionFactory(amq);
cf.setMaxConnections(numConnections);
cf.setCreateConnectionOnStartup(createOnStart);
final ConcurrentHashMap<ConnectionId, Connection> connections =
new ConcurrentHashMap<ConnectionId, Connection>();
final ExecutorService executor = Executors.newFixedThreadPool(numConnections / 2);
for (int i = 0; i < numConnections; ++i) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
PooledConnection pooled = (PooledConnection) cf.createConnection();
ActiveMQConnection amq = (ActiveMQConnection) pooled.getConnection();
connections.put(amq.getConnectionInfo().getConnectionId(), pooled);
} catch (JMSException e) {
}
}
});
}
assertTrue("Should have all unique connections", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return connections.size() == numConnections;
}
}));
executor.shutdown();
assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS));
connections.clear();
cf.stop();
} }
/** /**