git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@657155 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-05-16 18:29:33 +00:00
parent cdd4efabaf
commit 4ddb3e3f6c
1 changed files with 18 additions and 9 deletions

View File

@ -477,13 +477,20 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
m = ActiveMQMessageTransformation.transformMessage(transformedMessage, session.connection);
}
}
if (session.isClientAcknowledge() || session.isIndividualAcknowledge()) {
if (session.isClientAcknowledge()) {
m.setAcknowledgeCallback(new Callback() {
public void execute() throws Exception {
session.checkClosed();
session.acknowledge();
}
});
}else if (session.isIndividualAcknowledge()) {
m.setAcknowledgeCallback(new Callback() {
public void execute() throws Exception {
session.checkClosed();
acknowledge(md);
}
});
}
return m;
}
@ -765,15 +772,9 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
} else if (session.isDupsOkAcknowledge()) {
ackLater(md, MessageAck.STANDARD_ACK_TYPE);
} else if (session.isClientAcknowledge()) {
} else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) {
ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
} else if (session.isIndividualAcknowledge()){
MessageAck ack = new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
session.asyncSendPacket(ack);
synchronized(deliveredMessages){
deliveredMessages.remove(md);
}
}
}
else {
throw new IllegalStateException("Invalid session state.");
}
@ -855,6 +856,14 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
}
}
void acknowledge(MessageDispatch md) throws JMSException {
MessageAck ack = new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
session.asyncSendPacket(ack);
synchronized(deliveredMessages){
deliveredMessages.remove(md);
}
}
public void commit() throws JMSException {
synchronized (deliveredMessages) {