Give this test more time to meet its criteria

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1155385 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2011-08-09 14:33:28 +00:00
parent 1e1b568784
commit fd6be7f340
1 changed files with 42 additions and 42 deletions

View File

@ -62,13 +62,13 @@ public class AMQ1936Test extends TestCase{
private ThreadedMessageReceiver[] receivers = new ThreadedMessageReceiver[ CONSUMER_COUNT ]; private ThreadedMessageReceiver[] receivers = new ThreadedMessageReceiver[ CONSUMER_COUNT ];
private BrokerService broker = null; private BrokerService broker = null;
static QueueConnectionFactory connectionFactory = null; static QueueConnectionFactory connectionFactory = null;
@Override @Override
protected void setUp() throws Exception { protected void setUp() throws Exception {
super.setUp(); super.setUp();
broker = new BrokerService(); broker = new BrokerService();
broker.getSystemUsage().getMemoryUsage().setLimit(5*1024*1024); broker.getSystemUsage().getMemoryUsage().setLimit(5*1024*1024);
broker.setBrokerName("test"); broker.setBrokerName("test");
@ -80,13 +80,13 @@ public class AMQ1936Test extends TestCase{
@Override @Override
protected void tearDown() throws Exception { protected void tearDown() throws Exception {
super.tearDown(); super.tearDown();
if( threadPool!=null ) { if( threadPool!=null ) {
// signal receivers to stop // signal receivers to stop
for( ThreadedMessageReceiver receiver: receivers) { for( ThreadedMessageReceiver receiver: receivers) {
receiver.setShouldStop( true ); receiver.setShouldStop( true );
} }
logger.info("Waiting for receivers to shutdown.."); logger.info("Waiting for receivers to shutdown..");
if( ! threadPool.awaitTermination( 10, TimeUnit.SECONDS ) ) { if( ! threadPool.awaitTermination( 10, TimeUnit.SECONDS ) ) {
logger.warn("Not all receivers completed shutdown."); logger.warn("Not all receivers completed shutdown.");
@ -94,14 +94,14 @@ public class AMQ1936Test extends TestCase{
logger.info("All receivers shutdown successfully.."); logger.info("All receivers shutdown successfully..");
} }
} }
logger.debug("Stoping the broker."); logger.debug("Stoping the broker.");
if( broker!=null ) { if( broker!=null ) {
broker.stop(); broker.stop();
} }
} }
private void sendTextMessage( String queueName, int i ) throws JMSException, NamingException { private void sendTextMessage( String queueName, int i ) throws JMSException, NamingException {
QueueConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://test"); QueueConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://test");
QueueConnection queueConnection = null; QueueConnection queueConnection = null;
@ -109,22 +109,22 @@ public class AMQ1936Test extends TestCase{
QueueSender sender = null; QueueSender sender = null;
Queue queue = null; Queue queue = null;
TextMessage message = null; TextMessage message = null;
try { try {
// Create the queue connection // Create the queue connection
queueConnection = connectionFactory.createQueueConnection(); queueConnection = connectionFactory.createQueueConnection();
session = queueConnection.createQueueSession( false, QueueSession.AUTO_ACKNOWLEDGE ); session = queueConnection.createQueueSession( false, QueueSession.AUTO_ACKNOWLEDGE );
queue = session.createQueue(TEST_QUEUE_NAME); queue = session.createQueue(TEST_QUEUE_NAME);
sender = session.createSender( queue ); sender = session.createSender( queue );
sender.setDeliveryMode( DeliveryMode.PERSISTENT ); sender.setDeliveryMode( DeliveryMode.PERSISTENT );
message = session.createTextMessage( String.valueOf(i) ); message = session.createTextMessage( String.valueOf(i) );
// send the message // send the message
sender.send( message ); sender.send( message );
if( session.getTransacted()) { if( session.getTransacted()) {
session.commit(); session.commit();
} }
@ -144,14 +144,14 @@ public class AMQ1936Test extends TestCase{
} }
} }
} }
public void testForDuplicateMessages( ) throws Exception { public void testForDuplicateMessages( ) throws Exception {
final ConcurrentHashMap<String,String> messages = new ConcurrentHashMap<String, String>( ); final ConcurrentHashMap<String,String> messages = new ConcurrentHashMap<String, String>( );
final Object lock = new Object( ); final Object lock = new Object( );
final CountDownLatch duplicateSignal = new CountDownLatch( 1 ); final CountDownLatch duplicateSignal = new CountDownLatch( 1 );
final AtomicInteger messageCount = new AtomicInteger( 0 ); final AtomicInteger messageCount = new AtomicInteger( 0 );
// add 1/2 the number of our total messages // add 1/2 the number of our total messages
for( int i = 0; i < TEST_MESSAGE_COUNT/2; i++ ) { for( int i = 0; i < TEST_MESSAGE_COUNT/2; i++ ) {
if( duplicateSignal.getCount()==0 ) { if( duplicateSignal.getCount()==0 ) {
@ -159,12 +159,12 @@ public class AMQ1936Test extends TestCase{
} }
sendTextMessage( TEST_QUEUE_NAME, i ); sendTextMessage( TEST_QUEUE_NAME, i );
} }
// create a number of consumers to read of the messages and start them with a handler which simply stores the message ids // create a number of consumers to read of the messages and start them with a handler which simply stores the message ids
// in a Map and checks for a duplicate // in a Map and checks for a duplicate
for( int i = 0; i < CONSUMER_COUNT; i++ ) { for( int i = 0; i < CONSUMER_COUNT; i++ ) {
receivers[i] = new ThreadedMessageReceiver(TEST_QUEUE_NAME, new IMessageHandler( ) { receivers[i] = new ThreadedMessageReceiver(TEST_QUEUE_NAME, new IMessageHandler( ) {
public void onMessage( Message message ) throws Exception { public void onMessage( Message message ) throws Exception {
synchronized( lock ) { synchronized( lock ) {
int current = messageCount.incrementAndGet(); int current = messageCount.incrementAndGet();
@ -183,7 +183,7 @@ public class AMQ1936Test extends TestCase{
}); });
threadPool.submit( receivers[i]); threadPool.submit( receivers[i]);
} }
// starting adding the remaining messages // starting adding the remaining messages
for(int i = 0; i < TEST_MESSAGE_COUNT/2; i++ ) { for(int i = 0; i < TEST_MESSAGE_COUNT/2; i++ ) {
if( duplicateSignal.getCount()==0) { if( duplicateSignal.getCount()==0) {
@ -191,38 +191,38 @@ public class AMQ1936Test extends TestCase{
} }
sendTextMessage( TEST_QUEUE_NAME, i ); sendTextMessage( TEST_QUEUE_NAME, i );
} }
logger.info("sent all " + TEST_MESSAGE_COUNT + " messages"); logger.info("sent all " + TEST_MESSAGE_COUNT + " messages");
// allow some time for messages to be delivered to receivers. // allow some time for messages to be delivered to receivers.
boolean ok = Wait.waitFor(new Wait.Condition() { boolean ok = Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
return TEST_MESSAGE_COUNT == messages.size(); return TEST_MESSAGE_COUNT == messages.size();
} }
}, 1*60*1000); }, TimeUnit.MINUTES.toMillis(7));
if (!ok) { if (!ok) {
AutoFailTestSupport.dumpAllThreads("--STUCK?--"); AutoFailTestSupport.dumpAllThreads("--STUCK?--");
} }
assertEquals( "Number of messages received does not match the number sent", TEST_MESSAGE_COUNT, messages.size( ) ); assertEquals( "Number of messages received does not match the number sent", TEST_MESSAGE_COUNT, messages.size( ) );
assertEquals( TEST_MESSAGE_COUNT, messageCount.get() ); assertEquals( TEST_MESSAGE_COUNT, messageCount.get() );
} }
private final static class ThreadedMessageReceiver implements Runnable { private final static class ThreadedMessageReceiver implements Runnable {
private String queueName = null; private String queueName = null;
private IMessageHandler handler = null; private IMessageHandler handler = null;
private AtomicBoolean shouldStop = new AtomicBoolean( false ); private AtomicBoolean shouldStop = new AtomicBoolean( false );
public ThreadedMessageReceiver(String queueName, IMessageHandler handler ) { public ThreadedMessageReceiver(String queueName, IMessageHandler handler ) {
this.queueName = queueName; this.queueName = queueName;
this.handler = handler; this.handler = handler;
} }
public void run( ) { public void run( ) {
QueueConnection queueConnection = null; QueueConnection queueConnection = null;
QueueSession session = null; QueueSession session = null;
QueueReceiver receiver = null; QueueReceiver receiver = null;
@ -230,7 +230,7 @@ public class AMQ1936Test extends TestCase{
Message message = null; Message message = null;
try { try {
try { try {
queueConnection = connectionFactory.createQueueConnection( ); queueConnection = connectionFactory.createQueueConnection( );
// create a transacted session // create a transacted session
session = queueConnection.createQueueSession( TRANSACTED_RECEIVE, QueueSession.AUTO_ACKNOWLEDGE ); session = queueConnection.createQueueSession( TRANSACTED_RECEIVE, QueueSession.AUTO_ACKNOWLEDGE );
@ -239,9 +239,9 @@ public class AMQ1936Test extends TestCase{
// start the connection // start the connection
queueConnection.start( ); queueConnection.start( );
logger.info( "Receiver " + Thread.currentThread().getName() + " connected." ); logger.info( "Receiver " + Thread.currentThread().getName() + " connected." );
// start receive loop // start receive loop
while( ! ( shouldStop.get() || Thread.currentThread().isInterrupted()) ) { while( ! ( shouldStop.get() || Thread.currentThread().isInterrupted()) ) {
try { try {
@ -256,32 +256,32 @@ public class AMQ1936Test extends TestCase{
throw e; throw e;
} }
} }
if( message!=null && this.handler!=null ) { if( message!=null && this.handler!=null ) {
this.handler.onMessage(message); this.handler.onMessage(message);
} }
// commit session on successful handling of message // commit session on successful handling of message
if( session.getTransacted()) { if( session.getTransacted()) {
session.commit(); session.commit();
} }
} }
logger.info( "Receiver " + Thread.currentThread().getName() + " shutting down." ); logger.info( "Receiver " + Thread.currentThread().getName() + " shutting down." );
} finally { } finally {
if( receiver!=null ) { if( receiver!=null ) {
try { try {
receiver.close(); receiver.close();
} catch (JMSException e) { } catch (JMSException e) {
logger.warn(e); logger.warn(e);
} }
} }
if( session!=null ) { if( session!=null ) {
try { try {
session.close(); session.close();
} catch (JMSException e) { } catch (JMSException e) {
logger.warn(e); logger.warn(e);
} }
} }
if( queueConnection!=null ) { if( queueConnection!=null ) {
@ -307,10 +307,10 @@ public class AMQ1936Test extends TestCase{
this.shouldStop.set(shouldStop); this.shouldStop.set(shouldStop);
} }
} }
public interface IMessageHandler { public interface IMessageHandler {
void onMessage( Message message ) throws Exception; void onMessage( Message message ) throws Exception;
} }
} }