mirror of https://github.com/apache/activemq.git
AMQ-2401: Applying patch which makes the DUPS_OK on Queue case use the same ack strategy that is used by AUTO_ACK.
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@818487 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ded63631fd
commit
ec821411aa
|
@ -238,6 +238,14 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean isAutoAcknowledgeEach() {
|
||||||
|
return session.isAutoAcknowledge() || ( session.isDupsOkAcknowledge() && getDestination().isQueue() );
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isAutoAcknowledgeBatch() {
|
||||||
|
return session.isDupsOkAcknowledge() && !getDestination().isQueue() ;
|
||||||
|
}
|
||||||
|
|
||||||
public StatsImpl getStats() {
|
public StatsImpl getStats() {
|
||||||
return stats;
|
return stats;
|
||||||
}
|
}
|
||||||
|
@ -642,7 +650,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
void deliverAcks() {
|
void deliverAcks() {
|
||||||
MessageAck ack = null;
|
MessageAck ack = null;
|
||||||
if (deliveryingAcknowledgements.compareAndSet(false, true)) {
|
if (deliveryingAcknowledgements.compareAndSet(false, true)) {
|
||||||
if (session.isAutoAcknowledge()) {
|
if (isAutoAcknowledgeEach()) {
|
||||||
synchronized(deliveredMessages) {
|
synchronized(deliveredMessages) {
|
||||||
ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
|
ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
|
||||||
if (ack != null) {
|
if (ack != null) {
|
||||||
|
@ -687,7 +695,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
// Ack any delivered messages now.
|
// Ack any delivered messages now.
|
||||||
if (!session.getTransacted()) {
|
if (!session.getTransacted()) {
|
||||||
deliverAcks();
|
deliverAcks();
|
||||||
if (session.isDupsOkAcknowledge()) {
|
if (isAutoAcknowledgeBatch()) {
|
||||||
acknowledge();
|
acknowledge();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -771,7 +779,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
|
private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
|
||||||
md.setDeliverySequenceId(session.getNextDeliveryId());
|
md.setDeliverySequenceId(session.getNextDeliveryId());
|
||||||
lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
|
lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
|
||||||
if (!session.isDupsOkAcknowledge()) {
|
if (!isAutoAcknowledgeBatch()) {
|
||||||
synchronized(deliveredMessages) {
|
synchronized(deliveredMessages) {
|
||||||
deliveredMessages.addFirst(md);
|
deliveredMessages.addFirst(md);
|
||||||
}
|
}
|
||||||
|
@ -795,7 +803,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
stats.onMessage();
|
stats.onMessage();
|
||||||
if (session.getTransacted()) {
|
if (session.getTransacted()) {
|
||||||
// Do nothing.
|
// Do nothing.
|
||||||
} else if (session.isAutoAcknowledge()) {
|
} else if (isAutoAcknowledgeEach()) {
|
||||||
if (deliveryingAcknowledgements.compareAndSet(false, true)) {
|
if (deliveryingAcknowledgements.compareAndSet(false, true)) {
|
||||||
synchronized (deliveredMessages) {
|
synchronized (deliveredMessages) {
|
||||||
if (!deliveredMessages.isEmpty()) {
|
if (!deliveredMessages.isEmpty()) {
|
||||||
|
@ -820,7 +828,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
}
|
}
|
||||||
deliveryingAcknowledgements.set(false);
|
deliveryingAcknowledgements.set(false);
|
||||||
}
|
}
|
||||||
} else if (session.isDupsOkAcknowledge()) {
|
} else if (isAutoAcknowledgeBatch()) {
|
||||||
ackLater(md, MessageAck.STANDARD_ACK_TYPE);
|
ackLater(md, MessageAck.STANDARD_ACK_TYPE);
|
||||||
} else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) {
|
} else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) {
|
||||||
ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
|
ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
|
||||||
|
@ -1081,7 +1089,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
}
|
}
|
||||||
afterMessageIsConsumed(md, expired);
|
afterMessageIsConsumed(md, expired);
|
||||||
} catch (RuntimeException e) {
|
} catch (RuntimeException e) {
|
||||||
if (session.isDupsOkAcknowledge() || session.isAutoAcknowledge() || session.isIndividualAcknowledge()) {
|
if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session.isIndividualAcknowledge()) {
|
||||||
// Redeliver the message
|
// Redeliver the message
|
||||||
} else {
|
} else {
|
||||||
// Transacted or Client ack: Deliver the
|
// Transacted or Client ack: Deliver the
|
||||||
|
|
Loading…
Reference in New Issue