test still failing on solaris with slow disk, reducing log output and adding a thread dump if it fails to help diagnose: https://issues.apache.org/activemq/browse/AMQ-1936 - need to exclude a hung consumer case

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@799560 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2009-07-31 09:37:32 +00:00
parent e4b76014c6
commit c20430459a

View File

@ -20,6 +20,9 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
@ -98,7 +101,7 @@ public class AMQ1936Test extends TestCase{
} }
} }
private void sendTextMessage( String queueName, String msg ) throws JMSException, NamingException { private void sendTextMessage( String queueName, int i ) throws JMSException, NamingException {
QueueConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://test"); QueueConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://test");
QueueConnection queueConnection = null; QueueConnection queueConnection = null;
QueueSession session = null; QueueSession session = null;
@ -116,7 +119,7 @@ public class AMQ1936Test extends TestCase{
sender = session.createSender( queue ); sender = session.createSender( queue );
sender.setDeliveryMode( DeliveryMode.PERSISTENT ); sender.setDeliveryMode( DeliveryMode.PERSISTENT );
message = session.createTextMessage( msg ); message = session.createTextMessage( String.valueOf(i) );
// send the message // send the message
sender.send( message ); sender.send( message );
@ -124,9 +127,10 @@ public class AMQ1936Test extends TestCase{
if( session.getTransacted()) { if( session.getTransacted()) {
session.commit(); session.commit();
} }
if (i%1000 == 0) {
logger.info( "Message successfully sent to : " + queue.getQueueName( ) + " messageid: " + message.getJMSMessageID( ) logger.info( "Message successfully sent to : " + queue.getQueueName( ) + " messageid: " + message.getJMSMessageID( )
+ " content:" + message.getText()); + " content:" + message.getText());
}
} finally { } finally {
if( sender!=null ) { if( sender!=null ) {
sender.close(); sender.close();
@ -152,7 +156,7 @@ public class AMQ1936Test extends TestCase{
if( duplicateSignal.getCount()==0 ) { if( duplicateSignal.getCount()==0 ) {
fail( "Duplicate message id detected" ); fail( "Duplicate message id detected" );
} }
sendTextMessage( TEST_QUEUE_NAME, String.valueOf(i) ); sendTextMessage( TEST_QUEUE_NAME, i );
} }
// create a number of consumers to read of the messages and start them with a handler which simply stores the message ids // create a number of consumers to read of the messages and start them with a handler which simply stores the message ids
@ -162,10 +166,10 @@ public class AMQ1936Test extends TestCase{
public void onMessage( Message message ) throws Exception { public void onMessage( Message message ) throws Exception {
synchronized( lock ) { synchronized( lock ) {
int current = messageCount.incrementAndGet();
if (current % 1000 == 0) {
logger.info( "Received message:" + message.getJMSMessageID() + " with content: " + ((TextMessage)message).getText() ); logger.info( "Received message:" + message.getJMSMessageID() + " with content: " + ((TextMessage)message).getText() );
}
messageCount.incrementAndGet();
if( messages.containsKey( message.getJMSMessageID()) ) { if( messages.containsKey( message.getJMSMessageID()) ) {
duplicateSignal.countDown( ); duplicateSignal.countDown( );
logger.fatal( "duplicate message id detected:" + message.getJMSMessageID() ); logger.fatal( "duplicate message id detected:" + message.getJMSMessageID() );
@ -184,19 +188,35 @@ public class AMQ1936Test extends TestCase{
if( duplicateSignal.getCount()==0) { if( duplicateSignal.getCount()==0) {
fail( "Duplicate message id detected" ); fail( "Duplicate message id detected" );
} }
sendTextMessage( TEST_QUEUE_NAME, String.valueOf( i ) ); sendTextMessage( TEST_QUEUE_NAME, i );
} }
logger.info("sent all " + TEST_MESSAGE_COUNT + " messages");
// allow some time for messages to be delivered to receivers. // allow some time for messages to be delivered to receivers.
Wait.waitFor(new Wait.Condition() { boolean ok = Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
return TEST_MESSAGE_COUNT == messages.size(); return TEST_MESSAGE_COUNT == messages.size();
} }
}); }, 20*60*1000);
if (!ok) {
dumpAllThreads("--STUCK?--");
}
assertEquals( "Number of messages received does not match the number sent", TEST_MESSAGE_COUNT, messages.size( ) ); assertEquals( "Number of messages received does not match the number sent", TEST_MESSAGE_COUNT, messages.size( ) );
assertEquals( TEST_MESSAGE_COUNT, messageCount.get() ); assertEquals( TEST_MESSAGE_COUNT, messageCount.get() );
} }
private void dumpAllThreads(String prefix) {
Map<Thread, StackTraceElement[]> stacks = Thread.getAllStackTraces();
for (Entry<Thread, StackTraceElement[]> stackEntry : stacks.entrySet()) {
System.err.println(prefix + stackEntry.getKey());
for(StackTraceElement element : stackEntry.getValue()) {
System.err.println(" " + element);
}
}
}
private final static class ThreadedMessageReceiver implements Runnable { private final static class ThreadedMessageReceiver implements Runnable {
private String queueName = null; private String queueName = null;