ARTEMIS-3646 - OpenWire, Fix overflowing prefetch and incorrect metrics for messages sent to DLQ

This commit is contained in:
AntonRoskvist 2022-02-01 13:19:07 +01:00 committed by clebertsuconic
parent 22bb421ab9
commit 263b723726
3 changed files with 88 additions and 1 deletions

View File

@ -43,6 +43,7 @@ import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
@ -347,7 +348,9 @@ public class AMQConsumer {
for (MessageReference ref : ackList) {
Throwable poisonCause = ack.getPoisonCause();
if (poisonCause != null) {
((QueueImpl) ref.getQueue()).decDelivering(ref);
ref.getMessage().putStringProperty(OpenWireMessageConverter.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, new SimpleString(poisonCause.toString()));
((QueueImpl) ref.getQueue()).incDelivering(ref);
}
ref.getQueue().sendToDeadLetterAddress(transaction, ref);
}
@ -483,6 +486,7 @@ public class AMQConsumer {
}
public void addRolledback(MessageReference messageReference) {
currentWindow.decrementAndGet();
getRolledbackMessageRefsOrCreate().add(messageReference);
}

View File

@ -4395,7 +4395,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
}
private void incDelivering(MessageReference ref) {
public void incDelivering(MessageReference ref) {
deliveringMetrics.incrementMetrics(ref);
}

View File

@ -29,7 +29,10 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
@ -396,6 +399,86 @@ public class RedeliveryPolicyTest extends BasicOpenWireTest {
}
/**
* @throws Exception
*/
@Test
public void testRedeliveredMessageNotOverflowingPrefetch() throws Exception {
final int prefetchSize = 10;
final int messageCount = 2 * prefetchSize;
connection.getPrefetchPolicy().setAll(prefetchSize);
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
ActiveMQQueue destination = new ActiveMQQueue("TEST");
this.makeSureCoreQueueExist("TEST");
QueueControl queueControl = (QueueControl)server.getManagementService().
getResource(ResourceNames.QUEUE + "TEST");
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < messageCount; i++) {
producer.send(session.createTextMessage("MSG" + i));
session.commit();
}
Message m;
MessageConsumer consumer = session.createConsumer(destination);
Wait.assertEquals(prefetchSize, () -> queueControl.getDeliveringCount(), 3000, 100);
for (int i = 0; i < messageCount; i++) {
m = consumer.receive(2000);
assertNotNull(m);
if (i == 3) {
session.rollback();
continue;
}
session.commit();
assertTrue(queueControl.getDeliveringCount() <= prefetchSize);
}
m = consumer.receive(2000);
assertNotNull(m);
session.commit();
}
/**
* @throws Exception
*/
@Test
public void testCountersAreCorrectAfterSendToDLQ() throws Exception {
RedeliveryPolicy policy = connection.getRedeliveryPolicy();
policy.setMaximumRedeliveries(0);
policy.setInitialRedeliveryDelay(0);
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
ActiveMQQueue destination = new ActiveMQQueue("TEST");
this.makeSureCoreQueueExist("TEST");
QueueControl queueControl = (QueueControl)server.getManagementService().
getResource(ResourceNames.QUEUE + "TEST");
MessageProducer producer = session.createProducer(destination);
producer.send(session.createTextMessage("The Message"));
session.commit();
Message m;
MessageConsumer consumer = session.createConsumer(destination);
m = consumer.receive(2000);
assertNotNull(m);
session.rollback();
Wait.assertEquals(0, () -> queueControl.getMessageCount());
session.close();
assertEquals(0L, queueControl.getPersistentSize());
}
@Test
public void testInitialRedeliveryDelayZero() throws Exception {
// Receive a message with the JMS API