This commit is contained in:
Clebert Suconic 2020-03-17 14:08:20 -04:00
commit 64a84805fb
2 changed files with 87 additions and 3 deletions

View File

@ -68,6 +68,7 @@ public class AMQConsumer {
private int prefetchSize;
private final AtomicInteger currentWindow;
private final AtomicInteger deliveredAcks;
private long messagePullSequence = 0;
private final AtomicReference<MessagePullHandler> messagePullHandler = new AtomicReference<>(null);
//internal means we don't expose
@ -87,6 +88,7 @@ public class AMQConsumer {
this.scheduledPool = scheduledPool;
this.prefetchSize = info.getPrefetchSize();
this.currentWindow = new AtomicInteger(prefetchSize);
this.deliveredAcks = new AtomicInteger(0);
if (prefetchSize == 0) {
messagePullHandler.set(new MessagePullHandler());
}
@ -228,7 +230,7 @@ public class AMQConsumer {
return info.getConsumerId();
}
public void acquireCredit(int n) throws Exception {
public void acquireCredit(int n) {
if (messagePullHandler.get() != null) {
//don't acquire any credits when the pull handler controls it!!
return;
@ -240,7 +242,6 @@ public class AMQConsumer {
if (promptDelivery) {
serverConsumer.promptDelivery();
}
}
public int handleDeliver(MessageReference reference, ICoreMessage message, int deliveryCount) {
@ -305,8 +306,20 @@ public class AMQConsumer {
List<MessageReference> ackList = serverConsumer.getDeliveringReferencesBasedOnProtocol(removeReferences, first, last);
if (removeReferences && (ack.isIndividualAck() || ack.isStandardAck() || ack.isPoisonAck())) {
acquireCredit(ackList.size());
this.deliveredAcks.getAndUpdate(deliveredAcks -> {
if (deliveredAcks >= ackList.size()) {
return deliveredAcks - ackList.size();
}
acquireCredit(ackList.size() - deliveredAcks);
return 0;
});
} else {
if (ack.isDeliveredAck()) {
this.deliveredAcks.addAndGet(ack.getMessageCount());
}
acquireCredit(ack.getMessageCount());
}

View File

@ -23,7 +23,12 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import java.util.ArrayList;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.Test;
/**
@ -132,6 +137,72 @@ public class JmsClientAckTest extends BasicOpenWireTest {
session.close();
}
/**
* Tests if acknowledged messages are being consumed.
*
* @throws JMSException
*/
@Test
public void testAckedMessageDeliveringWithPrefetch() throws Exception {
final int prefetchSize = 10;
final int messageCount = 5 * prefetchSize;
connection.getPrefetchPolicy().setAll(prefetchSize);
connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(getQueueName());
QueueControl queueControl = (QueueControl)server.getManagementService().
getResource(ResourceNames.QUEUE + queueName);
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < messageCount; i++) {
producer.send(session.createTextMessage("MSG" + i));
}
// Consume the messages...
Message msg;
MessageConsumer consumer = session.createConsumer(queue);
Wait.assertEquals(0L, () -> queueControl.getMessagesAcknowledged(), 3000, 100);
Wait.assertEquals(prefetchSize, () -> queueControl.getDeliveringCount(), 3000, 100);
ArrayList<Message> messages = new ArrayList<>();
for (int i = 0; i < prefetchSize; i++) {
msg = consumer.receive(1000);
assertNotNull(msg);
messages.add(msg);
}
Wait.assertEquals(0L, () -> queueControl.getMessagesAcknowledged(), 3000, 100);
Wait.assertEquals(2 * prefetchSize, () -> queueControl.getDeliveringCount(), 3000, 100);
for (int i = 0; i < prefetchSize; i++) {
msg = messages.get(i);
msg.acknowledge();
}
Wait.assertEquals((long) prefetchSize, () -> queueControl.getMessagesAcknowledged(), 3000, 100);
Wait.assertEquals(prefetchSize, () -> queueControl.getDeliveringCount(), 3000, 100);
for (int i = 0; i < messageCount - prefetchSize; i++) {
msg = consumer.receive(1000);
assertNotNull(msg);
msg.acknowledge();
}
Wait.assertEquals((long)messageCount, () -> queueControl.getMessagesAcknowledged(), 3000, 100);
Wait.assertEquals(0, () -> queueControl.getDeliveringCount(), 3000, 100);
// Reset the session.
session.close();
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// Attempt to Consume the message...
consumer = session.createConsumer(queue);
msg = consumer.receiveNoWait();
assertNull(msg);
session.close();
}
protected String getQueueName() {
return queueName;
}