Fixed synchronizations so that threads don't block each others processing as much and now the test works fine without hanging. see https://issues.apache.org/activemq/browse/AMQ-1251

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@573342 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2007-09-06 19:25:46 +00:00
parent 642d38cd8c
commit 1301501a0a
1 changed files with 35 additions and 37 deletions

View File

@ -18,26 +18,32 @@
package org.apache.activemq.bugs; package org.apache.activemq.bugs;
import java.io.Serializable; import java.io.Serializable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.MessageProducer;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy; import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import junit.framework.TestCase;
/** /**
* Test case demonstrating situation where messages are not delivered to consumers. * Test case demonstrating situation where messages are not delivered to consumers.
*/ */
public class QueueWorkerPrefetchTest extends TestCase implements MessageListener public class QueueWorkerPrefetchTest extends TestCase implements MessageListener
{ {
private static final long WAIT_TIMEOUT = 1000*10;
/** The connection URL. */ /** The connection URL. */
private static final String CONNECTION_URL = "tcp://localhost:61616"; private static final String CONNECTION_URL = "tcp://localhost:61616";
@ -57,10 +63,9 @@ public class QueueWorkerPrefetchTest extends TestCase implements MessageListener
private MessageConsumer masterItemConsumer; private MessageConsumer masterItemConsumer;
/** The number of acks received by the master. */ /** The number of acks received by the master. */
private long acksReceived; private AtomicLong acksReceived = new AtomicLong(0);
/** The expected number of acks the master should receive. */ private AtomicReference<CountDownLatch> latch = new AtomicReference<CountDownLatch>();
private long expectedCount;
/** Messages sent to the work-item queue. */ /** Messages sent to the work-item queue. */
private static class WorkMessage implements Serializable private static class WorkMessage implements Serializable
@ -75,7 +80,7 @@ public class QueueWorkerPrefetchTest extends TestCase implements MessageListener
private static class Worker implements MessageListener private static class Worker implements MessageListener
{ {
/** Counter shared between workers to decided when new work-item messages are created. */ /** Counter shared between workers to decided when new work-item messages are created. */
private static Integer counter = new Integer(0); private static AtomicInteger counter = new AtomicInteger(0);
/** Session to use. */ /** Session to use. */
private Session session; private Session session;
@ -104,13 +109,9 @@ public class QueueWorkerPrefetchTest extends TestCase implements MessageListener
boolean sendMessage = false; boolean sendMessage = false;
// Don't create a new work item for every 1000th message. */ // Don't create a new work item for every 1000th message. */
synchronized (counter) if (counter.incrementAndGet() % 1000 != 0)
{ {
counter++; sendMessage = true;
if (counter % 1000 != 0)
{
sendMessage = true;
}
} }
if (sendMessage) if (sendMessage)
@ -140,16 +141,11 @@ public class QueueWorkerPrefetchTest extends TestCase implements MessageListener
} }
/** Master message handler. Process ack messages. */ /** Master message handler. Process ack messages. */
public synchronized void onMessage(javax.jms.Message message) public void onMessage(javax.jms.Message message)
{ {
acksReceived++; long acks = acksReceived.incrementAndGet();
if (acksReceived == expectedCount) latch.get().countDown();
{ if (acks % 100 == 0) {
// If expected number of acks are received, wake up the main process.
notify();
}
if (acksReceived % 100 == 0)
{
System.out.println("Master now has ack count of: " + acksReceived); System.out.println("Master now has ack count of: " + acksReceived);
} }
} }
@ -173,7 +169,7 @@ public class QueueWorkerPrefetchTest extends TestCase implements MessageListener
super.tearDown(); super.tearDown();
} }
public synchronized void testActiveMQ() public void testActiveMQ()
throws Exception throws Exception
{ {
// Create the connection to the broker. // Create the connection to the broker.
@ -198,30 +194,32 @@ public class QueueWorkerPrefetchTest extends TestCase implements MessageListener
} }
// Send a message to the work queue, and wait for the 1000 acks from the workers. // Send a message to the work queue, and wait for the 1000 acks from the workers.
expectedCount = 1000; acksReceived.set(0);
acksReceived = 0; latch.set(new CountDownLatch(1000));
workItemProducer.send(masterSession.createObjectMessage(new WorkMessage())); workItemProducer.send(masterSession.createObjectMessage(new WorkMessage()));
while (acksReceived != expectedCount)
{ if (!latch.get().await(WAIT_TIMEOUT, TimeUnit.MILLISECONDS)) {
wait(); fail("First batch only received " + acksReceived + " messages");
} }
System.out.println("First batch received"); System.out.println("First batch received");
// Send another message to the work queue, and wait for the next 1000 acks. It is // Send another message to the work queue, and wait for the next 1000 acks. It is
// at this point where the workers never get notified of this message, as they // at this point where the workers never get notified of this message, as they
// have a large pending queue. Creating a new worker at this point however will // have a large pending queue. Creating a new worker at this point however will
// receive this new message. // receive this new message.
expectedCount = 2000; acksReceived.set(0);
latch.set(new CountDownLatch(1000));
workItemProducer.send(masterSession.createObjectMessage(new WorkMessage())); workItemProducer.send(masterSession.createObjectMessage(new WorkMessage()));
while (acksReceived != expectedCount)
{ if (!latch.get().await(WAIT_TIMEOUT, TimeUnit.MILLISECONDS)) {
wait(); fail("Second batch only received " + acksReceived + " messages");
} }
System.out.println("Second batch received"); System.out.println("Second batch received");
// Cleanup all JMS resources. // Cleanup all JMS resources.
for (int i = 0; i < NUM_WORKERS; i++) for (int i = 0; i < NUM_WORKERS; i++) {
{
workers[i].close(); workers[i].close();
} }
masterSession.close(); masterSession.close();