mirror of https://github.com/apache/activemq.git
investigating JDBCSpringTest intermittent failures
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@886751 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8f8f9ac529
commit
4caad1a058
|
@ -53,7 +53,7 @@ public class ConsumerThread extends Thread {
|
||||||
|
|
||||||
while (run) {
|
while (run) {
|
||||||
try {
|
try {
|
||||||
Thread.sleep(3000);
|
Thread.sleep(100);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
|
@ -66,7 +66,7 @@ public class ConsumerThread extends Thread {
|
||||||
((SingleConnectionFactory)connectionFactory).destroy();
|
((SingleConnectionFactory)connectionFactory).destroy();
|
||||||
}
|
}
|
||||||
|
|
||||||
log.info("ConsumerThread1 closing down");
|
log.info("ConsumerThread closing down");
|
||||||
}
|
}
|
||||||
|
|
||||||
private DefaultMessageListenerContainer createContainer() {
|
private DefaultMessageListenerContainer createContainer() {
|
||||||
|
@ -82,18 +82,6 @@ public class ConsumerThread extends Thread {
|
||||||
container.setConcurrentConsumers(concurrentConsumers);
|
container.setConcurrentConsumers(concurrentConsumers);
|
||||||
container.setSessionTransacted(sessionTransacted);
|
container.setSessionTransacted(sessionTransacted);
|
||||||
|
|
||||||
//container.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
|
|
||||||
//container.setMaxConcurrentConsumers(concurrentConsumers);
|
|
||||||
//container.setAcceptMessagesWhileStopping(false);
|
|
||||||
//container.setAutoStartup(false);
|
|
||||||
//without setting a tx manager, this will use local JMS tx.
|
|
||||||
|
|
||||||
/*
|
|
||||||
if (durable) {
|
|
||||||
container.setSubscriptionDurable(true);
|
|
||||||
container.setDurableSubscriptionName("ConsumerThread1" + Thread.currentThread().getId());
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
container.afterPropertiesSet();
|
container.afterPropertiesSet();
|
||||||
log.info("subscribing to " + destination + queueSuffix);
|
log.info("subscribing to " + destination + queueSuffix);
|
||||||
return container;
|
return container;
|
||||||
|
|
|
@ -88,7 +88,7 @@ public class JDBCSpringTest extends TestCase {
|
||||||
thread.setNumberOfQueues(numberOfQueues);
|
thread.setNumberOfQueues(numberOfQueues);
|
||||||
thread.setQueuePrefix("AMQ-2436.queue.");
|
thread.setQueuePrefix("AMQ-2436.queue.");
|
||||||
thread.setConnectionFactory(connectionFactory);
|
thread.setConnectionFactory(connectionFactory);
|
||||||
thread.setSendDelay(100);
|
//thread.setSendDelay(100);
|
||||||
ProducerThreads.add(thread);
|
ProducerThreads.add(thread);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -106,13 +106,7 @@ public class JDBCSpringTest extends TestCase {
|
||||||
thread.setConsumerName("consumer" + i);
|
thread.setConsumerName("consumer" + i);
|
||||||
ConsumerThreads.add(thread);
|
ConsumerThreads.add(thread);
|
||||||
thread.start();
|
thread.start();
|
||||||
|
|
||||||
while (!thread.isRunning()) {
|
|
||||||
Thread.sleep(200);
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
Thread.sleep(5000);
|
|
||||||
|
|
||||||
|
|
||||||
for (ProducerThread thread : ProducerThreads) {
|
for (ProducerThread thread : ProducerThreads) {
|
||||||
|
@ -120,12 +114,10 @@ public class JDBCSpringTest extends TestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean finished = false;
|
boolean finished = false;
|
||||||
int retry = 0;
|
|
||||||
int previous = 0;
|
int previous = 0;
|
||||||
while (!finished) {
|
while (!finished) {
|
||||||
|
|
||||||
int totalMessages = 0;
|
int totalMessages = 0;
|
||||||
retry++;
|
|
||||||
for (Thread thread : ConsumerThreads) {
|
for (Thread thread : ConsumerThreads) {
|
||||||
totalMessages += ((ConsumerThread)thread).getMessageDrivenPojo().getMessageCount();
|
totalMessages += ((ConsumerThread)thread).getMessageDrivenPojo().getMessageCount();
|
||||||
}
|
}
|
||||||
|
@ -134,7 +126,6 @@ public class JDBCSpringTest extends TestCase {
|
||||||
for (Thread thread : ConsumerThreads) {
|
for (Thread thread : ConsumerThreads) {
|
||||||
((ConsumerThread)thread).setRun(false);
|
((ConsumerThread)thread).setRun(false);
|
||||||
}
|
}
|
||||||
Thread.sleep(3000);
|
|
||||||
fail("Received " + totalMessages + ", expected " + (numberOfMessages * numberOfProducerThreads));
|
fail("Received " + totalMessages + ", expected " + (numberOfMessages * numberOfProducerThreads));
|
||||||
}
|
}
|
||||||
previous = totalMessages;
|
previous = totalMessages;
|
||||||
|
@ -151,7 +142,7 @@ public class JDBCSpringTest extends TestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
Thread.sleep(10000);
|
Thread.sleep(1000);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,7 +34,7 @@ public class ProducerThread extends Thread {
|
||||||
private int numberOfTopics;
|
private int numberOfTopics;
|
||||||
private int numberOfMessagesToSend;
|
private int numberOfMessagesToSend;
|
||||||
private int messagesSent;
|
private int messagesSent;
|
||||||
private Random generator = new Random();
|
private Random generator;
|
||||||
private String queuePrefix;
|
private String queuePrefix;
|
||||||
private ConnectionFactory connectionFactory;
|
private ConnectionFactory connectionFactory;
|
||||||
private String message;
|
private String message;
|
||||||
|
@ -45,6 +45,7 @@ public class ProducerThread extends Thread {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
initialize();
|
initialize();
|
||||||
|
Random generator = new Random(Thread.currentThread().getName().hashCode());
|
||||||
|
|
||||||
while (messagesSent < numberOfMessagesToSend) {
|
while (messagesSent < numberOfMessagesToSend) {
|
||||||
int queueSuffix = generator.nextInt(numberOfTopics);
|
int queueSuffix = generator.nextInt(numberOfTopics);
|
||||||
|
|
|
@ -21,6 +21,8 @@
|
||||||
log4j.rootLogger=INFO, out, stdout
|
log4j.rootLogger=INFO, out, stdout
|
||||||
|
|
||||||
log4j.logger.org.apache.activemq.spring=WARN
|
log4j.logger.org.apache.activemq.spring=WARN
|
||||||
|
#log4j.logger.org.apache.activemq.usecases=DEBUG
|
||||||
|
#log4j.logger.org.apache.activemq.broker.region=DEBUG
|
||||||
|
|
||||||
# CONSOLE appender not used by default
|
# CONSOLE appender not used by default
|
||||||
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
|
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
|
||||||
|
|
Loading…
Reference in New Issue