AMQ-1976, individual messages passing through a network bridge need an individual ack; for an individual ack, it is ok to have more than one message dispatched

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@705281 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2008-10-16 16:48:52 +00:00
parent bc7e759662
commit fdee6e81e2
3 changed files with 36 additions and 36 deletions

View File

@ -411,40 +411,44 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
* @param lastAckedMsg * @param lastAckedMsg
* @throws JMSException if it does not match * @throws JMSException if it does not match
*/ */
protected void assertAckMatchesDispatched(MessageAck ack) protected void assertAckMatchesDispatched(MessageAck ack) throws JMSException {
throws JMSException {
MessageId firstAckedMsg = ack.getFirstMessageId(); MessageId firstAckedMsg = ack.getFirstMessageId();
MessageId lastAckedMsg = ack.getLastMessageId(); MessageId lastAckedMsg = ack.getLastMessageId();
int checkCount = 0;
boolean checkFoundStart = false;
boolean checkFoundEnd = false;
for (MessageReference node : dispatched) {
int checkCount = 0; if (firstAckedMsg == null) {
boolean checkFoundStart = false; checkFoundStart = true;
boolean checkFoundEnd = false; } else if (!checkFoundStart && firstAckedMsg.equals(node.getMessageId())) {
for (MessageReference node : dispatched) { checkFoundStart = true;
}
if( firstAckedMsg == null ) {
checkFoundStart=true;
} else if (!checkFoundStart && firstAckedMsg.equals(node.getMessageId())) {
checkFoundStart = true;
}
if (checkFoundStart) { if (checkFoundStart) {
checkCount++; checkCount++;
} }
if (lastAckedMsg != null && lastAckedMsg.equals(node.getMessageId())) { if (lastAckedMsg != null && lastAckedMsg.equals(node.getMessageId())) {
checkFoundEnd = true; checkFoundEnd = true;
break; break;
} }
} }
if (!checkFoundStart && firstAckedMsg != null) if (!checkFoundStart && firstAckedMsg != null)
throw new JMSException("Unmatched acknowledege: Could not find Message-ID "+firstAckedMsg+" in dispatched-list (start of ack)"); throw new JMSException("Unmatched acknowledege: " + ack
if (!checkFoundEnd && lastAckedMsg != null) + "; Could not find Message-ID " + firstAckedMsg
throw new JMSException("Unmatched acknowledege: Could not find Message-ID "+lastAckedMsg+" in dispatched-list (end of ack)"); + " in dispatched-list (start of ack)");
if (ack.getMessageCount() != checkCount) { if (!checkFoundEnd && lastAckedMsg != null)
throw new JMSException("Unmatched acknowledege: Expected message count ("+ack.getMessageCount()+ throw new JMSException("Unmatched acknowledege: " + ack
") differs from count in dispatched-list ("+checkCount+")"); + "; Could not find Message-ID " + lastAckedMsg
} + " in dispatched-list (end of ack)");
} if (ack.getMessageCount() != checkCount && ack.isStandardAck()) {
throw new JMSException("Unmatched acknowledege: " + ack
+ "; Expected message count (" + ack.getMessageCount()
+ ") differs from count in dispatched-list (" + checkCount
+ ")");
}
}
/** /**
* @param context * @param context

View File

@ -647,7 +647,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
else{ else{
LOG.info("Message not forwarded on to remote, because message came from remote"); LOG.info("Message not forwarded on to remote, because message came from remote");
} }
localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1)); localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
dequeueCounter.incrementAndGet(); dequeueCounter.incrementAndGet();
} else { } else {
@ -664,7 +664,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
ExceptionResponse er = (ExceptionResponse)response; ExceptionResponse er = (ExceptionResponse)response;
serviceLocalException(er.getException()); serviceLocalException(er.getException());
} else { } else {
localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1)); localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
dequeueCounter.incrementAndGet(); dequeueCounter.incrementAndGet();
} }
} catch (IOException e) { } catch (IOException e) {

View File

@ -39,7 +39,6 @@ public class QueueMemoryFullMultiBrokersTest extends JmsMultipleBrokersTestSuppo
public static final int MESSAGE_COUNT = 2000; public static final int MESSAGE_COUNT = 2000;
public void testQueueNetworkWithConsumerFull() throws Exception { public void testQueueNetworkWithConsumerFull() throws Exception {
if (true) return;
bridgeAllBrokers(); bridgeAllBrokers();
startAllBrokers(); startAllBrokers();
@ -67,8 +66,6 @@ public class QueueMemoryFullMultiBrokersTest extends JmsMultipleBrokersTestSuppo
assertTrue("All messages are consumed and acked from source:" + internalQueue, internalQueue.getMessages().isEmpty()); assertTrue("All messages are consumed and acked from source:" + internalQueue, internalQueue.getMessages().isEmpty());
assertEquals("messages source:" + internalQueue, 0, internalQueue.getDestinationStatistics().getMessages().getCount()); assertEquals("messages source:" + internalQueue, 0, internalQueue.getDestinationStatistics().getMessages().getCount());
assertEquals("inflight source:" + internalQueue, 0, internalQueue.getDestinationStatistics().getInflight().getCount()); assertEquals("inflight source:" + internalQueue, 0, internalQueue.getDestinationStatistics().getInflight().getCount());
} }
public void setUp() throws Exception { public void setUp() throws Exception {
@ -82,7 +79,6 @@ public class QueueMemoryFullMultiBrokersTest extends JmsMultipleBrokersTestSuppo
} }
BrokerService broker2 = brokers.get("Broker2").broker; BrokerService broker2 = brokers.get("Broker2").broker;
applyMemoryLimitPolicy(broker2); applyMemoryLimitPolicy(broker2);
} }
private void applyMemoryLimitPolicy(BrokerService broker) { private void applyMemoryLimitPolicy(BrokerService broker) {