This commit is contained in:
Clebert Suconic 2019-06-12 17:22:30 -04:00
commit e506180532
6 changed files with 114 additions and 24 deletions

View File

@ -23,6 +23,7 @@ import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.Filter;
@ -281,7 +282,14 @@ public interface Queue extends Bindable,CriticalComponent {
int sendMessagesToDeadLetterAddress(Filter filter) throws Exception; int sendMessagesToDeadLetterAddress(Filter filter) throws Exception;
void sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception; /**
*
* @param tx
* @param ref
* @return whether or not the message was actually sent to a DLA with bindings
* @throws Exception
*/
boolean sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception;
boolean changeReferencePriority(long messageID, byte newPriority) throws Exception; boolean changeReferencePriority(long messageID, byte newPriority) throws Exception;
@ -315,7 +323,16 @@ public interface Queue extends Bindable,CriticalComponent {
int getGroupCount(); int getGroupCount();
boolean checkRedelivery(MessageReference ref, long timeBase, boolean ignoreRedeliveryDelay) throws Exception; /**
*
* @param ref
* @param timeBase
* @param ignoreRedeliveryDelay
* @return a Pair of Booleans: the first indicates whether or not redelivery happened; the second indicates whether
* or not the message was actually sent to a DLA with bindings
* @throws Exception
*/
Pair<Boolean, Boolean> checkRedelivery(MessageReference ref, long timeBase, boolean ignoreRedeliveryDelay) throws Exception;
/** /**
* It will iterate thorugh memory only (not paging) * It will iterate thorugh memory only (not paging)

View File

@ -1635,13 +1635,14 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override @Override
public synchronized void cancel(final MessageReference reference, final long timeBase) throws Exception { public synchronized void cancel(final MessageReference reference, final long timeBase) throws Exception {
if (checkRedelivery(reference, timeBase, false)) { Pair<Boolean, Boolean> redeliveryResult = checkRedelivery(reference, timeBase, false);
if (redeliveryResult.getA()) {
if (!scheduledDeliveryHandler.checkAndSchedule(reference, false)) { if (!scheduledDeliveryHandler.checkAndSchedule(reference, false)) {
internalAddHead(reference); internalAddHead(reference);
} }
resetAllIterators(); resetAllIterators();
} else { } else if (!redeliveryResult.getB()) {
decDelivering(reference); decDelivering(reference);
} }
} }
@ -2815,9 +2816,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
deliverAsync(); deliverAsync();
} }
} }
@Override @Override
public boolean checkRedelivery(final MessageReference reference, public Pair<Boolean, Boolean> checkRedelivery(final MessageReference reference,
final long timeBase, final long timeBase,
final boolean ignoreRedeliveryDelay) throws Exception { final boolean ignoreRedeliveryDelay) throws Exception {
Message message = reference.getMessage(); Message message = reference.getMessage();
@ -2827,7 +2827,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
logger.trace("Queue " + this.getName() + " is an internal queue, no checkRedelivery"); logger.trace("Queue " + this.getName() + " is an internal queue, no checkRedelivery");
} }
// no DLQ check on internal queues // no DLQ check on internal queues
return true; return new Pair<>(true, false);
} }
if (!internalQueue && reference.isDurable() && isDurable() && !reference.isPaged()) { if (!internalQueue && reference.isDurable() && isDurable() && !reference.isPaged()) {
@ -2845,9 +2845,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Sending reference " + reference + " to DLA = " + addressSettings.getDeadLetterAddress() + " since ref.getDeliveryCount=" + reference.getDeliveryCount() + "and maxDeliveries=" + maxDeliveries + " from queue=" + this.getName()); logger.trace("Sending reference " + reference + " to DLA = " + addressSettings.getDeadLetterAddress() + " since ref.getDeliveryCount=" + reference.getDeliveryCount() + "and maxDeliveries=" + maxDeliveries + " from queue=" + this.getName());
} }
sendToDeadLetterAddress(null, reference, addressSettings.getDeadLetterAddress()); boolean dlaResult = sendToDeadLetterAddress(null, reference, addressSettings.getDeadLetterAddress());
return false; return new Pair<>(false, dlaResult);
} else { } else {
// Second check Redelivery Delay // Second check Redelivery Delay
if (!ignoreRedeliveryDelay && redeliveryDelay > 0) { if (!ignoreRedeliveryDelay && redeliveryDelay > 0) {
@ -2866,7 +2866,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
decDelivering(reference); decDelivering(reference);
return true; return new Pair<>(true, false);
} }
} }
@ -3114,13 +3114,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
} }
} }
@Override @Override
public void sendToDeadLetterAddress(final Transaction tx, final MessageReference ref) throws Exception { public boolean sendToDeadLetterAddress(final Transaction tx, final MessageReference ref) throws Exception {
sendToDeadLetterAddress(tx, ref, addressSettingsRepository.getMatch(address.toString()).getDeadLetterAddress()); return sendToDeadLetterAddress(tx, ref, addressSettingsRepository.getMatch(address.toString()).getDeadLetterAddress());
} }
private void sendToDeadLetterAddress(final Transaction tx, private boolean sendToDeadLetterAddress(final Transaction tx,
final MessageReference ref, final MessageReference ref,
final SimpleString deadLetterAddress) throws Exception { final SimpleString deadLetterAddress) throws Exception {
if (deadLetterAddress != null) { if (deadLetterAddress != null) {
@ -3132,12 +3131,15 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
} else { } else {
ActiveMQServerLogger.LOGGER.messageExceededMaxDeliverySendtoDLA(ref, deadLetterAddress, name); ActiveMQServerLogger.LOGGER.messageExceededMaxDeliverySendtoDLA(ref, deadLetterAddress, name);
move(tx, deadLetterAddress, null, ref, false, AckReason.KILLED, null); move(tx, deadLetterAddress, null, ref, false, AckReason.KILLED, null);
return true;
} }
} else { } else {
ActiveMQServerLogger.LOGGER.messageExceededMaxDeliveryNoDLA(ref, name); ActiveMQServerLogger.LOGGER.messageExceededMaxDeliveryNoDLA(ref, name);
ref.acknowledge(tx, AckReason.KILLED, null); ref.acknowledge(tx, AckReason.KILLED, null);
} }
return false;
} }
private void move(final Transaction originalTX, private void move(final Transaction originalTX,

View File

@ -150,7 +150,7 @@ public class RefsOperation extends TransactionOperationAbstract {
protected void rollbackRedelivery(Transaction tx, MessageReference ref, long timeBase, Map<QueueImpl, LinkedList<MessageReference>> queueMap) throws Exception { protected void rollbackRedelivery(Transaction tx, MessageReference ref, long timeBase, Map<QueueImpl, LinkedList<MessageReference>> queueMap) throws Exception {
// if ignore redelivery check, we just perform redelivery straight // if ignore redelivery check, we just perform redelivery straight
if (ref.getQueue().checkRedelivery(ref, timeBase, ignoreRedeliveryCheck)) { if (ref.getQueue().checkRedelivery(ref, timeBase, ignoreRedeliveryCheck).getA()) {
LinkedList<MessageReference> toCancel = queueMap.get(ref.getQueue()); LinkedList<MessageReference> toCancel = queueMap.get(ref.getQueue());
if (toCancel == null) { if (toCancel == null) {

View File

@ -35,6 +35,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.RefCountMessage; import org.apache.activemq.artemis.api.core.RefCountMessage;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
@ -1292,7 +1293,8 @@ public class ScheduledDeliveryHandlerTest extends Assert {
} }
@Override @Override
public void sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception { public boolean sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception {
return false;
} }
@Override @Override
@ -1375,10 +1377,10 @@ public class ScheduledDeliveryHandlerTest extends Assert {
} }
@Override @Override
public boolean checkRedelivery(MessageReference ref, public Pair<Boolean, Boolean> checkRedelivery(MessageReference ref,
long timeBase, long timeBase,
boolean ignoreRedeliveryDelay) throws Exception { boolean ignoreRedeliveryDelay) throws Exception {
return false; return new Pair<>(false, false);
} }
@Override @Override

View File

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Assert;
import org.junit.Test;
/**
* Tests for broker side support of the Durable Subscription mapping for JMS.
*/
public class AmqpDLQReceiverTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testCreateDurableReceiver() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getQueueName());
sendMessages(getQueueName(), 1);
Queue queue = getProxyToQueue(getQueueName());
assertNotNull(queue);
receiver.flow(100);
for (int i = 0; i < 10; i++) {
System.out.println("i = " + i);
AmqpMessage receive = receiver.receive(5000, TimeUnit.MILLISECONDS);
receive.modified(true, false);
Queue queueView = getProxyToQueue(getQueueName());
System.out.println("receive = " + receive.getWrappedMessage().getDeliveryCount());
System.out.println("queueView.getMessageCount() = " + queueView.getMessageCount());
System.out.println("queueView.getDeliveringCount() = " + queueView.getDeliveringCount());
System.out.println("queueView.getPersistentSize() = " + queueView.getPersistentSize());
}
receiver.close();
connection.close();
Thread.sleep(5000);
Queue queueView = getProxyToQueue(getQueueName());
System.out.println("queueView.getMessageCount() = " + queueView.getMessageCount());
System.out.println("queueView.getDeliveringCount() = " + queueView.getDeliveringCount());
System.out.println("queueView.getPersistentSize() = " + queueView.getPersistentSize());
Assert.assertEquals(0, queueView.getMessageCount());
}
}

View File

@ -23,6 +23,7 @@ import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.Filter;
@ -172,8 +173,8 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
} }
@Override @Override
public void sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception { public boolean sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception {
return false;
} }
@Override @Override
@ -380,11 +381,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
} }
@Override @Override
public boolean checkRedelivery(final MessageReference ref, public Pair<Boolean, Boolean> checkRedelivery(final MessageReference ref,
final long timeBase, final long timeBase,
final boolean check) throws Exception { final boolean check) throws Exception {
// no-op // no-op
return false; return new Pair<>(false, false);
} }
@Override @Override