ARTEMIS-562 Use 'to' field if sender target is null

This commit is contained in:
Howard Gao 2017-01-11 08:53:40 +08:00 committed by Clebert Suconic
parent 9b0447be83
commit 4e309d842e
6 changed files with 227 additions and 25 deletions
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp
tests
artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client
integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp

View File

@ -45,6 +45,7 @@ import org.apache.activemq.artemis.protocol.amqp.converter.message.EncodedMessag
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
@ -59,6 +60,7 @@ import org.apache.activemq.artemis.utils.SelectorTranslator;
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.transport.AmqpError;
@ -356,6 +358,16 @@ public class AMQPSessionCallback implements SessionCallback {
//use the address on the receiver if not null, if null let's hope it was set correctly on the message
if (address != null) {
message.setAddress(new SimpleString(address));
} else {
// Anonymous relay must set a To value
if (message.getAddress() == null) {
rejectMessage(delivery, Symbol.valueOf("failed"), "Missing 'to' field for message sent to an anonymous producer");
return;
}
if (!bindingQuery(message.getAddress().toString())) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
}
}
recoverContext();
@ -370,18 +382,19 @@ public class AMQPSessionCallback implements SessionCallback {
transaction.markAsRollbackOnly(e);
}
} else {
rejectMessage(delivery);
rejectMessage(delivery, AmqpError.RESOURCE_LIMIT_EXCEEDED, "Address is full: " + address);
}
} else {
serverSend(transaction, message, delivery, receiver);
}
}
private void rejectMessage(Delivery delivery) {
String address = delivery.getLink().getTarget().getAddress();
ErrorCondition ec = new ErrorCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED, "Address is full: " + address);
private void rejectMessage(Delivery delivery, Symbol errorCondition, String errorMessage) {
ErrorCondition condition = new ErrorCondition();
condition.setCondition(errorCondition);
condition.setDescription(errorMessage);
Rejected rejected = new Rejected();
rejected.setError(ec);
rejected.setError(condition);
delivery.disposition(rejected);
delivery.settle();
connection.flush();
@ -429,6 +442,11 @@ public class AMQPSessionCallback implements SessionCallback {
final int threshold,
final Receiver receiver) {
try {
if (address == null) {
receiver.flow(credits);
connection.flush();
return;
}
final PagingStore store = manager.getServer().getPagingManager().getPageStore(new SimpleString(address));
store.checkMemory(new Runnable() {
@Override

View File

@ -84,7 +84,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
if (target != null) {
if (target.getDynamic()) {
//if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and
// if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and
// will be deleted on closing of the session
address = sessionSPI.tempQueueName();
@ -96,23 +96,24 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
expiryPolicy = target.getExpiryPolicy() != null ? target.getExpiryPolicy() : TerminusExpiryPolicy.LINK_DETACH;
target.setAddress(address);
} else {
//if not dynamic then we use the targets address as the address to forward the messages to, however there has to
//be a queue bound to it so we nee to check this.
// the target will have an address unless the remote is requesting an anonymous
// relay in which case the address in the incoming message's to field will be
// matched on receive of the message.
address = target.getAddress();
if (address == null) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.targetAddressNotSet();
}
try {
if (!sessionSPI.bindingQuery(address)) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
if (address != null && !address.isEmpty()) {
try {
if (!sessionSPI.bindingQuery(address)) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
}
} catch (ActiveMQAMQPNotFoundException e) {
throw e;
} catch (Exception e) {
throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
}
} catch (ActiveMQAMQPNotFoundException e) {
throw e;
} catch (Exception e) {
throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
}
}
Symbol[] remoteDesiredCapabilities = receiver.getRemoteDesiredCapabilities();
if (remoteDesiredCapabilities != null) {
List<Symbol> list = Arrays.asList(remoteDesiredCapabilities);

View File

@ -22,7 +22,10 @@ import org.apache.qpid.proton.engine.Connection;
public class ExtCapability {
public static final Symbol[] capabilities = new Symbol[]{AmqpSupport.SOLE_CONNECTION_CAPABILITY, AmqpSupport.DELAYED_DELIVERY, AmqpSupport.SHARED_SUBS};
public static final Symbol[] capabilities = new Symbol[]{AmqpSupport.SOLE_CONNECTION_CAPABILITY,
AmqpSupport.DELAYED_DELIVERY,
AmqpSupport.SHARED_SUBS,
AmqpSupport.ANONYMOUS_RELAY};
public static Symbol[] getCapabilities() {
return capabilities;

View File

@ -90,6 +90,16 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
return createSender(null, false);
}
/**
* Create an anonymous sender instance using the anonymous relay support of the broker.
*
* @return a newly created sender that is ready for use.
* @throws Exception if an error occurs while creating the sender.
*/
public AmqpSender createAnonymousSender() throws Exception {
return createSender(null, false);
}
/**
* Create a sender instance using the given address
*

View File

@ -26,6 +26,8 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
@ -41,12 +43,17 @@ import org.junit.Test;
public class AmqpSecurityTest extends AmqpClientTestSupport {
private String user1 = "user1";
private String password1 = "password1";
@Override
protected ActiveMQServer createServer() throws Exception {
ActiveMQServer server = createServer(true, true);
ActiveMQJAASSecurityManager securityManager = (ActiveMQJAASSecurityManager) server.getSecurityManager();
securityManager.getConfiguration().addUser("foo", "bar");
securityManager.getConfiguration().addRole("foo", "none");
securityManager.getConfiguration().addUser(user1, password1);
securityManager.getConfiguration().addRole(user1, "none");
HierarchicalRepository<Set<Role>> securityRepository = server.getSecurityRepository();
HashSet<Role> value = new HashSet<>();
value.add(new Role("none", false, true, true, true, true, true, true, true));
@ -144,4 +151,35 @@ public class AmqpSecurityTest extends AmqpClientTestSupport {
connection.getStateInspector().assertValid();
connection.close();
}
@Test(timeout = 60000)
public void testSendMessageFailsOnAnonymousRelayWhenNotAuthorizedToSendToAddress() throws Exception {
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(getTestName()), RoutingType.ANYCAST));
server.createQueue(new SimpleString(getTestName()), RoutingType.ANYCAST, new SimpleString(getTestName()), null, true, false);
AmqpClient client = createAmqpClient(user1, password1);
AmqpConnection connection = client.connect();
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createAnonymousSender();
AmqpMessage message = new AmqpMessage();
message.setAddress(getTestName());
message.setMessageId("msg" + 1);
message.setText("Test-Message");
try {
sender.send(message);
fail("Should not be able to send, message should be rejected");
} catch (Exception ex) {
ex.printStackTrace();
} finally {
sender.close();
}
} finally {
connection.close();
}
}
}

View File

@ -64,6 +64,7 @@ import javax.jms.TopicSubscriber;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
@ -83,6 +84,7 @@ import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverCont
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.integration.mqtt.imported.util.Wait;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.TimeUtils;
import org.apache.activemq.artemis.utils.VersionLoader;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
@ -110,10 +112,6 @@ public class ProtonTest extends ProtonTestBase {
private static final String tcpAmqpConnectionUri = "tcp://localhost:5672";
private static final String userName = "guest";
private static final String password = "guest";
private static final String brokerName = "my-broker";
private static final long maxSizeBytes = 1 * 1024 * 1024;
@ -153,6 +151,17 @@ public class ProtonTest extends ProtonTestBase {
@Before
public void setUp() throws Exception {
super.setUp();
Configuration serverConfig = server.getConfiguration();
Map<String, AddressSettings> settings = serverConfig.getAddressesSettings();
assertNotNull(settings);
AddressSettings addressSetting = settings.get("#");
if (addressSetting == null) {
addressSetting = new AddressSettings();
settings.put("#", addressSetting);
}
addressSetting.setAutoCreateQueues(false);
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress), RoutingType.ANYCAST));
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "1"), RoutingType.ANYCAST));
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "2"), RoutingType.ANYCAST));
@ -463,7 +472,8 @@ public class ProtonTest extends ProtonTestBase {
session.commit();
session.close();
Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable();
Assert.assertEquals(q.getMessageCount(), 10);
//because tx commit is executed async on broker, we use a timed wait.
assertTrue(TimeUtils.waitOnBoolean(true, 10000, ()-> q.getMessageCount() == 10));
}
@Test
@ -538,7 +548,9 @@ public class ProtonTest extends ProtonTestBase {
}
session.rollback();
Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable();
Assert.assertEquals(q.getMessageCount(), 10);
//because tx rollback is executed async on broker, we use a timed wait.
assertTrue(TimeUtils.waitOnBoolean(true, 10000, ()-> q.getMessageCount() == 10));
}
@Test
@ -1648,6 +1660,126 @@ public class ProtonTest extends ProtonTestBase {
connection.close();
}
@Test
public void testProducerWithoutUsingDefaultDestination() throws Exception {
try {
javax.jms.Queue queue = createQueue(coreAddress);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer p = session.createProducer(null);
for (int i = 1; i <= 10; i++) {
String targetName = coreAddress + i;
javax.jms.Queue target = createQueue(targetName);
TextMessage message = session.createTextMessage("message for " + targetName);
p.send(target, message);
}
connection.start();
MessageConsumer messageConsumer = session.createConsumer(queue);
Message m = messageConsumer.receive(200);
Assert.assertNull(m);
for (int i = 1; i <= 10; i++) {
String targetName = coreAddress + i;
javax.jms.Queue target = createQueue(targetName);
MessageConsumer consumer = session.createConsumer(target);
TextMessage tm = (TextMessage) consumer.receive(2000);
assertNotNull(tm);
assertEquals("message for " + targetName, tm.getText());
consumer.close();
}
} finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testSendMessageOnAnonymousRelayLinkUsingMessageTo() throws Exception {
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection connection = client.connect();
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createAnonymousSender();
AmqpMessage message = new AmqpMessage();
message.setAddress(address);
message.setMessageId("msg" + 1);
message.setText("Test-Message");
sender.send(message);
sender.close();
AmqpReceiver receiver = session.createReceiver(address);
receiver.flow(1);
AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
assertNotNull("Should have read message", received);
assertEquals("msg1", received.getMessageId());
received.accept();
receiver.close();
} finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testSendMessageFailsOnAnonymousRelayLinkWhenNoToValueSet() throws Exception {
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection connection = client.connect();
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createAnonymousSender();
AmqpMessage message = new AmqpMessage();
message.setMessageId("msg" + 1);
message.setText("Test-Message");
try {
sender.send(message);
fail("Should not be able to send, message should be rejected");
} catch (Exception ex) {
ex.printStackTrace();
} finally {
sender.close();
}
} finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testSendMessageFailsOnAnonymousRelayWhenToFieldHasNonExistingAddress() throws Exception {
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection connection = client.connect();
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createAnonymousSender();
AmqpMessage message = new AmqpMessage();
message.setAddress(address + "-not-in-service");
message.setMessageId("msg" + 1);
message.setText("Test-Message");
try {
sender.send(message);
fail("Should not be able to send, message should be rejected");
} catch (Exception ex) {
ex.printStackTrace();
} finally {
sender.close();
}
} finally {
connection.close();
}
}
private javax.jms.Queue createQueue(String address) throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
try {