mirror of
https://github.com/apache/activemq.git
synced 2025-02-17 07:24:51 +00:00
Properly handle a rollback() when call from an onMessaqge()
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@394851 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
535c30c3e8
commit
b708fd32c3
@ -602,10 +602,14 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
|||||||
this.info.setCurrentPrefetchSize(prefetch);
|
this.info.setCurrentPrefetchSize(prefetch);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void beforeMessageIsConsumed(MessageDispatch md) {
|
private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
|
||||||
md.setDeliverySequenceId(session.getNextDeliveryId());
|
md.setDeliverySequenceId(session.getNextDeliveryId());
|
||||||
if (!session.isDupsOkAcknowledge())
|
if (!session.isDupsOkAcknowledge()) {
|
||||||
deliveredMessages.addFirst(md);
|
deliveredMessages.addFirst(md);
|
||||||
|
if( session.isTransacted() ) {
|
||||||
|
ackLater(md,MessageAck.DELIVERED_ACK_TYPE);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void afterMessageIsConsumed(MessageDispatch md,boolean messageExpired) throws JMSException{
|
private void afterMessageIsConsumed(MessageDispatch md,boolean messageExpired) throws JMSException{
|
||||||
@ -615,9 +619,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
|||||||
ackLater(md,MessageAck.DELIVERED_ACK_TYPE);
|
ackLater(md,MessageAck.DELIVERED_ACK_TYPE);
|
||||||
}else{
|
}else{
|
||||||
stats.onMessage();
|
stats.onMessage();
|
||||||
if(session.isTransacted()){
|
if( session.isTransacted() ) {
|
||||||
ackLater(md,MessageAck.DELIVERED_ACK_TYPE);
|
} else if(session.isAutoAcknowledge()) {
|
||||||
}else if(session.isAutoAcknowledge()){
|
|
||||||
if(!deliveredMessages.isEmpty()){
|
if(!deliveredMessages.isEmpty()){
|
||||||
if(optimizeAcknowledge){
|
if(optimizeAcknowledge){
|
||||||
if(deliveryingAcknowledgements.compareAndSet(false,true)){
|
if(deliveryingAcknowledgements.compareAndSet(false,true)){
|
||||||
@ -636,11 +639,11 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
|||||||
deliveredMessages.clear();
|
deliveredMessages.clear();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}else if(session.isDupsOkAcknowledge()){
|
} else if(session.isDupsOkAcknowledge()){
|
||||||
ackLater(md,MessageAck.STANDARD_ACK_TYPE);
|
ackLater(md,MessageAck.STANDARD_ACK_TYPE);
|
||||||
}else if(session.isClientAcknowledge()){
|
} else if(session.isClientAcknowledge()){
|
||||||
ackLater(md,MessageAck.DELIVERED_ACK_TYPE);
|
ackLater(md,MessageAck.DELIVERED_ACK_TYPE);
|
||||||
}else{
|
} else{
|
||||||
throw new IllegalStateException("Invalid session state.");
|
throw new IllegalStateException("Invalid session state.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user