From 329c533d21ac9eeb858ae89cfa616c6f63138bf5 Mon Sep 17 00:00:00 2001 From: Andy Taylor Date: Wed, 26 Oct 2016 09:06:10 +0100 Subject: [PATCH] ARTEMIS-823 = broker doesn't settle rejected messages Currently we don't settle rejected messages at the broker, we should always settle when rejected https://issues.apache.org/jira/browse/ARTEMIS-823 --- .../amqp/broker/AMQPSessionCallback.java | 1 + .../proton/ProtonServerReceiverContext.java | 1 + .../amqp/client/AmqpAbstractResource.java | 5 + .../transport/amqp/client/AmqpSender.java | 11 +++ .../transport/amqp/client/AmqpValidator.java | 6 ++ .../integration/amqp/AmqpSecurityTest.java | 98 +++++++++++++++++++ 6 files changed, 122 insertions(+) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index acbb2e9d34..1fc85113ef 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -352,6 +352,7 @@ public class AMQPSessionCallback implements SessionCallback { Rejected rejected = new Rejected(); rejected.setError(ec); delivery.disposition(rejected); + delivery.settle(); connection.flush(); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java index 41caea9926..0cc293ac37 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java @@ -158,6 +158,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements condition.setDescription(e.getMessage()); rejected.setError(condition); delivery.disposition(rejected); + delivery.settle(); } } diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java index 50aa770212..0ab4596bbd 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java @@ -19,6 +19,7 @@ package org.apache.activemq.transport.amqp.client; import java.io.IOException; import org.apache.activemq.transport.amqp.client.util.AsyncResult; +import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Endpoint; import org.apache.qpid.proton.engine.EndpointState; import org.slf4j.Logger; @@ -303,6 +304,10 @@ public abstract class AmqpAbstractResource implements AmqpRe protected void doDetachedInspection() { } + protected void doDeliveryUpdate(Delivery delivery) { + + } + //----- Private implementation utility methods ---------------------------// private boolean isAwaitingOpen() { diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java index 3b134c9079..9b2a70d689 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java @@ -440,6 +440,8 @@ public class AmqpSender extends AmqpAbstractResource { tagGenerator.returnTag(delivery.getTag()); delivery.settle(); toRemove.add(delivery); + + doDeliveryUpdate(delivery); } pending.removeAll(toRemove); @@ -449,4 +451,13 @@ public class AmqpSender extends AmqpAbstractResource { public String toString() { return getClass().getSimpleName() + "{ address = " + address + "}"; } + + @Override + protected void doDeliveryUpdate(Delivery delivery) { + try { + getStateInspector().inspectDeliveryUpdate(delivery); + } catch (Throwable error) { + getStateInspector().markAsInvalid(error.getMessage()); + } + } } diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpValidator.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpValidator.java index 5f46cb6068..eca7676c75 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpValidator.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpValidator.java @@ -17,6 +17,7 @@ package org.apache.activemq.transport.amqp.client; import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.engine.Sender; import org.apache.qpid.proton.engine.Session; @@ -70,6 +71,10 @@ public class AmqpValidator { } + public void inspectDeliveryUpdate(Delivery delivery) { + + } + public boolean isValid() { return valid; } @@ -98,4 +103,5 @@ public class AmqpValidator { throw new AssertionError(errorMessage); } } + } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java new file mode 100644 index 0000000000..2c15c35d20 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.security.Role; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.settings.HierarchicalRepository; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl; +import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.activemq.transport.amqp.client.AmqpValidator; +import org.apache.qpid.proton.engine.Delivery; +import org.junit.Test; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class AmqpSecurityTest extends AmqpClientTestSupport { + + @Override + protected ActiveMQServer createServer() throws Exception { + ActiveMQServer server = createServer(true, true); + ActiveMQJAASSecurityManager securityManager = (ActiveMQJAASSecurityManager) server.getSecurityManager(); + securityManager.getConfiguration().addUser("foo", "bar"); + securityManager.getConfiguration().addRole("foo", "none"); + HierarchicalRepository> securityRepository = server.getSecurityRepository(); + HashSet value = new HashSet<>(); + value.add(new Role("none", false, true, true, true, true, true, true, true)); + securityRepository.addMatch(getTestName(), value); + + serverManager = new JMSServerManagerImpl(server); + Configuration serverConfig = server.getConfiguration(); + serverConfig.getAddressesSettings().put("jms.queue.#", new AddressSettings().setAutoCreateJmsQueues(true).setDeadLetterAddress(new SimpleString("jms.queue.ActiveMQ.DLQ"))); + serverConfig.setSecurityEnabled(true); + serverManager.start(); + server.start(); + return server; + } + + @Test(timeout = 60000) + public void testSendAndRejected() throws Exception { + AmqpConnection connection = null; + AmqpClient client = createAmqpClient("foo", "bar"); + CountDownLatch latch = new CountDownLatch(1); + client.setValidator(new AmqpValidator() { + @Override + public void inspectDeliveryUpdate(Delivery delivery) { + super.inspectDeliveryUpdate(delivery); + if (!delivery.remotelySettled()) { + markAsInvalid("delivery is not remotely settled"); + } + latch.countDown(); + } + }); + connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getTestName()); + AmqpMessage message = new AmqpMessage(); + + message.setMessageId("msg" + 1); + message.setMessageAnnotation("serialNo", 1); + message.setText("Test-Message"); + + try { + sender.send(message); + } catch (IOException e) { + // + } + assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); + connection.getStateInspector().assertValid(); + connection.close(); + } +}