This commit is contained in:
Clebert Suconic 2017-09-28 21:40:19 -04:00
commit 3035a57e48
4 changed files with 58 additions and 13 deletions

View File

@ -21,6 +21,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.Message;
@ -276,15 +277,23 @@ public class AMQPSessionCallback implements SessionCallback {
return queueQueryResult;
}
public boolean bindingQuery(String address) throws Exception {
BindingQueryResult bindingQueryResult = serverSession.executeBindingQuery(SimpleString.toSimpleString(address));
if (!bindingQueryResult.isExists() && bindingQueryResult.isAutoCreateQueues()) {
public boolean bindingQuery(String address, RoutingType routingType) throws Exception {
SimpleString simpleAddress = SimpleString.toSimpleString(address);
BindingQueryResult bindingQueryResult = serverSession.executeBindingQuery(simpleAddress);
if (routingType == RoutingType.MULTICAST && !bindingQueryResult.isExists() && bindingQueryResult.isAutoCreateAddresses()) {
try {
serverSession.createQueue(new SimpleString(address), new SimpleString(address), RoutingType.ANYCAST, null, false, true);
serverSession.createAddress(simpleAddress, routingType, true);
} catch (ActiveMQAddressExistsException e) {
// The address may have been created by another thread in the mean time. Catch and do nothing.
}
bindingQueryResult = serverSession.executeBindingQuery(simpleAddress);
} else if (routingType == RoutingType.ANYCAST && !bindingQueryResult.isExists() && bindingQueryResult.isAutoCreateQueues()) {
try {
serverSession.createQueue(simpleAddress, simpleAddress, routingType, null, false, true);
} catch (ActiveMQQueueExistsException e) {
// The queue may have been created by another thread in the mean time. Catch and do nothing.
}
bindingQueryResult = serverSession.executeBindingQuery(SimpleString.toSimpleString(address));
bindingQueryResult = serverSession.executeBindingQuery(simpleAddress);
}
return bindingQueryResult.isExists();
}
@ -406,7 +415,7 @@ public class AMQPSessionCallback implements SessionCallback {
return;
}
if (!bindingQuery(message.getAddress().toString())) {
if (!bindingQuery(message.getAddress().toString(), RoutingType.ANYCAST)) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
}
}
@ -660,7 +669,7 @@ public class AMQPSessionCallback implements SessionCallback {
}
public RoutingType getDefaultRoutingType(String address) {
return manager.getServer().getAddressSettingsRepository().getMatch(address).getDefaultQueueRoutingType();
return manager.getServer().getAddressSettingsRepository().getMatch(address).getDefaultAddressRoutingType();
}
public void check(SimpleString address, CheckType checkType, SecurityAuth session) throws Exception {

View File

@ -119,12 +119,13 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
if (address != null && !address.isEmpty()) {
try {
if (!sessionSPI.bindingQuery(address)) {
if (!sessionSPI.bindingQuery(address, getRoutingType(target.getCapabilities()))) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
}
} catch (ActiveMQAMQPNotFoundException e) {
throw e;
} catch (Exception e) {
e.printStackTrace();
throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
}
@ -177,11 +178,13 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
}
private RoutingType getRoutingType(Symbol[] symbols) {
for (Symbol symbol : symbols) {
if (AmqpSupport.TEMP_TOPIC_CAPABILITY.equals(symbol) || AmqpSupport.TOPIC_CAPABILITY.equals(symbol)) {
return RoutingType.MULTICAST;
} else if (AmqpSupport.TEMP_QUEUE_CAPABILITY.equals(symbol) || AmqpSupport.QUEUE_CAPABILITY.equals(symbol)) {
return RoutingType.ANYCAST;
if (symbols != null) {
for (Symbol symbol : symbols) {
if (AmqpSupport.TEMP_TOPIC_CAPABILITY.equals(symbol) || AmqpSupport.TOPIC_CAPABILITY.equals(symbol)) {
return RoutingType.MULTICAST;
} else if (AmqpSupport.TEMP_QUEUE_CAPABILITY.equals(symbol) || AmqpSupport.QUEUE_CAPABILITY.equals(symbol)) {
return RoutingType.ANYCAST;
}
}
}

View File

@ -19,6 +19,8 @@ package org.apache.activemq.artemis.tests.integration.amqp;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
@ -40,6 +42,7 @@ public class AmqpMaxFrameSizeTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testMultipleTransfers() throws Exception {
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
String testQueueName = "ConnectionFrameSize";
int nMsgs = 200;

View File

@ -230,6 +230,36 @@ public class ConsumerTest extends ActiveMQTestBase {
internalSend(2, 1);
}
@Test
public void testAutoCreateMulticastAddress() throws Throwable {
if (!isNetty()) {
// no need to run the test, there's no AMQP support
return;
}
assertNull(server.getAddressInfo(SimpleString.toSimpleString("topic")));
ConnectionFactory factorySend = createFactory(2);
Connection connection = factorySend.createConnection();
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Topic topic = session.createTopic("topic");
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
TextMessage msg = session.createTextMessage("hello");
msg.setIntProperty("mycount", 0);
producer.send(msg);
} finally {
connection.close();
}
assertNotNull(server.getAddressInfo(SimpleString.toSimpleString("topic")));
assertEquals(RoutingType.MULTICAST, server.getAddressInfo(SimpleString.toSimpleString("topic")).getRoutingType());
assertEquals(0, server.getTotalMessageCount());
}
@Test
public void testSendCoreReceiveAMQP() throws Throwable {