Cleans up test case and adds logging to help figure out what's failing. 

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1440672 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-01-30 21:12:59 +00:00
parent 1e86ac0508
commit d2c901f0ca
1 changed files with 37 additions and 19 deletions

View File

@ -25,6 +25,7 @@ import java.util.Map;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.ConnectionFactory; import javax.jms.ConnectionFactory;
import javax.jms.ExceptionListener; import javax.jms.ExceptionListener;
@ -37,27 +38,27 @@ import org.apache.activemq.broker.region.policy.PolicyMap;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public class NioQueueSubscriptionTest extends QueueSubscriptionTest {
@SuppressWarnings("unchecked")
public class NioQueueSubscriptionTest extends QueueSubscriptionTest implements ExceptionListener {
protected static final Logger LOG = LoggerFactory.getLogger(NioQueueSubscriptionTest.class); protected static final Logger LOG = LoggerFactory.getLogger(NioQueueSubscriptionTest.class);
private Map<Thread, Throwable> exceptions = Collections.synchronizedMap(new HashMap<Thread, Throwable>()); private final Map<Thread, Throwable> exceptions = Collections.synchronizedMap(new HashMap<Thread, Throwable>());
@Override @Override
protected ConnectionFactory createConnectionFactory() throws Exception { protected ConnectionFactory createConnectionFactory() throws Exception {
return new ActiveMQConnectionFactory("tcp://localhost:62621?trace=false"); return new ActiveMQConnectionFactory("tcp://localhost:62621?trace=false");
} }
@Override
protected void setUp() throws Exception { protected void setUp() throws Exception {
//setMaxTestTime(20*60*1000); // setMaxTestTime(20*60*1000);
super.setUp(); super.setUp();
} }
@Override @Override
protected BrokerService createBroker() throws Exception { protected BrokerService createBroker() throws Exception {
BrokerService answer = BrokerFactory.createBroker(new URI("broker://nio://localhost:62621?useQueueForAccept=false&persistent=false&wiewformat.maxInactivityDuration=0")); BrokerService answer = BrokerFactory.createBroker(new URI(
"broker://nio://localhost:62621?useQueueForAccept=false&persistent=false&wiewformat.maxInactivityDuration=0"));
answer.getManagementContext().setCreateConnector(false); answer.getManagementContext().setCreateConnector(false);
answer.setUseJmx(false); answer.setUseJmx(false);
answer.setDeleteAllMessagesOnStartup(true); answer.setDeleteAllMessagesOnStartup(true);
@ -72,40 +73,57 @@ public class NioQueueSubscriptionTest extends QueueSubscriptionTest implements E
answer.setDestinationPolicy(policyMap); answer.setDestinationPolicy(policyMap);
return answer; return answer;
} }
public void testLotsOfConcurrentConnections() throws Exception { public void testLotsOfConcurrentConnections() throws Exception {
ExecutorService executor = Executors.newCachedThreadPool(); ExecutorService executor = Executors.newCachedThreadPool();
final ConnectionFactory factory = createConnectionFactory(); final ConnectionFactory factory = createConnectionFactory();
final ExceptionListener listener = this;
int connectionCount = 400; int connectionCount = 400;
for (int i=0;i<connectionCount ;i++) { final AtomicInteger threadId = new AtomicInteger(0);
for (int i = 0; i < connectionCount; i++) {
executor.execute(new Runnable() { executor.execute(new Runnable() {
@Override
public void run() { public void run() {
final int innerId = threadId.incrementAndGet();
try { try {
ExceptionListener listener = new NioQueueSubscriptionTestListener(innerId, exceptions, LOG);
ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection(); ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
connection.setExceptionListener(listener); connection.setExceptionListener(listener);
connection.start(); connection.start();
assertNotNull(connection.getBrokerName()); assertNotNull(connection.getBrokerName());
connections.add(connection); connections.add(connection);
} catch (Exception e) { } catch (Exception e) {
LOG.error(">>>> Exception in run() on thread " + innerId, e);
exceptions.put(Thread.currentThread(), e); exceptions.put(Thread.currentThread(), e);
} }
} }
}); });
} }
executor.shutdown(); executor.shutdown();
executor.awaitTermination(30, TimeUnit.SECONDS); executor.awaitTermination(30, TimeUnit.SECONDS);
if (!exceptions.isEmpty()) { if (!exceptions.isEmpty()) {
LOG.error("" + exceptions.size() + " exceptions like", exceptions.values().iterator().next()); LOG.error(">>>> " + exceptions.size() + " exceptions like", exceptions.values().iterator().next());
fail("unexpected exceptions in worker threads: " + exceptions.values().iterator().next()); fail("unexpected exceptions in worker threads: " + exceptions.values().iterator().next());
} }
LOG.info("created " + connectionCount + " connecitons"); LOG.info("created " + connectionCount + " connections");
}
}
class NioQueueSubscriptionTestListener implements ExceptionListener {
private int id = 0;
protected Logger LOG;
private final Map<Thread, Throwable> exceptions;
public NioQueueSubscriptionTestListener(int id, Map<Thread, Throwable> exceptions, Logger log) {
this.id = id;
this.exceptions = exceptions;
this.LOG = log;
} }
@Override
public void onException(JMSException exception) { public void onException(JMSException exception) {
LOG.error("Exception on conneciton", exception); LOG.error(">>>> Exception in onException() on thread " + id, exception);
exceptions.put(Thread.currentThread(), exception); exceptions.put(Thread.currentThread(), exception);
} }
} }