ARTEMIS-823 = broker doesn't settle rejected messages

Currently we don't settle rejected messages at the broker, we should always settle when rejected

https://issues.apache.org/jira/browse/ARTEMIS-823
This commit is contained in:
Andy Taylor 2016-10-26 09:06:10 +01:00 committed by Clebert Suconic
parent f509c081c1
commit 329c533d21
6 changed files with 122 additions and 0 deletions

View File

@ -352,6 +352,7 @@ public class AMQPSessionCallback implements SessionCallback {
Rejected rejected = new Rejected();
rejected.setError(ec);
delivery.disposition(rejected);
delivery.settle();
connection.flush();
}

View File

@ -158,6 +158,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
condition.setDescription(e.getMessage());
rejected.setError(condition);
delivery.disposition(rejected);
delivery.settle();
}
}

View File

@ -19,6 +19,7 @@ package org.apache.activemq.transport.amqp.client;
import java.io.IOException;
import org.apache.activemq.transport.amqp.client.util.AsyncResult;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Endpoint;
import org.apache.qpid.proton.engine.EndpointState;
import org.slf4j.Logger;
@ -303,6 +304,10 @@ public abstract class AmqpAbstractResource<E extends Endpoint> implements AmqpRe
protected void doDetachedInspection() {
}
protected void doDeliveryUpdate(Delivery delivery) {
}
//----- Private implementation utility methods ---------------------------//
private boolean isAwaitingOpen() {

View File

@ -440,6 +440,8 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
tagGenerator.returnTag(delivery.getTag());
delivery.settle();
toRemove.add(delivery);
doDeliveryUpdate(delivery);
}
pending.removeAll(toRemove);
@ -449,4 +451,13 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
public String toString() {
return getClass().getSimpleName() + "{ address = " + address + "}";
}
@Override
protected void doDeliveryUpdate(Delivery delivery) {
try {
getStateInspector().inspectDeliveryUpdate(delivery);
} catch (Throwable error) {
getStateInspector().markAsInvalid(error.getMessage());
}
}
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.transport.amqp.client;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
@ -70,6 +71,10 @@ public class AmqpValidator {
}
public void inspectDeliveryUpdate(Delivery delivery) {
}
public boolean isValid() {
return valid;
}
@ -98,4 +103,5 @@ public class AmqpValidator {
throw new AssertionError(errorMessage);
}
}
}

View File

@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.qpid.proton.engine.Delivery;
import org.junit.Test;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class AmqpSecurityTest extends AmqpClientTestSupport {
@Override
protected ActiveMQServer createServer() throws Exception {
ActiveMQServer server = createServer(true, true);
ActiveMQJAASSecurityManager securityManager = (ActiveMQJAASSecurityManager) server.getSecurityManager();
securityManager.getConfiguration().addUser("foo", "bar");
securityManager.getConfiguration().addRole("foo", "none");
HierarchicalRepository<Set<Role>> securityRepository = server.getSecurityRepository();
HashSet<Role> value = new HashSet<>();
value.add(new Role("none", false, true, true, true, true, true, true, true));
securityRepository.addMatch(getTestName(), value);
serverManager = new JMSServerManagerImpl(server);
Configuration serverConfig = server.getConfiguration();
serverConfig.getAddressesSettings().put("jms.queue.#", new AddressSettings().setAutoCreateJmsQueues(true).setDeadLetterAddress(new SimpleString("jms.queue.ActiveMQ.DLQ")));
serverConfig.setSecurityEnabled(true);
serverManager.start();
server.start();
return server;
}
@Test(timeout = 60000)
public void testSendAndRejected() throws Exception {
AmqpConnection connection = null;
AmqpClient client = createAmqpClient("foo", "bar");
CountDownLatch latch = new CountDownLatch(1);
client.setValidator(new AmqpValidator() {
@Override
public void inspectDeliveryUpdate(Delivery delivery) {
super.inspectDeliveryUpdate(delivery);
if (!delivery.remotelySettled()) {
markAsInvalid("delivery is not remotely settled");
}
latch.countDown();
}
});
connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
AmqpMessage message = new AmqpMessage();
message.setMessageId("msg" + 1);
message.setMessageAnnotation("serialNo", 1);
message.setText("Test-Message");
try {
sender.send(message);
} catch (IOException e) {
//
}
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
connection.getStateInspector().assertValid();
connection.close();
}
}