diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsTransactedMessageOrderTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsTransactedMessageOrderTest.java index c286497caf..2134759625 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsTransactedMessageOrderTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsTransactedMessageOrderTest.java @@ -69,8 +69,6 @@ public class JmsTransactedMessageOrderTest extends JMSClientTestSupport { policyEntry.setQueue(">"); policyEntry.setStrictOrderDispatch(true); - policyEntry.setProducerFlowControl(true); - policyEntry.setMemoryLimit(1024 * 1024); policyEntries.add(policyEntry); @@ -85,7 +83,7 @@ public class JmsTransactedMessageOrderTest extends JMSClientTestSupport { sendMessages(5); int counter = 0; - while (counter++ < 10) { + while (counter++ < 20) { LOG.info("Creating connection using prefetch of: {}", prefetch); JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI("jms.prefetchPolicy.all=" + prefetch)); @@ -100,11 +98,11 @@ public class JmsTransactedMessageOrderTest extends JMSClientTestSupport { Message message = consumer.receive(5000); assertNotNull(message); assertTrue(message instanceof TextMessage); + LOG.info("Read message = {}", ((TextMessage) message).getText()); int sequenceID = message.getIntProperty("sequenceID"); assertEquals(0, sequenceID); - LOG.info("Read message = {}", ((TextMessage) message).getText()); session.rollback(); session.close(); connection.close(); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 8332d25d5b..b8184b1c52 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -19,6 +19,7 @@ package org.apache.activemq.broker.region; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -639,31 +640,32 @@ public abstract class PrefetchSubscription extends AbstractSubscription { } public List remove(ConnectionContext context, Destination destination, List dispatched) throws Exception { - List rc = new ArrayList(); + LinkedList redispatch = new LinkedList(); synchronized(pendingLock) { super.remove(context, destination); // Here is a potential problem concerning Inflight stat: // Messages not already committed or rolled back may not be removed from dispatched list at the moment // Except if each commit or rollback callback action comes before remove of subscriber. - rc.addAll(pending.remove(context, destination)); + redispatch.addAll(pending.remove(context, destination)); if (dispatched == null) { - return rc; + return redispatch; } // Synchronized to DispatchLock if necessary if (dispatched == this.dispatched) { synchronized(dispatchLock) { - updateDestinationStats(rc, destination, dispatched); + addReferencesAndUpdateRedispatch(redispatch, destination, dispatched); } } else { - updateDestinationStats(rc, destination, dispatched); + addReferencesAndUpdateRedispatch(redispatch, destination, dispatched); } } - return rc; + + return redispatch; } - private void updateDestinationStats(List rc, Destination destination, List dispatched) { + private void addReferencesAndUpdateRedispatch(LinkedList redispatch, Destination destination, List dispatched) { ArrayList references = new ArrayList(); for (MessageReference r : dispatched) { if (r.getRegionDestination() == destination) { @@ -671,7 +673,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { getSubscriptionStatistics().getInflightMessageSize().addSize(-r.getSize()); } } - rc.addAll(references); + redispatch.addAll(0, references); destination.getDestinationStatistics().getInflight().subtract(references.size()); dispatched.removeAll(references); }