diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index c749462372..25764d0f28 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -200,13 +200,13 @@ public abstract class PrefetchSubscription extends AbstractSubscription { } if (inAckRange) { // Don't remove the nodes until we are committed. - removeList.add(node); if (!context.isInTransaction()) { dequeueCounter++; if (!this.getConsumerInfo().isBrowser()) { node.getRegionDestination().getDestinationStatistics().getDequeues().increment(); } node.getRegionDestination().getDestinationStatistics().getInflight().decrement(); + removeList.add(node); } else { // setup a Synchronization to remove nodes from the // dispatched list. @@ -217,6 +217,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { throws Exception { synchronized(dispatchLock) { dequeueCounter++; + dispatched.remove(node); node.getRegionDestination().getDestinationStatistics().getDequeues().increment(); node.getRegionDestination().getDestinationStatistics().getInflight().decrement(); prefetchExtension--; @@ -224,9 +225,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { } public void afterRollback() throws Exception { - // Need to put it back in the front. synchronized(dispatchLock) { - dispatched.add(0, node); // ActiveMQ workaround for AMQ-1730 - Please Ignore next line node.incrementRedeliveryCounter(); node.getRegionDestination().getDestinationStatistics().getInflight().decrement(); @@ -307,11 +306,9 @@ public abstract class PrefetchSubscription extends AbstractSubscription { } } else if (ack.isRedeliveredAck()) { // Message was re-delivered but it was not yet considered to be - // a - // DLQ message. + // a DLQ message. // Acknowledge all dispatched messages up till the message id of - // the - // acknowledgment. + // the ack. boolean inAckRange = false; for (final MessageReference node : dispatched) { MessageId messageId = node.getMessageId(); @@ -396,9 +393,8 @@ public abstract class PrefetchSubscription extends AbstractSubscription { + ack + ") was not in the dispatch list: " + dispatched); } else { - LOG - .debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): " - + ack); + LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): " + + ack); } } } @@ -442,7 +438,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { throw new JMSException("Unmatched acknowledege: " + ack + "; Could not find Message-ID " + lastAckedMsg + " in dispatched-list (end of ack)"); - if (ack.getMessageCount() != checkCount && ack.isStandardAck()) { + if (ack.getMessageCount() != checkCount && !ack.isInTransaction()) { throw new JMSException("Unmatched acknowledege: " + ack + "; Expected message count (" + ack.getMessageCount() + ") differs from count in dispatched-list (" + checkCount @@ -571,7 +567,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { break; } - // Synchronize between dispatched list and remove of messageg from pending list + // Synchronize between dispatched list and remove of message from pending list // related to remove subscription action synchronized(dispatchLock) { pending.remove(); diff --git a/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java index 2c781011c8..ccafcc8bd4 100755 --- a/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java @@ -639,10 +639,14 @@ public class JMSConsumerTest extends JmsTestSupport { Session redispatchSession = connection.createSession(true, Session.SESSION_TRANSACTED); MessageConsumer redispatchConsumer = redispatchSession.createConsumer(destination); - // no commit so will auto rollback and get redispatched to redisptachConsumer + // no commit so will auto rollback and get re-dispatched to redisptachConsumer session.close(); - assertNotNull(redispatchConsumer.receive(1000)); + Message msg = redispatchConsumer.receive(1000); + assertNotNull(msg); + assertTrue(msg.getJMSRedelivered()); + // should have re-delivery of 2, one for re-dispatch, one for rollback which is a little too much! + assertEquals(3, msg.getLongProperty("JMSXDeliveryCount")); redispatchSession.commit(); assertNull(redispatchConsumer.receive(500)); diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java index a7d7835880..597b09ce87 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java @@ -35,8 +35,8 @@ public class DiscoveryTransportNoBrokerTest extends CombinationTestSupport { Connection connection = factory.createConnection(); connection.setClientID("test"); fail("Did not fail to connect as expected."); - } catch ( JMSException expected ) { - assertTrue("reason is java.net.UnknownHostException", expected.getCause() instanceof java.net.UnknownHostException); + } catch ( JMSException expected ) { + assertTrue("reason is java.io.IOException, was: " + expected.getCause(), expected.getCause() instanceof java.io.IOException); } } }