ARTEMIS-1576 anon AMQP producer creates address w/wrong routing-type

This commit is contained in:
Justin Bertram 2018-01-03 10:03:50 -06:00 committed by Michael Pearce
parent 3677cd2c8d
commit 14d6c30852
3 changed files with 37 additions and 17 deletions

View File

@ -454,7 +454,7 @@ public class AMQPSessionCallback implements SessionCallback {
}
//here check queue-autocreation
RoutingType routingType = context.getRoutingType(receiver, RoutingType.ANYCAST);
RoutingType routingType = context.getRoutingType(receiver, address);
if (!bindingQuery(address, routingType)) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
}

View File

@ -100,10 +100,10 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
if (target != null) {
if (target.getDynamic()) {
defRoutingType = getRoutingType(target.getCapabilities());
// if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and
// will be deleted on closing of the session
address = sessionSPI.tempQueueName();
defRoutingType = getRoutingType(target.getCapabilities(), address);
try {
sessionSPI.createTemporaryQueue(address, defRoutingType);
@ -121,7 +121,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
address = target.getAddress();
if (address != null && !address.isEmpty()) {
defRoutingType = getRoutingType(target.getCapabilities());
defRoutingType = getRoutingType(target.getCapabilities(), address);
try {
if (!sessionSPI.bindingQuery(address, defRoutingType)) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
@ -181,16 +181,12 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
flow(amqpCredits, minCreditRefresh);
}
public RoutingType getRoutingType(Receiver receiver, RoutingType defaultType) {
public RoutingType getRoutingType(Receiver receiver, String address) {
org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget();
return target != null ? getRoutingType(target.getCapabilities(), defaultType) : getRoutingType((Symbol[])null, defaultType);
return target != null ? getRoutingType(target.getCapabilities(), address) : getRoutingType((Symbol[]) null, address);
}
private RoutingType getRoutingType(Symbol[] symbols) {
return getRoutingType(symbols, null);
}
private RoutingType getRoutingType(Symbol[] symbols, RoutingType defaultType) {
private RoutingType getRoutingType(Symbol[] symbols, String address) {
if (symbols != null) {
for (Symbol symbol : symbols) {
if (AmqpSupport.TEMP_TOPIC_CAPABILITY.equals(symbol) || AmqpSupport.TOPIC_CAPABILITY.equals(symbol)) {
@ -201,11 +197,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
}
}
if (defaultType != null) {
return defaultType;
} else {
return sessionSPI.getDefaultRoutingType(address);
}
return sessionSPI.getDefaultRoutingType(address);
}
/*

View File

@ -16,8 +16,6 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import java.util.Random;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
@ -27,6 +25,9 @@ import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;
import javax.jms.Topic;
import java.util.Random;
import java.util.UUID;
import org.junit.Assert;
import org.junit.Test;
@ -67,6 +68,33 @@ public class JMSMessageProducerTest extends JMSClientTestSupport {
}
}
@Test(timeout = 30000)
public void testAnonymousProducerWithAutoCreation() throws Exception {
Connection connection = createConnection();
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(UUID.randomUUID().toString());
MessageProducer p = session.createProducer(null);
TextMessage message = session.createTextMessage();
message.setText("hello");
// this will auto-create the address
p.send(topic, message);
{
MessageConsumer consumer = session.createConsumer(topic);
p.send(topic, message);
Message msg = consumer.receive(2000);
assertNotNull(msg);
assertTrue(msg instanceof TextMessage);
consumer.close();
}
} finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testAnonymousProducerAcrossManyDestinations() throws Exception {
Connection connection = createConnection();