reduce unmatched ack exceptions, tidy up prefetchExtension https://issues.apache.org/activemq/browse/AMQ-2560

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@897988 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-01-11 18:22:01 +00:00
parent a9e7e94054
commit 62eb5cf581
3 changed files with 18 additions and 22 deletions

View File

@ -937,6 +937,9 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
* @throws JMSException * @throws JMSException
*/ */
public void acknowledge() throws JMSException { public void acknowledge() throws JMSException {
synchronized (unconsumedMessages.getMutex()) {
clearDispatchListOnReconnect();
}
synchronized(deliveredMessages) { synchronized(deliveredMessages) {
// Acknowledge all messages so far. // Acknowledge all messages so far.
MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);

View File

@ -199,7 +199,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
} }
} }
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("ack:" + ack); LOG.info("ack:" + ack);
} }
synchronized(dispatchLock) { synchronized(dispatchLock) {
if (ack.isStandardAck()) { if (ack.isStandardAck()) {
@ -256,9 +256,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
prefetchExtension = Math.max( prefetchExtension = Math.max(
prefetchExtension, index ); prefetchExtension, index );
} }
} else {
prefetchExtension = Math.max(0,
prefetchExtension - index);
} }
destination = node.getRegionDestination(); destination = node.getRegionDestination();
callDispatchMatched = true; callDispatchMatched = true;
@ -319,8 +316,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
} else if (ack.isRedeliveredAck()) { } else if (ack.isRedeliveredAck()) {
// Message was re-delivered but it was not yet considered to be // Message was re-delivered but it was not yet considered to be
// a DLQ message. // a DLQ message.
// Acknowledge all dispatched messages up till the message id of
// the ack.
boolean inAckRange = false; boolean inAckRange = false;
for (final MessageReference node : dispatched) { for (final MessageReference node : dispatched) {
MessageId messageId = node.getMessageId(); MessageId messageId = node.getMessageId();
@ -349,9 +344,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
throw new JMSException("Poison ack cannot be transacted: " throw new JMSException("Poison ack cannot be transacted: "
+ ack); + ack);
} }
// Acknowledge all dispatched messages up till the message id of
// the
// acknowledgment.
int index = 0; int index = 0;
boolean inAckRange = false; boolean inAckRange = false;
List<MessageReference> removeList = new ArrayList<MessageReference>(); List<MessageReference> removeList = new ArrayList<MessageReference>();

View File

@ -201,26 +201,27 @@ public class FailoverTransport implements CompositeTransport {
transport.setTransportListener(disposedListener); transport.setTransportListener(disposedListener);
ServiceSupport.dispose(transport); ServiceSupport.dispose(transport);
synchronized (reconnectMutex) {
boolean reconnectOk = false; boolean reconnectOk = false;
synchronized (reconnectMutex) {
if(started) { if(started) {
LOG.warn("Transport failed to " + connectedTransportURI+ " , attempting to automatically reconnect due to: " + e); LOG.warn("Transport failed to " + connectedTransportURI+ " , attempting to automatically reconnect due to: " + e);
LOG.debug("Transport failed with the following exception:", e); LOG.debug("Transport failed with the following exception:", e);
reconnectOk = true; reconnectOk = true;
} }
initialized = false; initialized = false;
failedConnectTransportURI=connectedTransportURI; failedConnectTransportURI=connectedTransportURI;
connectedTransportURI = null; connectedTransportURI = null;
connected=false; connected=false;
// notify before any reconnect attempt so ack state can be whacked
if (transportListener != null) {
transportListener.transportInterupted();
}
if (reconnectOk) { if (reconnectOk) {
reconnectTask.wakeup(); reconnectTask.wakeup();
} }
} }
if (transportListener != null) {
transportListener.transportInterupted();
}
} }
} }
@ -412,8 +413,8 @@ public class FailoverTransport implements CompositeTransport {
// Skipping send of ShutdownInfo command when not connected. // Skipping send of ShutdownInfo command when not connected.
return; return;
} }
if(command instanceof RemoveInfo) { if(command instanceof RemoveInfo || command.isMessageAck()) {
// Simulate response to RemoveInfo command // Simulate response to RemoveInfo command or ack (as it will be stale)
stateTracker.track(command); stateTracker.track(command);
Response response = new Response(); Response response = new Response();
response.setCorrelationId(command.getCommandId()); response.setCorrelationId(command.getCommandId());
@ -432,7 +433,7 @@ public class FailoverTransport implements CompositeTransport {
while (transport == null && !disposed while (transport == null && !disposed
&& connectionFailure == null && connectionFailure == null
&& !Thread.currentThread().isInterrupted()) { && !Thread.currentThread().isInterrupted()) {
LOG.trace("Waiting for transport to reconnect."); LOG.trace("Waiting for transport to reconnect..: " + command);
long end = System.currentTimeMillis(); long end = System.currentTimeMillis();
if (timeout > 0 && (end - start > timeout)) { if (timeout > 0 && (end - start > timeout)) {
timedout = true; timedout = true;