From 39cd9d56f67ad167e60ffba4a14f82e05b43c169 Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Sun, 6 Oct 2019 08:51:07 +0100 Subject: [PATCH] ARTEMIS-2497: [AMQP] Allow handling of the reject disposition to be configured. --- .../amqp/broker/ProtonProtocolManager.java | 11 + .../protocol/amqp/proton/AmqpSupport.java | 3 + .../proton/ProtonServerSenderContext.java | 11 +- .../proton/ProtonServerSenderContextTest.java | 6 + .../amqp/AmqpReceiverDispositionTest.java | 309 ++++++++++-------- 5 files changed, 210 insertions(+), 130 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java index 3a6f54c76b..121020c566 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java @@ -73,6 +73,10 @@ public class ProtonProtocolManager extends AbstractProtocolManager executeDelivery; + private final boolean amqpTreatRejectAsUnmodifiedDeliveryFailed; public ProtonServerSenderContext(AMQPConnectionContext connection, Sender sender, @@ -129,6 +131,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr this.protonSession = protonSession; this.sessionSPI = server; this.executeDelivery = this::executeDelivery; + amqpTreatRejectAsUnmodifiedDeliveryFailed = this.connection.getProtocolManager() + .isAmqpTreatRejectAsUnmodifiedDeliveryFailed(); } public Object getBrokerConsumer() { @@ -681,7 +685,12 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr break; case Rejected: try { - sessionSPI.reject(brokerConsumer, message); + if (amqpTreatRejectAsUnmodifiedDeliveryFailed) { + // We could be more discriminating - for instance check for AmqpError#RESOURCE_LIMIT_EXCEEDED + sessionSPI.cancel(brokerConsumer, message, true); + } else { + sessionSPI.reject(brokerConsumer, message); + } } catch (Exception e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage()); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContextTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContextTest.java index 9b2980de94..a0bde916bf 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContextTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContextTest.java @@ -16,11 +16,14 @@ */ package org.apache.activemq.artemis.protocol.amqp.proton; +import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.AddressQueryResult; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; +import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException; import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.engine.Sender; + import org.junit.Test; import java.util.Collections; @@ -34,8 +37,11 @@ public class ProtonServerSenderContextTest { @Test(expected = ActiveMQAMQPNotFoundException.class) public void testAcceptsNullSourceAddressWhenInitialising() throws Exception { + ProtonProtocolManager mock = mock(ProtonProtocolManager.class); + when(mock.getServer()).thenReturn(mock(ActiveMQServer.class)); Sender mockSender = mock(Sender.class); AMQPConnectionContext mockConnContext = mock(AMQPConnectionContext.class); + when(mockConnContext.getProtocolManager()).thenReturn(mock); AMQPSessionCallback mockSessionCallback = mock(AMQPSessionCallback.class); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java index e636d83695..7e17b973bf 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.tests.integration.amqp; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.activemq.transport.amqp.client.AmqpClient; @@ -25,149 +26,199 @@ import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpSession; import org.apache.qpid.proton.message.Message; import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; /** * Test various behaviors of AMQP receivers with the broker. */ -public class AmqpReceiverDispositionTest extends AmqpClientTestSupport { +@RunWith(Enclosed.class) +public class AmqpReceiverDispositionTest { - @Test(timeout = 30000) - public void testReleasedDisposition() throws Exception { - sendMessages(getQueueName(), 1); + public static class AmqpReceiverDispositionOrdinaryTests extends AmqpClientTestSupport { + @Test(timeout = 30000) + public void testReleasedDisposition() throws Exception { + sendMessages(getQueueName(), 1); - AmqpClient client = createAmqpClient(); - AmqpConnection connection = addConnection(client.connect()); - AmqpSession session = connection.createSession(); + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); - AmqpReceiver receiver1 = session.createReceiver(getQueueName()); - receiver1.flow(1); - - AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS); - - AmqpReceiver receiver2 = session.createReceiver(getQueueName()); - - assertNotNull("did not receive message first time", message); - assertEquals("MessageID:0", message.getMessageId()); - - Message protonMessage = message.getWrappedMessage(); - assertNotNull(protonMessage); - assertEquals("Unexpected initial value for AMQP delivery-count", 0, protonMessage.getDeliveryCount()); - - receiver2.flow(1); - message.release(); - - // Read the message again and validate its state - message = receiver2.receive(10, TimeUnit.SECONDS); - assertNotNull("did not receive message again", message); - assertEquals("MessageID:0", message.getMessageId()); - - message.accept(); - - protonMessage = message.getWrappedMessage(); - assertNotNull(protonMessage); - assertEquals("Unexpected updated value for AMQP delivery-count", 0, protonMessage.getDeliveryCount()); - - connection.close(); - } - - @Test(timeout = 30000) - public void testRejectedDisposition() throws Exception { - sendMessages(getQueueName(), 1); - - AmqpClient client = createAmqpClient(); - AmqpConnection connection = addConnection(client.connect()); - AmqpSession session = connection.createSession(); - - AmqpReceiver receiver1 = session.createReceiver(getQueueName()); - receiver1.flow(1); - - AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS); - assertNotNull("did not receive message first time", message); - assertEquals("MessageID:0", message.getMessageId()); - - Message protonMessage = message.getWrappedMessage(); - assertNotNull(protonMessage); - assertEquals("Unexpected initial value for AMQP delivery-count", 0, protonMessage.getDeliveryCount()); - - message.reject(); - - // Reject is a terminal outcome and should not be redelivered to the rejecting receiver - // or any other as it should move to the archived state. - receiver1.flow(1); - message = receiver1.receiveNoWait(); - assertNull("Should not receive message again", message); - - // Attempt to Read the message again with another receiver to validate it is archived. - AmqpReceiver receiver2 = session.createReceiver(getQueueName()); - receiver2.flow(1); - assertNull(receiver2.receiveNoWait()); - - connection.close(); - } - - @Test(timeout = 30000) - public void testModifiedDispositionWithDeliveryFailedWithoutUndeliverableHereFieldsSet() throws Exception { - doModifiedDispositionTestImpl(Boolean.TRUE, null); - } - - @Test(timeout = 30000) - public void testModifiedDispositionWithoutDeliveryFailedWithoutUndeliverableHereFieldsSet() throws Exception { - doModifiedDispositionTestImpl(null, null); - } - - @Test(timeout = 30000) - public void testModifiedDispositionWithoutDeliveryFailedWithUndeliverableHereFieldsSet() throws Exception { - doModifiedDispositionTestImpl(null, Boolean.TRUE); - } - - @Test(timeout = 30000) - public void testModifiedDispositionWithDeliveryFailedWithUndeliverableHereFieldsSet() throws Exception { - doModifiedDispositionTestImpl(Boolean.TRUE, Boolean.TRUE); - } - - private void doModifiedDispositionTestImpl(Boolean deliveryFailed, Boolean undeliverableHere) throws Exception { - sendMessages(getQueueName(), 1); - - AmqpClient client = createAmqpClient(); - AmqpConnection connection = addConnection(client.connect()); - AmqpSession session = connection.createSession(); - - AmqpReceiver receiver1 = session.createReceiver(getQueueName()); - receiver1.flow(1); - - AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS); - assertNotNull("did not receive message first time", message); - - Message protonMessage = message.getWrappedMessage(); - assertNotNull(protonMessage); - assertEquals("Unexpected initial value for AMQP delivery-count", 0, protonMessage.getDeliveryCount()); - - message.modified(deliveryFailed, undeliverableHere); - - // Remote must not redispatch to the client if undeliverable here is true - if (Boolean.TRUE.equals(undeliverableHere)) { + AmqpReceiver receiver1 = session.createReceiver(getQueueName()); receiver1.flow(1); - message = receiver1.receive(1, TimeUnit.SECONDS); + + AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS); + + AmqpReceiver receiver2 = session.createReceiver(getQueueName()); + + assertNotNull("did not receive message first time", message); + assertEquals("MessageID:0", message.getMessageId()); + + Message protonMessage = message.getWrappedMessage(); + assertNotNull(protonMessage); + assertEquals("Unexpected initial value for AMQP delivery-count", 0, protonMessage.getDeliveryCount()); + + receiver2.flow(1); + message.release(); + + // Read the message again and validate its state + message = receiver2.receive(10, TimeUnit.SECONDS); + assertNotNull("did not receive message again", message); + assertEquals("MessageID:0", message.getMessageId()); + + message.accept(); + + protonMessage = message.getWrappedMessage(); + assertNotNull(protonMessage); + assertEquals("Unexpected updated value for AMQP delivery-count", 0, protonMessage.getDeliveryCount()); + + connection.close(); + } + + @Test(timeout = 30000) + public void testRejectedDisposition() throws Exception { + sendMessages(getQueueName(), 1); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver1 = session.createReceiver(getQueueName()); + receiver1.flow(1); + + AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS); + assertNotNull("did not receive message first time", message); + assertEquals("MessageID:0", message.getMessageId()); + + Message protonMessage = message.getWrappedMessage(); + assertNotNull(protonMessage); + assertEquals("Unexpected initial value for AMQP delivery-count", 0, protonMessage.getDeliveryCount()); + + message.reject(); + + // Reject is a terminal outcome and should not be redelivered to the rejecting receiver + // or any other as it should move to the archived state. + receiver1.flow(1); + message = receiver1.receiveNoWait(); assertNull("Should not receive message again", message); + + // Attempt to Read the message again with another receiver to validate it is archived. + AmqpReceiver receiver2 = session.createReceiver(getQueueName()); + receiver2.flow(1); + assertNull(receiver2.receiveNoWait()); + + connection.close(); } - AmqpReceiver receiver2 = session.createReceiver(getQueueName()); - receiver2.flow(1); - - message = receiver2.receive(5, TimeUnit.SECONDS); - assertNotNull("did not receive message again", message); - - int expectedDeliveryCount = 0; - if (Boolean.TRUE.equals(deliveryFailed)) { - expectedDeliveryCount = 1; + @Test(timeout = 30000) + public void testModifiedDispositionWithDeliveryFailedWithoutUndeliverableHereFieldsSet() throws Exception { + doModifiedDispositionTestImpl(Boolean.TRUE, null); } - message.accept(); + @Test(timeout = 30000) + public void testModifiedDispositionWithoutDeliveryFailedWithoutUndeliverableHereFieldsSet() throws Exception { + doModifiedDispositionTestImpl(null, null); + } - Message protonMessage2 = message.getWrappedMessage(); - assertNotNull(protonMessage2); - assertEquals("Unexpected updated value for AMQP delivery-count", expectedDeliveryCount, protonMessage2.getDeliveryCount()); + @Test(timeout = 30000) + public void testModifiedDispositionWithoutDeliveryFailedWithUndeliverableHereFieldsSet() throws Exception { + doModifiedDispositionTestImpl(null, Boolean.TRUE); + } - connection.close(); + @Test(timeout = 30000) + public void testModifiedDispositionWithDeliveryFailedWithUndeliverableHereFieldsSet() throws Exception { + doModifiedDispositionTestImpl(Boolean.TRUE, Boolean.TRUE); + } + + private void doModifiedDispositionTestImpl(Boolean deliveryFailed, Boolean undeliverableHere) throws Exception { + sendMessages(getQueueName(), 1); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver1 = session.createReceiver(getQueueName()); + receiver1.flow(1); + + AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS); + assertNotNull("did not receive message first time", message); + + Message protonMessage = message.getWrappedMessage(); + assertNotNull(protonMessage); + assertEquals("Unexpected initial value for AMQP delivery-count", 0, protonMessage.getDeliveryCount()); + + message.modified(deliveryFailed, undeliverableHere); + + // Remote must not redispatch to the client if undeliverable here is true + if (Boolean.TRUE.equals(undeliverableHere)) { + receiver1.flow(1); + message = receiver1.receive(1, TimeUnit.SECONDS); + assertNull("Should not receive message again", message); + } + + AmqpReceiver receiver2 = session.createReceiver(getQueueName()); + receiver2.flow(1); + + message = receiver2.receive(5, TimeUnit.SECONDS); + assertNotNull("did not receive message again", message); + + int expectedDeliveryCount = 0; + if (Boolean.TRUE.equals(deliveryFailed)) { + expectedDeliveryCount = 1; + } + + message.accept(); + + Message protonMessage2 = message.getWrappedMessage(); + assertNotNull(protonMessage2); + assertEquals("Unexpected updated value for AMQP delivery-count", + expectedDeliveryCount, + protonMessage2.getDeliveryCount()); + + connection.close(); + } + } + + public static class AmqpReceiverDispositionRejectAsUnmodifiedModeTests extends AmqpClientTestSupport { + + @Override + protected void configureAMQPAcceptorParameters(Map params) { + params.put("amqpTreatRejectAsUnmodifiedDeliveryFailed", true); + } + + @Test(timeout = 30000) + public void testRejectedDisposition() throws Exception { + sendMessages(getQueueName(), 1); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver1 = session.createReceiver(getQueueName()); + receiver1.flow(1); + + AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS); + assertNotNull("did not receive message first time", message); + assertEquals("MessageID:0", message.getMessageId()); + + Message protonMessage = message.getWrappedMessage(); + assertNotNull(protonMessage); + assertEquals("Unexpected initial value for AMQP delivery-count", 0, protonMessage.getDeliveryCount()); + + // Owing to the config, the reject should be treat as if it were a + // Unmodified delivery-failed=true + message.reject(); + receiver1.flow(1); + + message = receiver1.receive(5, TimeUnit.SECONDS); + assertNotNull("did not receive message after reject", message); + assertEquals("MessageID:0", message.getMessageId()); + + protonMessage = message.getWrappedMessage(); + assertNotNull(protonMessage); + assertEquals("Unexpected value for AMQP delivery-count after redelivery", 1, protonMessage.getDeliveryCount()); + + connection.close(); + } } }