git-svn-id: https://svn.apache.org/repos/asf/activemq/branches/activemq-5.3@818488 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2009-09-24 14:03:03 +00:00
parent 71da0cd97a
commit 660f9cf819
1 changed files with 14 additions and 6 deletions

View File

@ -238,6 +238,14 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
} }
} }
private boolean isAutoAcknowledgeEach() {
return session.isAutoAcknowledge() || ( session.isDupsOkAcknowledge() && getDestination().isQueue() );
}
private boolean isAutoAcknowledgeBatch() {
return session.isDupsOkAcknowledge() && !getDestination().isQueue() ;
}
public StatsImpl getStats() { public StatsImpl getStats() {
return stats; return stats;
} }
@ -642,7 +650,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
void deliverAcks() { void deliverAcks() {
MessageAck ack = null; MessageAck ack = null;
if (deliveryingAcknowledgements.compareAndSet(false, true)) { if (deliveryingAcknowledgements.compareAndSet(false, true)) {
if (session.isAutoAcknowledge()) { if (isAutoAcknowledgeEach()) {
synchronized(deliveredMessages) { synchronized(deliveredMessages) {
ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
if (ack != null) { if (ack != null) {
@ -687,7 +695,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
// Ack any delivered messages now. // Ack any delivered messages now.
if (!session.getTransacted()) { if (!session.getTransacted()) {
deliverAcks(); deliverAcks();
if (session.isDupsOkAcknowledge()) { if (isAutoAcknowledgeBatch()) {
acknowledge(); acknowledge();
} }
} }
@ -771,7 +779,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException { private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
md.setDeliverySequenceId(session.getNextDeliveryId()); md.setDeliverySequenceId(session.getNextDeliveryId());
lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId(); lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
if (!session.isDupsOkAcknowledge()) { if (!isAutoAcknowledgeBatch()) {
synchronized(deliveredMessages) { synchronized(deliveredMessages) {
deliveredMessages.addFirst(md); deliveredMessages.addFirst(md);
} }
@ -795,7 +803,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
stats.onMessage(); stats.onMessage();
if (session.getTransacted()) { if (session.getTransacted()) {
// Do nothing. // Do nothing.
} else if (session.isAutoAcknowledge()) { } else if (isAutoAcknowledgeEach()) {
if (deliveryingAcknowledgements.compareAndSet(false, true)) { if (deliveryingAcknowledgements.compareAndSet(false, true)) {
synchronized (deliveredMessages) { synchronized (deliveredMessages) {
if (!deliveredMessages.isEmpty()) { if (!deliveredMessages.isEmpty()) {
@ -820,7 +828,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
} }
deliveryingAcknowledgements.set(false); deliveryingAcknowledgements.set(false);
} }
} else if (session.isDupsOkAcknowledge()) { } else if (isAutoAcknowledgeBatch()) {
ackLater(md, MessageAck.STANDARD_ACK_TYPE); ackLater(md, MessageAck.STANDARD_ACK_TYPE);
} else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) { } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) {
ackLater(md, MessageAck.DELIVERED_ACK_TYPE); ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
@ -1081,7 +1089,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
} }
afterMessageIsConsumed(md, expired); afterMessageIsConsumed(md, expired);
} catch (RuntimeException e) { } catch (RuntimeException e) {
if (session.isDupsOkAcknowledge() || session.isAutoAcknowledge() || session.isIndividualAcknowledge()) { if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session.isIndividualAcknowledge()) {
// Redeliver the message // Redeliver the message
} else { } else {
// Transacted or Client ack: Deliver the // Transacted or Client ack: Deliver the