git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@657817 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-05-19 13:06:37 +00:00
parent ca36136639
commit ab3b155ff6
6 changed files with 179 additions and 3 deletions

View File

@ -858,7 +858,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
void acknowledge(MessageDispatch md) throws JMSException {
MessageAck ack = new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
MessageAck ack = new MessageAck(md,MessageAck.INDIVIDUAL_ACK_TYPE,1);
session.asyncSendPacket(ack);
synchronized(deliveredMessages){
deliveredMessages.remove(md);

View File

@ -133,6 +133,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher {
/**
* Only acknowledge an individual message - using message.acknowledge()
* as opposed to CLIENT_ACKNOWLEDGE which
* acknowledges all messages consumed by a session at when acknowledge()
* is called
*/
public static final int INDIVIDUAL_ACKNOWLEDGE=4;
public static interface DeliveryListener {

View File

@ -252,7 +252,24 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
+ ack);
}
}
} else if (ack.isDeliveredAck()) {
} else if (ack.isIndividualAck()) {
// Message was delivered and acknowledge - but only delete the
// individual message
for (final MessageReference node : dispatched) {
MessageId messageId = node.getMessageId();
if (ack.getLastMessageId().equals(messageId)) {
// this should never be within a transaction
node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
destination = node.getRegionDestination();
acknowledge(context, ack, node);
dispatched.remove(node);
prefetchExtension = Math.max(0, prefetchExtension - 1);
callDispatchMatched = true;
break;
}
}
}else if (ack.isDeliveredAck()) {
// Message was delivered but not acknowledged: update pre-fetch
// counters.
// Acknowledge all dispatched messages up till the message id of

View File

@ -182,7 +182,7 @@ public class TopicSubscription extends AbstractSubscription {
public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
// Handle the standard acknowledgment case.
if (ack.isStandardAck() || ack.isPoisonAck()) {
if (ack.isStandardAck() || ack.isPoisonAck() || ack.isIndividualAck()) {
if (context.isInTransaction()) {
context.getTransaction().addSynchronization(new Synchronization() {

View File

@ -54,6 +54,11 @@ public class MessageAck extends BaseCommand {
*/
public static final byte REDELIVERED_ACK_TYPE = 3;
/**
* The ack case where a client wants only an individual message to be discarded.
*/
public static final byte INDIVIDUAL_ACK_TYPE = 4;
protected byte ackType;
protected ConsumerId consumerId;
protected MessageId firstMessageId;
@ -108,6 +113,10 @@ public class MessageAck extends BaseCommand {
public boolean isRedeliveredAck() {
return ackType == REDELIVERED_ACK_TYPE;
}
public boolean isIndividualAck() {
return ackType == INDIVIDUAL_ACK_TYPE;
}
/**
* @openwire:property version=1 cache=true

View File

@ -0,0 +1,144 @@
package org.apache.activemq;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
* @version $Revision: 1.4 $
*/
public class JMSIndividualAckTest extends TestSupport {
private Connection connection;
protected void setUp() throws Exception {
super.setUp();
connection = createConnection();
}
/**
* @see junit.framework.TestCase#tearDown()
*/
protected void tearDown() throws Exception {
if (connection != null) {
connection.close();
connection = null;
}
super.tearDown();
}
/**
* Tests if acknowledged messages are being consumed.
*
* @throws JMSException
*/
public void testAckedMessageAreConsumed() throws JMSException {
connection.start();
Session session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
Queue queue = session.createQueue(getQueueName());
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("Hello"));
// Consume the message...
MessageConsumer consumer = session.createConsumer(queue);
Message msg = consumer.receive(1000);
assertNotNull(msg);
msg.acknowledge();
// Reset the session.
session.close();
session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
// Attempt to Consume the message...
consumer = session.createConsumer(queue);
msg = consumer.receive(1000);
assertNull(msg);
session.close();
}
/**
* Tests if acknowledged messages are being consumed.
*
* @throws JMSException
*/
public void testLastMessageAcked() throws JMSException {
connection.start();
Session session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
Queue queue = session.createQueue(getQueueName());
MessageProducer producer = session.createProducer(queue);
TextMessage msg1 = session.createTextMessage("msg1");
TextMessage msg2 = session.createTextMessage("msg2");
TextMessage msg3 = session.createTextMessage("msg3");
producer.send(msg1);
producer.send(msg2);
producer.send(msg3);
// Consume the message...
MessageConsumer consumer = session.createConsumer(queue);
Message msg = consumer.receive(1000);
assertNotNull(msg);
msg = consumer.receive(1000);
assertNotNull(msg);
msg = consumer.receive(1000);
assertNotNull(msg);
msg.acknowledge();
// Reset the session.
session.close();
session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
// Attempt to Consume the message...
consumer = session.createConsumer(queue);
msg = consumer.receive(1000);
assertNotNull(msg);
assertEquals(msg1,msg);
msg = consumer.receive(1000);
assertNotNull(msg);
assertEquals(msg2,msg);
msg = consumer.receive(1000);
assertNull(msg);
session.close();
}
/**
* Tests if unacknowledged messages are being re-delivered when the consumer connects again.
*
* @throws JMSException
*/
public void testUnAckedMessageAreNotConsumedOnSessionClose() throws JMSException {
connection.start();
Session session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
Queue queue = session.createQueue(getQueueName());
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("Hello"));
// Consume the message...
MessageConsumer consumer = session.createConsumer(queue);
Message msg = consumer.receive(1000);
assertNotNull(msg);
// Don't ack the message.
// Reset the session. This should cause the unacknowledged message to be re-delivered.
session.close();
session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
// Attempt to Consume the message...
consumer = session.createConsumer(queue);
msg = consumer.receive(2000);
assertNotNull(msg);
msg.acknowledge();
session.close();
}
protected String getQueueName() {
return getClass().getName() + "." + getName();
}
}