ARTEMIS-1290 QueueQuery add prefix on address

This commit is contained in:
Martyn Taylor 2017-07-14 10:20:45 +01:00 committed by Justin Bertram
parent 5b8e781e36
commit 44506f2258
3 changed files with 40 additions and 3 deletions

View File

@ -138,4 +138,8 @@ public class QueueQueryResult {
public int getMaxConsumers() { public int getMaxConsumers() {
return maxConsumers; return maxConsumers;
} }
public void setAddress(SimpleString address) {
this.address = address;
}
} }

View File

@ -728,7 +728,16 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override @Override
public QueueQueryResult executeQueueQuery(final SimpleString name) throws Exception { public QueueQueryResult executeQueueQuery(final SimpleString name) throws Exception {
return server.queueQuery(removePrefix(name)); QueueQueryResult result = server.queueQuery(removePrefix(name));
if (prefixEnabled) {
for (Map.Entry<SimpleString, RoutingType> entry : prefixes.entrySet()) {
if (entry.getValue() == result.getRoutingType()) {
result.setAddress(entry.getKey().concat(result.getAddress()));
break;
}
}
}
return result;
} }
@Override @Override

View File

@ -21,7 +21,9 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException; import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientProducer;
@ -32,6 +34,7 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener; import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@ -59,8 +62,11 @@ public class SessionTest extends ActiveMQTestBase {
public void setUp() throws Exception { public void setUp() throws Exception {
super.setUp(); super.setUp();
locator = createInVMNonHALocator(); locator = createNettyNonHALocator();
server = createServer(false); Configuration configuration = createDefaultNettyConfig();
configuration.addAcceptorConfiguration("prefixed", "tcp://localhost:61617?multicastPrefix=multicast://;anycastPrefix=anycast://");
server = createServer(configuration);
createServer(false);
server.start(); server.start();
waitForServerToStart(server); waitForServerToStart(server);
} }
@ -206,6 +212,24 @@ public class SessionTest extends ActiveMQTestBase {
clientSession.close(); clientSession.close();
} }
@Test
public void testQueueQueryWithAddressPrefix() throws Exception {
String address = new String("testAddress");
cf = ActiveMQClient.createServerLocator("tcp://localhost:61617").createSessionFactory();
ClientSession clientSession = cf.createSession(false, true, true);
clientSession.createQueue(address, RoutingType.ANYCAST, queueName + "1", false);
clientSession.createQueue(address, RoutingType.MULTICAST, queueName + "2", false);
QueueQuery respA = clientSession.queueQuery(new SimpleString(queueName + "1"));
QueueQuery respM = clientSession.queueQuery(new SimpleString(queueName + "2"));
Assert.assertEquals(new SimpleString("anycast://" + address), respA.getAddress());
Assert.assertEquals(new SimpleString("multicast://" + address), respM.getAddress());
clientSession.close();
}
private void flushQueue() throws Exception { private void flushQueue() throws Exception {
Queue queue = server.locateQueue(SimpleString.toSimpleString(queueName)); Queue queue = server.locateQueue(SimpleString.toSimpleString(queueName));
assertNotNull(queue); assertNotNull(queue);