diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index bf0a22d02e..2d7f373fde 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -23,6 +23,7 @@ import java.util.concurrent.Executor; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; @@ -281,7 +282,14 @@ public interface Queue extends Bindable,CriticalComponent { int sendMessagesToDeadLetterAddress(Filter filter) throws Exception; - void sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception; + /** + * + * @param tx + * @param ref + * @return whether or not the message was actually sent to a DLA with bindings + * @throws Exception + */ + boolean sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception; boolean changeReferencePriority(long messageID, byte newPriority) throws Exception; @@ -315,7 +323,16 @@ public interface Queue extends Bindable,CriticalComponent { int getGroupCount(); - boolean checkRedelivery(MessageReference ref, long timeBase, boolean ignoreRedeliveryDelay) throws Exception; + /** + * + * @param ref + * @param timeBase + * @param ignoreRedeliveryDelay + * @return a Pair of Booleans: the first indicates whether or not redelivery happened; the second indicates whether + * or not the message was actually sent to a DLA with bindings + * @throws Exception + */ + Pair checkRedelivery(MessageReference ref, long timeBase, boolean ignoreRedeliveryDelay) throws Exception; /** * It will iterate thorugh memory only (not paging) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index a772529dd3..44b55e0c97 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -1635,13 +1635,14 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { @Override public synchronized void cancel(final MessageReference reference, final long timeBase) throws Exception { - if (checkRedelivery(reference, timeBase, false)) { + Pair redeliveryResult = checkRedelivery(reference, timeBase, false); + if (redeliveryResult.getA()) { if (!scheduledDeliveryHandler.checkAndSchedule(reference, false)) { internalAddHead(reference); } resetAllIterators(); - } else { + } else if (!redeliveryResult.getB()) { decDelivering(reference); } } @@ -2815,9 +2816,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { deliverAsync(); } } - @Override - public boolean checkRedelivery(final MessageReference reference, + public Pair checkRedelivery(final MessageReference reference, final long timeBase, final boolean ignoreRedeliveryDelay) throws Exception { Message message = reference.getMessage(); @@ -2827,7 +2827,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { logger.trace("Queue " + this.getName() + " is an internal queue, no checkRedelivery"); } // no DLQ check on internal queues - return true; + return new Pair<>(true, false); } if (!internalQueue && reference.isDurable() && isDurable() && !reference.isPaged()) { @@ -2845,9 +2845,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (logger.isTraceEnabled()) { logger.trace("Sending reference " + reference + " to DLA = " + addressSettings.getDeadLetterAddress() + " since ref.getDeliveryCount=" + reference.getDeliveryCount() + "and maxDeliveries=" + maxDeliveries + " from queue=" + this.getName()); } - sendToDeadLetterAddress(null, reference, addressSettings.getDeadLetterAddress()); + boolean dlaResult = sendToDeadLetterAddress(null, reference, addressSettings.getDeadLetterAddress()); - return false; + return new Pair<>(false, dlaResult); } else { // Second check Redelivery Delay if (!ignoreRedeliveryDelay && redeliveryDelay > 0) { @@ -2866,7 +2866,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { decDelivering(reference); - return true; + return new Pair<>(true, false); } } @@ -3114,13 +3114,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } } - @Override - public void sendToDeadLetterAddress(final Transaction tx, final MessageReference ref) throws Exception { - sendToDeadLetterAddress(tx, ref, addressSettingsRepository.getMatch(address.toString()).getDeadLetterAddress()); + public boolean sendToDeadLetterAddress(final Transaction tx, final MessageReference ref) throws Exception { + return sendToDeadLetterAddress(tx, ref, addressSettingsRepository.getMatch(address.toString()).getDeadLetterAddress()); } - private void sendToDeadLetterAddress(final Transaction tx, + private boolean sendToDeadLetterAddress(final Transaction tx, final MessageReference ref, final SimpleString deadLetterAddress) throws Exception { if (deadLetterAddress != null) { @@ -3132,12 +3131,15 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } else { ActiveMQServerLogger.LOGGER.messageExceededMaxDeliverySendtoDLA(ref, deadLetterAddress, name); move(tx, deadLetterAddress, null, ref, false, AckReason.KILLED, null); + return true; } } else { ActiveMQServerLogger.LOGGER.messageExceededMaxDeliveryNoDLA(ref, name); ref.acknowledge(tx, AckReason.KILLED, null); } + + return false; } private void move(final Transaction originalTX, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java index de52cc4bfb..c8d929730e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java @@ -150,7 +150,7 @@ public class RefsOperation extends TransactionOperationAbstract { protected void rollbackRedelivery(Transaction tx, MessageReference ref, long timeBase, Map> queueMap) throws Exception { // if ignore redelivery check, we just perform redelivery straight - if (ref.getQueue().checkRedelivery(ref, timeBase, ignoreRedeliveryCheck)) { + if (ref.getQueue().checkRedelivery(ref, timeBase, ignoreRedeliveryCheck).getA()) { LinkedList toCancel = queueMap.get(ref.getQueue()); if (toCancel == null) { diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index 6a03c02161..ad4cbb3169 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -35,6 +35,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.RefCountMessage; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; @@ -1292,7 +1293,8 @@ public class ScheduledDeliveryHandlerTest extends Assert { } @Override - public void sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception { + public boolean sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception { + return false; } @Override @@ -1375,10 +1377,10 @@ public class ScheduledDeliveryHandlerTest extends Assert { } @Override - public boolean checkRedelivery(MessageReference ref, - long timeBase, - boolean ignoreRedeliveryDelay) throws Exception { - return false; + public Pair checkRedelivery(MessageReference ref, + long timeBase, + boolean ignoreRedeliveryDelay) throws Exception { + return new Pair<>(false, false); } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDLQReceiverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDLQReceiverTest.java new file mode 100644 index 0000000000..a95c9084a9 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDLQReceiverTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.junit.Assert; +import org.junit.Test; + +/** + * Tests for broker side support of the Durable Subscription mapping for JMS. + */ +public class AmqpDLQReceiverTest extends AmqpClientTestSupport { + + @Test(timeout = 60000) + public void testCreateDurableReceiver() throws Exception { + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(getQueueName()); + sendMessages(getQueueName(), 1); + Queue queue = getProxyToQueue(getQueueName()); + assertNotNull(queue); + receiver.flow(100); + for (int i = 0; i < 10; i++) { + System.out.println("i = " + i); + AmqpMessage receive = receiver.receive(5000, TimeUnit.MILLISECONDS); + receive.modified(true, false); + Queue queueView = getProxyToQueue(getQueueName()); + System.out.println("receive = " + receive.getWrappedMessage().getDeliveryCount()); + System.out.println("queueView.getMessageCount() = " + queueView.getMessageCount()); + System.out.println("queueView.getDeliveringCount() = " + queueView.getDeliveringCount()); + System.out.println("queueView.getPersistentSize() = " + queueView.getPersistentSize()); + } + + receiver.close(); + connection.close(); + Thread.sleep(5000); + Queue queueView = getProxyToQueue(getQueueName()); + System.out.println("queueView.getMessageCount() = " + queueView.getMessageCount()); + System.out.println("queueView.getDeliveringCount() = " + queueView.getDeliveringCount()); + System.out.println("queueView.getPersistentSize() = " + queueView.getPersistentSize()); + Assert.assertEquals(0, queueView.getMessageCount()); + } + +} diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java index 2944fd46d1..290aa15e62 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java @@ -23,6 +23,7 @@ import java.util.concurrent.Executor; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; @@ -172,8 +173,8 @@ public class FakeQueue extends CriticalComponentImpl implements Queue { } @Override - public void sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception { - + public boolean sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception { + return false; } @Override @@ -380,11 +381,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue { } @Override - public boolean checkRedelivery(final MessageReference ref, + public Pair checkRedelivery(final MessageReference ref, final long timeBase, final boolean check) throws Exception { // no-op - return false; + return new Pair<>(false, false); } @Override