ARTEMIS-1982 queue metrics can go negative

When redelivery is exhausted and messages are sent to a DLA with bindings
then some queue metrics can go negative.
This commit is contained in:
Justin Bertram 2019-06-10 15:33:10 +01:00 committed by Clebert Suconic
parent 8335e78094
commit 3ff75c3353
6 changed files with 114 additions and 24 deletions

View File

@ -23,6 +23,7 @@ import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
@ -281,7 +282,14 @@ public interface Queue extends Bindable,CriticalComponent {
int sendMessagesToDeadLetterAddress(Filter filter) throws Exception;
void sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception;
/**
*
* @param tx
* @param ref
* @return whether or not the message was actually sent to a DLA with bindings
* @throws Exception
*/
boolean sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception;
boolean changeReferencePriority(long messageID, byte newPriority) throws Exception;
@ -315,7 +323,16 @@ public interface Queue extends Bindable,CriticalComponent {
int getGroupCount();
boolean checkRedelivery(MessageReference ref, long timeBase, boolean ignoreRedeliveryDelay) throws Exception;
/**
*
* @param ref
* @param timeBase
* @param ignoreRedeliveryDelay
* @return a Pair of Booleans: the first indicates whether or not redelivery happened; the second indicates whether
* or not the message was actually sent to a DLA with bindings
* @throws Exception
*/
Pair<Boolean, Boolean> checkRedelivery(MessageReference ref, long timeBase, boolean ignoreRedeliveryDelay) throws Exception;
/**
* It will iterate thorugh memory only (not paging)

View File

@ -1635,13 +1635,14 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public synchronized void cancel(final MessageReference reference, final long timeBase) throws Exception {
if (checkRedelivery(reference, timeBase, false)) {
Pair<Boolean, Boolean> redeliveryResult = checkRedelivery(reference, timeBase, false);
if (redeliveryResult.getA()) {
if (!scheduledDeliveryHandler.checkAndSchedule(reference, false)) {
internalAddHead(reference);
}
resetAllIterators();
} else {
} else if (!redeliveryResult.getB()) {
decDelivering(reference);
}
}
@ -2815,9 +2816,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
deliverAsync();
}
}
@Override
public boolean checkRedelivery(final MessageReference reference,
public Pair<Boolean, Boolean> checkRedelivery(final MessageReference reference,
final long timeBase,
final boolean ignoreRedeliveryDelay) throws Exception {
Message message = reference.getMessage();
@ -2827,7 +2827,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
logger.trace("Queue " + this.getName() + " is an internal queue, no checkRedelivery");
}
// no DLQ check on internal queues
return true;
return new Pair<>(true, false);
}
if (!internalQueue && reference.isDurable() && isDurable() && !reference.isPaged()) {
@ -2845,9 +2845,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (logger.isTraceEnabled()) {
logger.trace("Sending reference " + reference + " to DLA = " + addressSettings.getDeadLetterAddress() + " since ref.getDeliveryCount=" + reference.getDeliveryCount() + "and maxDeliveries=" + maxDeliveries + " from queue=" + this.getName());
}
sendToDeadLetterAddress(null, reference, addressSettings.getDeadLetterAddress());
boolean dlaResult = sendToDeadLetterAddress(null, reference, addressSettings.getDeadLetterAddress());
return false;
return new Pair<>(false, dlaResult);
} else {
// Second check Redelivery Delay
if (!ignoreRedeliveryDelay && redeliveryDelay > 0) {
@ -2866,7 +2866,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
decDelivering(reference);
return true;
return new Pair<>(true, false);
}
}
@ -3114,13 +3114,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
}
@Override
public void sendToDeadLetterAddress(final Transaction tx, final MessageReference ref) throws Exception {
sendToDeadLetterAddress(tx, ref, addressSettingsRepository.getMatch(address.toString()).getDeadLetterAddress());
public boolean sendToDeadLetterAddress(final Transaction tx, final MessageReference ref) throws Exception {
return sendToDeadLetterAddress(tx, ref, addressSettingsRepository.getMatch(address.toString()).getDeadLetterAddress());
}
private void sendToDeadLetterAddress(final Transaction tx,
private boolean sendToDeadLetterAddress(final Transaction tx,
final MessageReference ref,
final SimpleString deadLetterAddress) throws Exception {
if (deadLetterAddress != null) {
@ -3132,12 +3131,15 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
} else {
ActiveMQServerLogger.LOGGER.messageExceededMaxDeliverySendtoDLA(ref, deadLetterAddress, name);
move(tx, deadLetterAddress, null, ref, false, AckReason.KILLED, null);
return true;
}
} else {
ActiveMQServerLogger.LOGGER.messageExceededMaxDeliveryNoDLA(ref, name);
ref.acknowledge(tx, AckReason.KILLED, null);
}
return false;
}
private void move(final Transaction originalTX,

View File

@ -150,7 +150,7 @@ public class RefsOperation extends TransactionOperationAbstract {
protected void rollbackRedelivery(Transaction tx, MessageReference ref, long timeBase, Map<QueueImpl, LinkedList<MessageReference>> queueMap) throws Exception {
// if ignore redelivery check, we just perform redelivery straight
if (ref.getQueue().checkRedelivery(ref, timeBase, ignoreRedeliveryCheck)) {
if (ref.getQueue().checkRedelivery(ref, timeBase, ignoreRedeliveryCheck).getA()) {
LinkedList<MessageReference> toCancel = queueMap.get(ref.getQueue());
if (toCancel == null) {

View File

@ -35,6 +35,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.RefCountMessage;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -1292,7 +1293,8 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
public void sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception {
public boolean sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception {
return false;
}
@Override
@ -1375,10 +1377,10 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
public boolean checkRedelivery(MessageReference ref,
long timeBase,
boolean ignoreRedeliveryDelay) throws Exception {
return false;
public Pair<Boolean, Boolean> checkRedelivery(MessageReference ref,
long timeBase,
boolean ignoreRedeliveryDelay) throws Exception {
return new Pair<>(false, false);
}
@Override

View File

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Assert;
import org.junit.Test;
/**
* Tests for broker side support of the Durable Subscription mapping for JMS.
*/
public class AmqpDLQReceiverTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testCreateDurableReceiver() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getQueueName());
sendMessages(getQueueName(), 1);
Queue queue = getProxyToQueue(getQueueName());
assertNotNull(queue);
receiver.flow(100);
for (int i = 0; i < 10; i++) {
System.out.println("i = " + i);
AmqpMessage receive = receiver.receive(5000, TimeUnit.MILLISECONDS);
receive.modified(true, false);
Queue queueView = getProxyToQueue(getQueueName());
System.out.println("receive = " + receive.getWrappedMessage().getDeliveryCount());
System.out.println("queueView.getMessageCount() = " + queueView.getMessageCount());
System.out.println("queueView.getDeliveringCount() = " + queueView.getDeliveringCount());
System.out.println("queueView.getPersistentSize() = " + queueView.getPersistentSize());
}
receiver.close();
connection.close();
Thread.sleep(5000);
Queue queueView = getProxyToQueue(getQueueName());
System.out.println("queueView.getMessageCount() = " + queueView.getMessageCount());
System.out.println("queueView.getDeliveringCount() = " + queueView.getDeliveringCount());
System.out.println("queueView.getPersistentSize() = " + queueView.getPersistentSize());
Assert.assertEquals(0, queueView.getMessageCount());
}
}

View File

@ -23,6 +23,7 @@ import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
@ -172,8 +173,8 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
public void sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception {
public boolean sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception {
return false;
}
@Override
@ -380,11 +381,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
public boolean checkRedelivery(final MessageReference ref,
public Pair<Boolean, Boolean> checkRedelivery(final MessageReference ref,
final long timeBase,
final boolean check) throws Exception {
// no-op
return false;
return new Pair<>(false, false);
}
@Override