diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index e64d60baff..751da86362 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -937,6 +937,9 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC * @throws JMSException */ public void acknowledge() throws JMSException { + synchronized (unconsumedMessages.getMutex()) { + clearDispatchListOnReconnect(); + } synchronized(deliveredMessages) { // Acknowledge all messages so far. MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 9559767214..aa46a49576 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -199,7 +199,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { } } if (LOG.isTraceEnabled()) { - LOG.trace("ack:" + ack); + LOG.info("ack:" + ack); } synchronized(dispatchLock) { if (ack.isStandardAck()) { @@ -256,9 +256,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription { prefetchExtension = Math.max( prefetchExtension, index ); } - } else { - prefetchExtension = Math.max(0, - prefetchExtension - index); } destination = node.getRegionDestination(); callDispatchMatched = true; @@ -319,8 +316,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription { } else if (ack.isRedeliveredAck()) { // Message was re-delivered but it was not yet considered to be // a DLQ message. - // Acknowledge all dispatched messages up till the message id of - // the ack. boolean inAckRange = false; for (final MessageReference node : dispatched) { MessageId messageId = node.getMessageId(); @@ -349,9 +344,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription { throw new JMSException("Poison ack cannot be transacted: " + ack); } - // Acknowledge all dispatched messages up till the message id of - // the - // acknowledgment. int index = 0; boolean inAckRange = false; List removeList = new ArrayList(); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index 66a4e844ee..51687cee4e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -201,26 +201,27 @@ public class FailoverTransport implements CompositeTransport { transport.setTransportListener(disposedListener); ServiceSupport.dispose(transport); + boolean reconnectOk = false; synchronized (reconnectMutex) { - boolean reconnectOk = false; if(started) { LOG.warn("Transport failed to " + connectedTransportURI+ " , attempting to automatically reconnect due to: " + e); LOG.debug("Transport failed with the following exception:", e); reconnectOk = true; - } - + } initialized = false; failedConnectTransportURI=connectedTransportURI; connectedTransportURI = null; connected=false; - if(reconnectOk) { + + // notify before any reconnect attempt so ack state can be whacked + if (transportListener != null) { + transportListener.transportInterupted(); + } + + if (reconnectOk) { reconnectTask.wakeup(); } } - - if (transportListener != null) { - transportListener.transportInterupted(); - } } } @@ -412,8 +413,8 @@ public class FailoverTransport implements CompositeTransport { // Skipping send of ShutdownInfo command when not connected. return; } - if(command instanceof RemoveInfo) { - // Simulate response to RemoveInfo command + if(command instanceof RemoveInfo || command.isMessageAck()) { + // Simulate response to RemoveInfo command or ack (as it will be stale) stateTracker.track(command); Response response = new Response(); response.setCorrelationId(command.getCommandId()); @@ -432,7 +433,7 @@ public class FailoverTransport implements CompositeTransport { while (transport == null && !disposed && connectionFailure == null && !Thread.currentThread().isInterrupted()) { - LOG.trace("Waiting for transport to reconnect."); + LOG.trace("Waiting for transport to reconnect..: " + command); long end = System.currentTimeMillis(); if (timeout > 0 && (end - start > timeout)) { timedout = true; @@ -698,7 +699,7 @@ public class FailoverTransport implements CompositeTransport { t.setTransportListener(myTransportListener); try { if (started) { - restoreTransport(t); + restoreTransport(t); } reconnectDelay = initialReconnectDelay; failedConnectTransportURI=null; @@ -856,7 +857,7 @@ public class FailoverTransport implements CompositeTransport { bt.setTransport(t); backups.add(bt); } - }catch(Exception e) { + } catch(Exception e) { LOG.debug("Failed to build backup ",e); } }