This closes #866

This commit is contained in:
Clebert Suconic 2016-11-21 11:24:51 -05:00
commit 2f7053382e
8 changed files with 147 additions and 2 deletions

View File

@ -352,6 +352,7 @@ public class AMQPSessionCallback implements SessionCallback {
Rejected rejected = new Rejected();
rejected.setError(ec);
delivery.disposition(rejected);
delivery.settle();
connection.flush();
}

View File

@ -158,6 +158,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
condition.setDescription(e.getMessage());
rejected.setError(condition);
delivery.disposition(rejected);
delivery.settle();
}
}

View File

@ -20,6 +20,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.transaction.Transaction;
@ -140,8 +141,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
try {
SelectorParser.parse(selector);
} catch (FilterException e) {
close(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage()));
return;
throw new ActiveMQAMQPException(AmqpError.INVALID_FIELD, "Invalid filter", ActiveMQExceptionType.INVALID_FILTER_EXPRESSION);
}
supportedFilters.put(filter.getKey(), filter.getValue());
@ -313,6 +313,9 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
@Override
public void close(ErrorCondition condition) throws ActiveMQAMQPException {
closed = true;
if (condition != null) {
sender.setCondition(condition);
}
protonSession.removeSender(sender);
synchronized (connection.getLock()) {
sender.close();

View File

@ -19,6 +19,7 @@ package org.apache.activemq.transport.amqp.client;
import java.io.IOException;
import org.apache.activemq.transport.amqp.client.util.AsyncResult;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Endpoint;
import org.apache.qpid.proton.engine.EndpointState;
import org.slf4j.Logger;
@ -303,6 +304,10 @@ public abstract class AmqpAbstractResource<E extends Endpoint> implements AmqpRe
protected void doDetachedInspection() {
}
protected void doDeliveryUpdate(Delivery delivery) {
}
//----- Private implementation utility methods ---------------------------//
private boolean isAwaitingOpen() {

View File

@ -440,6 +440,8 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
tagGenerator.returnTag(delivery.getTag());
delivery.settle();
toRemove.add(delivery);
doDeliveryUpdate(delivery);
}
pending.removeAll(toRemove);
@ -449,4 +451,13 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
public String toString() {
return getClass().getSimpleName() + "{ address = " + address + "}";
}
@Override
protected void doDeliveryUpdate(Delivery delivery) {
try {
getStateInspector().inspectDeliveryUpdate(delivery);
} catch (Throwable error) {
getStateInspector().markAsInvalid(error.getMessage());
}
}
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.transport.amqp.client;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
@ -70,6 +71,10 @@ public class AmqpValidator {
}
public void inspectDeliveryUpdate(Delivery delivery) {
}
public boolean isValid() {
return valid;
}
@ -98,4 +103,5 @@ public class AmqpValidator {
throw new AssertionError(errorMessage);
}
}
}

View File

@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.qpid.proton.engine.Delivery;
import org.junit.Test;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class AmqpSecurityTest extends AmqpClientTestSupport {
@Override
protected ActiveMQServer createServer() throws Exception {
ActiveMQServer server = createServer(true, true);
ActiveMQJAASSecurityManager securityManager = (ActiveMQJAASSecurityManager) server.getSecurityManager();
securityManager.getConfiguration().addUser("foo", "bar");
securityManager.getConfiguration().addRole("foo", "none");
HierarchicalRepository<Set<Role>> securityRepository = server.getSecurityRepository();
HashSet<Role> value = new HashSet<>();
value.add(new Role("none", false, true, true, true, true, true, true, true));
securityRepository.addMatch(getTestName(), value);
serverManager = new JMSServerManagerImpl(server);
Configuration serverConfig = server.getConfiguration();
serverConfig.getAddressesSettings().put("jms.queue.#", new AddressSettings().setAutoCreateJmsQueues(true).setDeadLetterAddress(new SimpleString("jms.queue.ActiveMQ.DLQ")));
serverConfig.setSecurityEnabled(true);
serverManager.start();
server.start();
return server;
}
@Test(timeout = 60000)
public void testSendAndRejected() throws Exception {
AmqpConnection connection = null;
AmqpClient client = createAmqpClient("foo", "bar");
CountDownLatch latch = new CountDownLatch(1);
client.setValidator(new AmqpValidator() {
@Override
public void inspectDeliveryUpdate(Delivery delivery) {
super.inspectDeliveryUpdate(delivery);
if (!delivery.remotelySettled()) {
markAsInvalid("delivery is not remotely settled");
}
latch.countDown();
}
});
connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
AmqpMessage message = new AmqpMessage();
message.setMessageId("msg" + 1);
message.setMessageAnnotation("serialNo", 1);
message.setText("Test-Message");
try {
sender.send(message);
} catch (IOException e) {
//
}
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
connection.getStateInspector().assertValid();
connection.close();
}
}

View File

@ -45,6 +45,8 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.JMSException;
/**
* Test basic send and receive scenarios using only AMQP sender and receiver links.
*/
@ -132,6 +134,24 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
connection.close();
}
@Test(timeout = 60000)
public void testInvalidFilter() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
try {
session.createReceiver(getTestName(), "null = 'f''", true);
fail("should throw exception");
} catch (Exception e) {
assertTrue(e.getCause() instanceof JMSException);
//passed
}
connection.close();
}
@Test(timeout = 60000)
public void testQueueReceiverReadMessage() throws Exception {
sendMessages(getTestName(), 1);