git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@656980 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-05-16 09:21:44 +00:00
parent 6dc8cd034b
commit 2076f452e7
2 changed files with 17 additions and 4 deletions

View File

@ -477,7 +477,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
m = ActiveMQMessageTransformation.transformMessage(transformedMessage, session.connection); m = ActiveMQMessageTransformation.transformMessage(transformedMessage, session.connection);
} }
} }
if (session.isClientAcknowledge()) { if (session.isClientAcknowledge() || session.isIndividualAcknowledge()) {
m.setAcknowledgeCallback(new Callback() { m.setAcknowledgeCallback(new Callback() {
public void execute() throws Exception { public void execute() throws Exception {
session.checkClosed(); session.checkClosed();
@ -767,7 +767,14 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
ackLater(md, MessageAck.STANDARD_ACK_TYPE); ackLater(md, MessageAck.STANDARD_ACK_TYPE);
} else if (session.isClientAcknowledge()) { } else if (session.isClientAcknowledge()) {
ackLater(md, MessageAck.DELIVERED_ACK_TYPE); ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
} else { } else if (session.isIndividualAcknowledge()){
MessageAck ack = new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
session.asyncSendPacket(ack);
synchronized(deliveredMessages){
deliveredMessages.remove(md);
}
}
else {
throw new IllegalStateException("Invalid session state."); throw new IllegalStateException("Invalid session state.");
} }
} }
@ -968,7 +975,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
} }
afterMessageIsConsumed(md, expired); afterMessageIsConsumed(md, expired);
} catch (RuntimeException e) { } catch (RuntimeException e) {
if (session.isDupsOkAcknowledge() || session.isAutoAcknowledge()) { if (session.isDupsOkAcknowledge() || session.isAutoAcknowledge() || session.isIndividualAcknowledge()) {
// Redeliver the message // Redeliver the message
} else { } else {
// Transacted or Client ack: Deliver the // Transacted or Client ack: Deliver the

View File

@ -133,6 +133,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/ */
public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher { public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher {
public static final int INDIVIDUAL_ACKNOWLEDGE=4;
public static interface DeliveryListener { public static interface DeliveryListener {
void beforeDelivery(ActiveMQSession session, Message msg); void beforeDelivery(ActiveMQSession session, Message msg);
@ -710,7 +712,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
continue; continue;
} }
if (isClientAcknowledge()) { if (isClientAcknowledge()||isIndividualAcknowledge()) {
message.setAcknowledgeCallback(new Callback() { message.setAcknowledgeCallback(new Callback() {
public void execute() throws Exception { public void execute() throws Exception {
} }
@ -1706,6 +1708,10 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE; return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE;
} }
public boolean isIndividualAcknowledge(){
return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE;
}
/** /**
* Returns the message delivery listener. * Returns the message delivery listener.
* *