This closes #920

This commit is contained in:
Clebert Suconic 2016-12-14 21:10:25 -05:00
commit 88f4a8ccec
7 changed files with 110 additions and 30 deletions

View File

@ -239,7 +239,7 @@ public class AMQPSessionCallback implements SessionCallback {
BindingQueryResult bindingQueryResult = serverSession.executeBindingQuery(SimpleString.toSimpleString(address));
if (!bindingQueryResult.isExists() && bindingQueryResult.isAutoCreateQueues()) {
try {
serverSession.createQueue(new SimpleString(address), new SimpleString(address), null, false, true);
serverSession.createQueue(new SimpleString(address), new SimpleString(address), RoutingType.ANYCAST, null, false, true);
} catch (ActiveMQQueueExistsException e) {
// The queue may have been created by another thread in the mean time. Catch and do nothing.
}

View File

@ -251,9 +251,9 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
Set<RoutingType> routingTypes = addressInfo.getRoutingTypes();
//if the client defines 1 routing type and the broker another then throw an exception
if (multicast && !routingTypes.contains(RoutingType.MULTICAST)) {
throw new ActiveMQAMQPIllegalStateException("Address is not configured for topic support");
throw new ActiveMQAMQPIllegalStateException("Address " + addressInfo.getName() + " is not configured for topic support");
} else if (!multicast && !routingTypes.contains(RoutingType.ANYCAST)) {
throw new ActiveMQAMQPIllegalStateException("Address is not configured for queue support");
throw new ActiveMQAMQPIllegalStateException("Address " + addressInfo.getName() + " is not configured for queue support");
}
} else {
//if not we look up the address

View File

@ -418,10 +418,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
final boolean browseOnly,
final boolean supportLargeMessage,
final Integer credits) throws Exception {
Binding binding = postOffice.getBinding(queueName);
final SimpleString unPrefixedQueueName = removePrefix(queueName);
Binding binding = postOffice.getBinding(unPrefixedQueueName);
if (binding == null || binding.getType() != BindingType.LOCAL_QUEUE) {
throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(queueName);
throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(unPrefixedQueueName);
}
SimpleString address = removePrefix(binding.getAddress());
@ -429,13 +431,13 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
try {
securityCheck(address, CheckType.BROWSE, this);
} catch (Exception e) {
securityCheck(address.concat(".").concat(queueName), CheckType.BROWSE, this);
securityCheck(address.concat(".").concat(unPrefixedQueueName), CheckType.BROWSE, this);
}
} else {
try {
securityCheck(address, CheckType.CONSUME, this);
} catch (Exception e) {
securityCheck(address.concat(".").concat(queueName), CheckType.CONSUME, this);
securityCheck(address.concat(".").concat(unPrefixedQueueName), CheckType.CONSUME, this);
}
}
@ -475,7 +477,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
if (logger.isDebugEnabled()) {
logger.debug("Session with user=" + username +
", connection=" + this.remotingConnection +
" created a consumer on queue " + queueName +
" created a consumer on queue " + unPrefixedQueueName +
", filter = " + filterString);
}
@ -528,6 +530,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
final int maxConsumers,
final boolean deleteOnNoConsumers,
final boolean autoCreated) throws Exception {
final SimpleString unPrefixedName = removePrefix(name);
Pair<SimpleString, RoutingType> art = getAddressAndRoutingType(address, routingType);
@ -540,7 +543,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
server.checkQueueCreationLimit(getUsername());
Queue queue = server.createQueue(art.getA(), art.getB(), name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, autoCreated, maxConsumers, deleteOnNoConsumers, server.getAddressSettingsRepository().getMatch(address.toString()).isAutoCreateAddresses());
Queue queue = server.createQueue(art.getA(), art.getB(), unPrefixedName, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, autoCreated, maxConsumers, deleteOnNoConsumers, server.getAddressSettingsRepository().getMatch(address.toString()).isAutoCreateAddresses());
if (temporary) {
// Temporary queue in core simply means the queue will be deleted if
@ -549,7 +552,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
// session is closed.
// It is up to the user to delete the queue when finished with it
TempQueueCleanerUpper cleaner = new TempQueueCleanerUpper(server, name);
TempQueueCleanerUpper cleaner = new TempQueueCleanerUpper(server, unPrefixedName);
if (remotingConnection instanceof TempQueueObserver) {
cleaner.setObserver((TempQueueObserver) remotingConnection);
}
@ -557,11 +560,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
remotingConnection.addCloseListener(cleaner);
remotingConnection.addFailureListener(cleaner);
tempQueueCleannerUppers.put(name, cleaner);
tempQueueCleannerUppers.put(unPrefixedName, cleaner);
}
if (logger.isDebugEnabled()) {
logger.debug("Queue " + name + " created on address " + address +
logger.debug("Queue " + unPrefixedName + " created on address " + address +
" with filter=" + filterString + " temporary = " +
temporary + " durable=" + durable + " on session user=" + this.username + ", connection=" + this.remotingConnection);
}
@ -692,15 +695,17 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public void deleteQueue(final SimpleString queueToDelete) throws Exception {
Binding binding = postOffice.getBinding(queueToDelete);
final SimpleString unPrefixedQueueName = removePrefix(queueToDelete);
Binding binding = postOffice.getBinding(unPrefixedQueueName);
if (binding == null || binding.getType() != BindingType.LOCAL_QUEUE) {
throw new ActiveMQNonExistentQueueException();
}
server.destroyQueue(queueToDelete, this, true);
server.destroyQueue(unPrefixedQueueName, this, true);
TempQueueCleanerUpper cleaner = this.tempQueueCleannerUppers.remove(queueToDelete);
TempQueueCleanerUpper cleaner = this.tempQueueCleannerUppers.remove(unPrefixedQueueName);
if (cleaner != null) {
remotingConnection.removeCloseListener(cleaner);
@ -711,12 +716,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public QueueQueryResult executeQueueQuery(final SimpleString name) throws Exception {
return server.queueQuery(name);
return server.queueQuery(removePrefix(name));
}
@Override
public AddressQueryResult executeAddressQuery(SimpleString name) throws Exception {
return server.addressQuery(name);
return server.addressQuery(removePrefix(name));
}
@Override

View File

@ -418,10 +418,10 @@ public abstract class ActiveMQTestBase extends Assert {
}
protected Configuration createDefaultConfig(final int serverID, final boolean netty) throws Exception {
ConfigurationImpl configuration = createBasicConfig(serverID).setJMXManagementEnabled(false).addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, generateInVMParams(serverID)));
ConfigurationImpl configuration = createBasicConfig(serverID).setJMXManagementEnabled(false).addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, generateInVMParams(serverID), "invm"));
if (netty) {
configuration.addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY));
configuration.addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, new HashMap<String, Object>(), "netty", new HashMap<String, Object>()));
}
return configuration;

View File

@ -19,8 +19,10 @@ package org.apache.activemq.artemis.tests.integration.amqp;
import java.net.URI;
import java.util.LinkedList;
import java.util.Set;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
@ -105,6 +107,13 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
serverConfig.addAddressConfiguration(address);
serverConfig.getAddressesSettings().put("#", new AddressSettings().setAutoCreateQueues(true).setAutoCreateAddresses(true).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ")));
serverConfig.setSecurityEnabled(false);
Set<TransportConfiguration> acceptors = serverConfig.getAcceptorConfigurations();
for (TransportConfiguration tc : acceptors) {
if (tc.getName().equals("netty")) {
tc.getExtraParams().put("anycastPrefix", "anycast://");
tc.getExtraParams().put("multicastPrefix", "multicast://");
}
}
serverManager.start();
server.start();
return server;

View File

@ -16,11 +16,7 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS;
import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS;
import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter;
import static org.apache.activemq.transport.amqp.AmqpSupport.contains;
import javax.jms.JMSException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
@ -30,7 +26,10 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
@ -48,7 +47,10 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.JMSException;
import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS;
import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS;
import static org.apache.activemq.transport.amqp.AmqpSupport.contains;
import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter;
/**
* Test basic send and receive scenarios using only AMQP sender and receiver links.
@ -177,6 +179,65 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
connection.close();
}
@Test(timeout = 60000)
public void testAnycastMessageRoutingExclusivity() throws Exception {
final String addressA = "addressA";
final String queueA = "queueA";
final String queueB = "queueB";
final String queueC = "queueC";
ActiveMQServerControl serverControl = server.getActiveMQServerControl();
serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
serverControl.createQueue(addressA, queueB, RoutingType.ANYCAST.toString());
serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
sendMessages("anycast://" + addressA, 1);
assertEquals(1, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount());
}
@Test
public void testMulticastMessageRoutingExclusivity() throws Exception {
final String addressA = "addressA";
final String queueA = "queueA";
final String queueB = "queueB";
final String queueC = "queueC";
ActiveMQServerControl serverControl = server.getActiveMQServerControl();
serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
serverControl.createQueue(addressA, queueB, RoutingType.MULTICAST.toString());
serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
sendMessages("multicast://" + addressA, 1);
assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount());
assertEquals(2, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
}
@Test
public void testAmbiguousMessageRouting() throws Exception {
final String addressA = "addressA";
final String queueA = "queueA";
final String queueB = "queueB";
final String queueC = "queueC";
final String queueD = "queueD";
ActiveMQServerControl serverControl = server.getActiveMQServerControl();
serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
serverControl.createQueue(addressA, queueB, RoutingType.ANYCAST.toString());
serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
serverControl.createQueue(addressA, queueD, RoutingType.MULTICAST.toString());
sendMessages(addressA, 1);
assertEquals(1, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
assertEquals(2, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueD)).getMessageCount());
}
@Test(timeout = 60000)
public void testTwoQueueReceiversOnSameConnectionReadMessagesNoDispositions() throws Exception {
int MSG_COUNT = 4;

View File

@ -24,11 +24,10 @@ import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.Random;
import java.util.Set;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.After;
@ -45,7 +44,13 @@ public class SendingAndReceivingTest extends ActiveMQTestBase {
public void setUp() throws Exception {
super.setUp();
server = createServer(true, true);
server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString("exampleQueue"), RoutingType.ANYCAST));
Set<TransportConfiguration> acceptors = server.getConfiguration().getAcceptorConfigurations();
for (TransportConfiguration tc : acceptors) {
if (tc.getName().equals("netty")) {
tc.getExtraParams().put("anycastPrefix", "anycast://");
tc.getExtraParams().put("multicastPrefix", "multicast://");
}
}
server.start();
}
@ -68,7 +73,7 @@ public class SendingAndReceivingTest extends ActiveMQTestBase {
try {
connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("exampleQueue");
Queue queue = session.createQueue("anycast://exampleQueue");
MessageProducer sender = session.createProducer(queue);
String body = createMessage(10240);