git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@941174 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2010-05-05 07:09:37 +00:00
parent 9b7ce0e490
commit a3344e61a9
2 changed files with 130 additions and 8 deletions

View File

@ -95,6 +95,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
private final LinkedHashMap<MessageId, QueueMessageReference> pagedInMessages = new LinkedHashMap<MessageId, QueueMessageReference>(); private final LinkedHashMap<MessageId, QueueMessageReference> pagedInMessages = new LinkedHashMap<MessageId, QueueMessageReference>();
// Messages that are paged in but have not yet been targeted at a subscription // Messages that are paged in but have not yet been targeted at a subscription
private List<QueueMessageReference> pagedInPendingDispatch = new ArrayList<QueueMessageReference>(100); private List<QueueMessageReference> pagedInPendingDispatch = new ArrayList<QueueMessageReference>(100);
private List<QueueMessageReference> redeliveredWaitingDispatch = new ArrayList<QueueMessageReference>();
private MessageGroupMap messageGroupOwners; private MessageGroupMap messageGroupOwners;
private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy(); private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory(); private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
@ -377,7 +378,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
wakeup(); wakeup();
} }
} }
public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeiveredSequenceId) throws Exception { public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeiveredSequenceId) throws Exception {
destinationStatistics.getConsumers().decrement(); destinationStatistics.getConsumers().decrement();
// synchronize with dispatch method so that no new messages are sent // synchronize with dispatch method so that no new messages are sent
@ -406,7 +407,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
getMessageGroupOwners().removeConsumer(consumerId); getMessageGroupOwners().removeConsumer(consumerId);
// redeliver inflight messages // redeliver inflight messages
List<QueueMessageReference> list = new ArrayList<QueueMessageReference>();
for (MessageReference ref : sub.remove(context, this)) { for (MessageReference ref : sub.remove(context, this)) {
QueueMessageReference qmr = (QueueMessageReference) ref; QueueMessageReference qmr = (QueueMessageReference) ref;
if (qmr.getLockOwner() == sub) { if (qmr.getLockOwner() == sub) {
@ -416,11 +417,10 @@ public class Queue extends BaseDestination implements Task, UsageListener {
qmr.incrementRedeliveryCounter(); qmr.incrementRedeliveryCounter();
} }
} }
list.add(qmr); redeliveredWaitingDispatch.add(qmr);
} }
if (!redeliveredWaitingDispatch.isEmpty()) {
if (!list.isEmpty()) { doDispatch(new ArrayList());
doDispatch(list);
} }
} }
if (!(this.optimizedDispatch || isSlave())) { if (!(this.optimizedDispatch || isSlave())) {
@ -1220,7 +1220,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
// Perhaps we should page always into the pagedInPendingDispatch list if // Perhaps we should page always into the pagedInPendingDispatch list if
// !messages.isEmpty(), and then if !pagedInPendingDispatch.isEmpty() // !messages.isEmpty(), and then if !pagedInPendingDispatch.isEmpty()
// then we do a dispatch. // then we do a dispatch.
if (pageInMoreMessages || pendingBrowserDispatch != null) { if (pageInMoreMessages || pendingBrowserDispatch != null || !redeliveredWaitingDispatch.isEmpty()) {
try { try {
pageInMessages(pendingBrowserDispatch != null); pageInMessages(pendingBrowserDispatch != null);
@ -1494,8 +1494,12 @@ public class Queue extends BaseDestination implements Task, UsageListener {
synchronized (dispatchMutex) { synchronized (dispatchMutex) {
synchronized (pagedInPendingDispatch) { synchronized (pagedInPendingDispatch) {
if (!redeliveredWaitingDispatch.isEmpty()) {
// Try first to dispatch redelivered messages to keep an proper order
redeliveredWaitingDispatch = doActualDispatch(redeliveredWaitingDispatch);
}
if (!pagedInPendingDispatch.isEmpty()) { if (!pagedInPendingDispatch.isEmpty()) {
// Try to first dispatch anything that had not been // Next dispatch anything that had not been
// dispatched before. // dispatched before.
pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch); pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch);
} }

View File

@ -0,0 +1,118 @@
package org.apache.activemq.bugs;
import java.util.Date;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.bugs.AMQ1866.ConsumerThread;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class OutOfOrderTestCase extends TestCase {
private static final Log log = LogFactory.getLog(OutOfOrderTestCase.class);
public static final String BROKER_URL = "tcp://localhost:61616";
private static final int PREFETCH = 10;
private static final String CONNECTION_URL = BROKER_URL + "?jms.prefetchPolicy.all=" + PREFETCH;
public static final String QUEUE_NAME = "QUEUE";
private static final String DESTINATION = "QUEUE?consumer.exclusive=true";
BrokerService brokerService;
Session session;
Connection connection;
int seq = 0;
public void setUp() throws Exception {
brokerService = new BrokerService();
brokerService.setUseJmx(true);
brokerService.addConnector(BROKER_URL);
brokerService.deleteAllMessages();
brokerService.start();
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(CONNECTION_URL);
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
}
protected void tearDown() throws Exception {
session.close();
connection.close();
brokerService.stop();
}
public void testOrder() throws Exception {
log.info("Producing messages 0-29 . . .");
Destination destination = session.createQueue(DESTINATION);
final MessageProducer messageProducer = session
.createProducer(destination);
try {
for (int i = 0; i < 30; ++i) {
final Message message = session
.createTextMessage(createMessageText(i));
message.setStringProperty("JMSXGroupID", "FOO");
messageProducer.send(message);
log.info("sent " + toString(message));
}
} finally {
messageProducer.close();
}
log.info("Consuming messages 0-9 . . .");
consumeBatch();
log.info("Consuming messages 10-19 . . .");
consumeBatch();
log.info("Consuming messages 20-29 . . .");
consumeBatch();
}
protected void consumeBatch() throws Exception {
Destination destination = session.createQueue(DESTINATION);
final MessageConsumer messageConsumer = session.createConsumer(destination);
try {
for (int i = 0; i < 10; ++i) {
final Message message = messageConsumer.receive(1000L);
log.info("received " + toString(message));
assertEquals("Message out of order", createMessageText(seq++), ((TextMessage) message).getText());
message.acknowledge();
}
} finally {
messageConsumer.close();
}
}
private String toString(final Message message) throws JMSException {
String ret = "received message '" + ((TextMessage) message).getText() + "' - " + message.getJMSMessageID();
if (message.getJMSRedelivered())
ret += " (redelivered)";
return ret;
}
private static String createMessageText(final int index) {
return "message #" + index;
}
}