combine the lists in the correct order for later redispatch.
(cherry picked from commit 4e23adfcc9)
This commit is contained in:
Timothy Bish 2016-06-29 12:57:30 -04:00
parent 8ae7c8f3a7
commit 6fda5e3262
2 changed files with 12 additions and 12 deletions

View File

@ -69,8 +69,6 @@ public class JmsTransactedMessageOrderTest extends JMSClientTestSupport {
policyEntry.setQueue(">");
policyEntry.setStrictOrderDispatch(true);
policyEntry.setProducerFlowControl(true);
policyEntry.setMemoryLimit(1024 * 1024);
policyEntries.add(policyEntry);
@ -85,7 +83,7 @@ public class JmsTransactedMessageOrderTest extends JMSClientTestSupport {
sendMessages(5);
int counter = 0;
while (counter++ < 10) {
while (counter++ < 20) {
LOG.info("Creating connection using prefetch of: {}", prefetch);
JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI("jms.prefetchPolicy.all=" + prefetch));
@ -100,11 +98,11 @@ public class JmsTransactedMessageOrderTest extends JMSClientTestSupport {
Message message = consumer.receive(5000);
assertNotNull(message);
assertTrue(message instanceof TextMessage);
LOG.info("Read message = {}", ((TextMessage) message).getText());
int sequenceID = message.getIntProperty("sequenceID");
assertEquals(0, sequenceID);
LOG.info("Read message = {}", ((TextMessage) message).getText());
session.rollback();
session.close();
connection.close();

View File

@ -19,6 +19,7 @@ package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -639,31 +640,32 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
}
public List<MessageReference> remove(ConnectionContext context, Destination destination, List<MessageReference> dispatched) throws Exception {
List<MessageReference> rc = new ArrayList<MessageReference>();
LinkedList<MessageReference> redispatch = new LinkedList<MessageReference>();
synchronized(pendingLock) {
super.remove(context, destination);
// Here is a potential problem concerning Inflight stat:
// Messages not already committed or rolled back may not be removed from dispatched list at the moment
// Except if each commit or rollback callback action comes before remove of subscriber.
rc.addAll(pending.remove(context, destination));
redispatch.addAll(pending.remove(context, destination));
if (dispatched == null) {
return rc;
return redispatch;
}
// Synchronized to DispatchLock if necessary
if (dispatched == this.dispatched) {
synchronized(dispatchLock) {
updateDestinationStats(rc, destination, dispatched);
addReferencesAndUpdateRedispatch(redispatch, destination, dispatched);
}
} else {
updateDestinationStats(rc, destination, dispatched);
addReferencesAndUpdateRedispatch(redispatch, destination, dispatched);
}
}
return rc;
return redispatch;
}
private void updateDestinationStats(List<MessageReference> rc, Destination destination, List<MessageReference> dispatched) {
private void addReferencesAndUpdateRedispatch(LinkedList<MessageReference> redispatch, Destination destination, List<MessageReference> dispatched) {
ArrayList<MessageReference> references = new ArrayList<MessageReference>();
for (MessageReference r : dispatched) {
if (r.getRegionDestination() == destination) {
@ -671,7 +673,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
getSubscriptionStatistics().getInflightMessageSize().addSize(-r.getSize());
}
}
rc.addAll(references);
redispatch.addAll(0, references);
destination.getDestinationStatistics().getInflight().subtract(references.size());
dispatched.removeAll(references);
}