Fix for the QueueWorkerPrefetchTest. The VMPendingMessageCursor.isEmpty() was returning true when it had an a message that had been marked dropped due to it being delivered by another subscription.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@573400 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2007-09-06 22:19:25 +00:00
parent 222daf2323
commit b732d3d114
4 changed files with 48 additions and 17 deletions

View File

@ -55,5 +55,10 @@ public interface MessageReference {
* Returns true if this message is expired * Returns true if this message is expired
*/ */
boolean isExpired(); boolean isExpired();
/**
* Returns true if this message is dropped.
*/
boolean isDropped();
} }

View File

@ -35,7 +35,19 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
* @return true if there are no pending messages * @return true if there are no pending messages
*/ */
public boolean isEmpty() { public boolean isEmpty() {
return list.isEmpty(); if (list.isEmpty()) {
return true;
} else {
for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) {
MessageReference node = iterator.next();
if (!node.isDropped()) {
return false;
}
// We can remove dropped references.
iterator.remove();
}
return true;
}
} }
/** /**

View File

@ -689,4 +689,8 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
public void setBrokerOutTime(long brokerOutTime) { public void setBrokerOutTime(long brokerOutTime) {
this.brokerOutTime = brokerOutTime; this.brokerOutTime = brokerOutTime;
} }
public boolean isDropped() {
return false;
}
} }

View File

@ -29,6 +29,7 @@ import javax.jms.JMSException;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageListener; import javax.jms.MessageListener;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue; import javax.jms.Queue;
import javax.jms.Session; import javax.jms.Session;
@ -42,6 +43,7 @@ import org.apache.activemq.broker.BrokerService;
*/ */
public class QueueWorkerPrefetchTest extends TestCase implements MessageListener public class QueueWorkerPrefetchTest extends TestCase implements MessageListener
{ {
private static final int BATCH_SIZE = 10;
private static final long WAIT_TIMEOUT = 1000*10; private static final long WAIT_TIMEOUT = 1000*10;
/** The connection URL. */ /** The connection URL. */
@ -70,6 +72,14 @@ public class QueueWorkerPrefetchTest extends TestCase implements MessageListener
/** Messages sent to the work-item queue. */ /** Messages sent to the work-item queue. */
private static class WorkMessage implements Serializable private static class WorkMessage implements Serializable
{ {
private final int id;
public WorkMessage(int id) {
this.id = id;
}
@Override
public String toString() {
return "Work: "+id;
}
} }
/** /**
@ -79,6 +89,7 @@ public class QueueWorkerPrefetchTest extends TestCase implements MessageListener
*/ */
private static class Worker implements MessageListener private static class Worker implements MessageListener
{ {
/** Counter shared between workers to decided when new work-item messages are created. */ /** Counter shared between workers to decided when new work-item messages are created. */
private static AtomicInteger counter = new AtomicInteger(0); private static AtomicInteger counter = new AtomicInteger(0);
@ -106,24 +117,23 @@ public class QueueWorkerPrefetchTest extends TestCase implements MessageListener
{ {
try try
{ {
boolean sendMessage = false; WorkMessage work = (WorkMessage)((ObjectMessage)message).getObject();
// Don't create a new work item for every 1000th message. */ long c = counter.incrementAndGet();
if (counter.incrementAndGet() % 1000 != 0) if (c % 1 == 0) {
{ System.out.println("Worker now has message count of: " + c);
sendMessage = true;
} }
if (sendMessage) // Don't create a new work item for every BATCH_SIZE message. */
if (c % BATCH_SIZE != 0)
{ {
// Send new work item to work-item queue. // Send new work item to work-item queue.
workItemProducer.send(session.createObjectMessage( workItemProducer.send(session.createObjectMessage(
new WorkMessage())); new WorkMessage(work.id+1)));
} }
// Send ack to master. // Send ack to master.
masterItemProducer.send(session.createObjectMessage( masterItemProducer.send(session.createObjectMessage(work));
new WorkMessage()));
} }
catch (JMSException e) catch (JMSException e)
{ {
@ -145,7 +155,7 @@ public class QueueWorkerPrefetchTest extends TestCase implements MessageListener
{ {
long acks = acksReceived.incrementAndGet(); long acks = acksReceived.incrementAndGet();
latch.get().countDown(); latch.get().countDown();
if (acks % 100 == 0) { if (acks % 1 == 0) {
System.out.println("Master now has ack count of: " + acksReceived); System.out.println("Master now has ack count of: " + acksReceived);
} }
} }
@ -193,10 +203,10 @@ public class QueueWorkerPrefetchTest extends TestCase implements MessageListener
workers[i] = new Worker(connection.createSession(false, Session.AUTO_ACKNOWLEDGE)); workers[i] = new Worker(connection.createSession(false, Session.AUTO_ACKNOWLEDGE));
} }
// Send a message to the work queue, and wait for the 1000 acks from the workers. // Send a message to the work queue, and wait for the BATCH_SIZE acks from the workers.
acksReceived.set(0); acksReceived.set(0);
latch.set(new CountDownLatch(1000)); latch.set(new CountDownLatch(BATCH_SIZE));
workItemProducer.send(masterSession.createObjectMessage(new WorkMessage())); workItemProducer.send(masterSession.createObjectMessage(new WorkMessage(1)));
if (!latch.get().await(WAIT_TIMEOUT, TimeUnit.MILLISECONDS)) { if (!latch.get().await(WAIT_TIMEOUT, TimeUnit.MILLISECONDS)) {
fail("First batch only received " + acksReceived + " messages"); fail("First batch only received " + acksReceived + " messages");
@ -209,8 +219,8 @@ public class QueueWorkerPrefetchTest extends TestCase implements MessageListener
// have a large pending queue. Creating a new worker at this point however will // have a large pending queue. Creating a new worker at this point however will
// receive this new message. // receive this new message.
acksReceived.set(0); acksReceived.set(0);
latch.set(new CountDownLatch(1000)); latch.set(new CountDownLatch(BATCH_SIZE));
workItemProducer.send(masterSession.createObjectMessage(new WorkMessage())); workItemProducer.send(masterSession.createObjectMessage(new WorkMessage(1)));
if (!latch.get().await(WAIT_TIMEOUT, TimeUnit.MILLISECONDS)) { if (!latch.get().await(WAIT_TIMEOUT, TimeUnit.MILLISECONDS)) {
fail("Second batch only received " + acksReceived + " messages"); fail("Second batch only received " + acksReceived + " messages");