This closes #1521 Amqpfqqn fixes

This commit is contained in:
Andy Taylor 2017-09-08 12:41:36 +01:00
commit 9c31055aba
2 changed files with 130 additions and 53 deletions

View File

@ -22,6 +22,8 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
@ -73,9 +75,6 @@ import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Sender;
import org.jboss.logging.Logger;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
/**
* TODO: Merge {@link ProtonServerSenderContext} and {@link org.apache.activemq.artemis.protocol.amqp.client.ProtonClientSenderContext} once we support 'global' link names. The split is a workaround for outgoing links
*/
@ -333,10 +332,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
supportedFilters.put(filter.getKey(), filter.getValue());
}
if (queueNameToUse != null) {
SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, queueNameToUse, RoutingType.MULTICAST);
queue = matchingAnycastQueue.toString();
}
queue = getMatchingQueue(queueNameToUse, addressToUse, RoutingType.MULTICAST);
//if the address specifies a broker configured queue then we always use this, treat it as a queue
if (queue != null) {
multicast = false;
@ -390,7 +387,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
} else {
if (queueNameToUse != null) {
SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, queueNameToUse, RoutingType.ANYCAST);
SimpleString matchingAnycastQueue = SimpleString.toSimpleString(getMatchingQueue(queueNameToUse, addressToUse, RoutingType.ANYCAST));
if (matchingAnycastQueue != null) {
queue = matchingAnycastQueue.toString();
} else {
@ -440,6 +437,21 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
}
private String getMatchingQueue(SimpleString queueName, SimpleString address, RoutingType routingType) throws Exception {
if (queueName != null) {
QueueQueryResult result = sessionSPI.queueQuery(queueName.toString(), routingType, false);
if (!result.isExists()) {
throw new ActiveMQAMQPNotFoundException("Queue: '" + queueName + "' does not exist");
} else {
if (!result.getAddress().equals(address)) {
throw new ActiveMQAMQPNotFoundException("Queue: '" + queueName + "' does not exist for address '" + address + "'");
}
return sessionSPI.getMatchingQueue(address, queueName, routingType).toString();
}
}
return null;
}
protected String getClientId() {
return connection.getRemoteContainer();
}

View File

@ -68,22 +68,110 @@ public class AmqpFullyQualifiedNameTest extends JMSClientTestSupport {
server.getConfiguration().addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, new HashMap<String, Object>(), "netty", new HashMap<String, Object>()));
}
@Test(timeout = 60000)
//there isn't much use of FQQN for topics
//however we can test query functionality
public void testTopic() throws Exception {
@Test
public void testFQQNTopicWhenQueueDoesNotExist() throws Exception {
Exception e = null;
String queueName = "testQueue";
Connection connection = createConnection(false);
try {
connection.setClientID("FQQNconn");
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(multicastAddress.toString() + "::" + queueName);
session.createConsumer(topic);
} catch (InvalidDestinationException ide) {
e = ide;
} finally {
connection.close();
}
assertNotNull(e);
assertTrue(e.getMessage().contains("Queue: '" + queueName + "' does not exist"));
}
@Test
public void testConsumeQueueToFQQNWrongQueueAttachedToAnotherAddress() throws Exception {
// Create 2 Queues: address1::queue1, address2::queue2
String address1 = "a1";
String address2 = "a2";
String queue1 = "q1";
String queue2 = "q2";
server.createQueue(SimpleString.toSimpleString(address1), RoutingType.ANYCAST, SimpleString.toSimpleString(queue1), null, true, false, -1, false, true);
server.createQueue(SimpleString.toSimpleString(address2), RoutingType.ANYCAST, SimpleString.toSimpleString(queue2), null, true, false, -1, false, true);
Exception e = null;
// Wrong FQQN. Attempt to subscribe to a queue belonging to a different address than given in the FQQN.
String wrongFQQN = address1 + "::" + queue2;
Connection connection = createConnection(false);
try {
connection.setClientID("FQQNconn");
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = session.createQueue(wrongFQQN);
session.createConsumer(queue);
} catch (InvalidDestinationException ide) {
e = ide;
} finally {
connection.close();
}
assertNotNull(e);
assertTrue(e.getMessage().contains("Queue: '" + queue2 + "' does not exist for address '" + address1 + "'"));
}
@Test
public void testSubscribeTopicToFQQNWrongQueueAttachedToAnotherAddress() throws Exception {
// Create 2 Queues: address1::queue1, address2::queue2
String address1 = "a1";
String address2 = "a2";
String queue1 = "q1";
String queue2 = "q2";
server.createQueue(SimpleString.toSimpleString(address1), RoutingType.MULTICAST, SimpleString.toSimpleString(queue1), null, true, false, -1, false, true);
server.createQueue(SimpleString.toSimpleString(address2), RoutingType.MULTICAST, SimpleString.toSimpleString(queue2), null, true, false, -1, false, true);
Exception e = null;
// Wrong FQQN. Attempt to subscribe to a queue belonging to a different address than given in the FQQN.
String wrongFQQN = address1 + "::" + queue2;
Connection connection = createConnection(false);
try {
connection.setClientID("FQQNconn");
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(wrongFQQN);
session.createConsumer(topic);
} catch (InvalidDestinationException ide) {
e = ide;
} finally {
connection.close();
}
assertNotNull(e);
assertTrue(e.getMessage().contains("Queue: '" + queue2 + "' does not exist for address '" + address1 + "'"));
}
@Test(timeout = 60000)
//there isn't much use of FQQN for topics
//however we can test query functionality
public void testTopic() throws Exception {
SimpleString queueName = new SimpleString("someAddress");
server.createQueue(multicastAddress, RoutingType.MULTICAST, queueName, null, false, false);
Connection connection = createConnection(false);
try {
connection.setClientID("FQQNconn");
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic fqqn = session.createTopic(multicastAddress.toString() + "::" + queueName);
MessageConsumer consumer1 = session.createConsumer(fqqn);
MessageConsumer consumer2 = session.createConsumer(fqqn);
Topic topic = session.createTopic(multicastAddress.toString());
MessageConsumer consumer1 = session.createConsumer(topic);
MessageConsumer consumer2 = session.createConsumer(topic);
MessageConsumer consumer3 = session.createConsumer(topic);
MessageProducer producer = session.createProducer(topic);
producer.send(session.createMessage());
@ -91,10 +179,10 @@ public class AmqpFullyQualifiedNameTest extends JMSClientTestSupport {
//each consumer receives one
Message m = consumer1.receive(2000);
assertNotNull(m);
// Subscribing to FQQN is akin to shared subscription
m = consumer2.receive(2000);
assertNotNull(m);
m = consumer3.receive(2000);
assertNotNull(m);
assertNull(m);
Bindings bindings = server.getPostOffice().getBindingsForAddress(multicastAddress);
for (Binding b : bindings.getBindings()) {
@ -171,11 +259,16 @@ public class AmqpFullyQualifiedNameTest extends JMSClientTestSupport {
}
}
/**
* Broker should return exception if no address is passed in FQQN.
* @throws Exception
*/
@Test
public void testQueueSpecial() throws Exception {
server.createQueue(anycastAddress, RoutingType.ANYCAST, anycastQ1, null, true, false, -1, false, true);
Connection connection = createConnection();
Exception expectedException = null;
try {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -183,39 +276,11 @@ public class AmqpFullyQualifiedNameTest extends JMSClientTestSupport {
//::queue ok!
String specialName = CompositeAddress.toFullQN(new SimpleString(""), anycastQ1).toString();
javax.jms.Queue q1 = session.createQueue(specialName);
ClientSessionFactory cf = createSessionFactory(locator);
ClientSession coreSession = cf.createSession();
ClientProducer coreProducer = coreSession.createProducer(anycastAddress);
sendMessages(coreSession, coreProducer, 1);
System.out.println("create consumer: " + q1);
MessageConsumer consumer1 = session.createConsumer(q1);
assertNotNull(consumer1.receive(2000));
//queue::
specialName = CompositeAddress.toFullQN(anycastQ1, new SimpleString("")).toString();
q1 = session.createQueue(specialName);
try {
session.createConsumer(q1);
fail("should get exception");
} catch (InvalidDestinationException e) {
//expected
}
//::
specialName = CompositeAddress.toFullQN(new SimpleString(""), new SimpleString("")).toString();
q1 = session.createQueue(specialName);
try {
session.createConsumer(q1);
fail("should get exception");
} catch (InvalidDestinationException e) {
//expected
}
} finally {
connection.close();
session.createConsumer(q1);
} catch (InvalidDestinationException e) {
expectedException = e;
}
assertNotNull(expectedException);
assertTrue(expectedException.getMessage().contains("Queue: 'q1' does not exist for address ''"));
}
}