This commit is contained in:
Clebert Suconic 2017-07-18 16:16:42 -04:00
commit 00d880eb03
4 changed files with 19 additions and 38 deletions

View File

@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
@ -108,4 +109,12 @@ public abstract class QueueAbstractPacket extends PacketImpl {
public QueueAbstractPacket(byte type) {
super(type);
}
public static SimpleString getOldPrefixedAddress(SimpleString address, RoutingType routingType) {
switch (routingType) {
case MULTICAST: return OLD_TOPIC_PREFIX.concat(address);
case ANYCAST: return OLD_QUEUE_PREFIX.concat(address);
default: return address;
}
}
}

View File

@ -92,10 +92,10 @@ import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.utils.actors.Actor;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.apache.activemq.artemis.utils.SimpleFuture;
import org.apache.activemq.artemis.utils.SimpleFutureImpl;
import org.apache.activemq.artemis.utils.actors.Actor;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.jboss.logging.Logger;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_ADDRESS;
@ -387,6 +387,11 @@ public class ServerSessionPacketHandler implements ChannelHandler {
requiresResponse = true;
SessionQueueQueryMessage request = (SessionQueueQueryMessage) packet;
QueueQueryResult result = session.executeQueueQuery(request.getQueueName(remotingConnection.getClientVersion()));
if (remotingConnection.getClientVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) {
result.setAddress(SessionQueueQueryMessage.getOldPrefixedAddress(result.getAddress(), result.getRoutingType()));
}
if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V3)) {
response = new SessionQueueQueryResponseMessage_V3(result);
} else if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2)) {

View File

@ -728,16 +728,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public QueueQueryResult executeQueueQuery(final SimpleString name) throws Exception {
QueueQueryResult result = server.queueQuery(removePrefix(name));
if (prefixEnabled) {
for (Map.Entry<SimpleString, RoutingType> entry : prefixes.entrySet()) {
if (entry.getValue() == result.getRoutingType()) {
result.setAddress(entry.getKey().concat(result.getAddress()));
break;
}
}
}
return result;
return server.queueQuery(removePrefix(name));
}
@Override

View File

@ -21,9 +21,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
@ -34,7 +32,6 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@ -62,11 +59,8 @@ public class SessionTest extends ActiveMQTestBase {
public void setUp() throws Exception {
super.setUp();
locator = createNettyNonHALocator();
Configuration configuration = createDefaultNettyConfig();
configuration.addAcceptorConfiguration("prefixed", "tcp://localhost:61617?multicastPrefix=multicast://;anycastPrefix=anycast://");
server = createServer(configuration);
createServer(false);
locator = createInVMNonHALocator();
server = createServer(false);
server.start();
waitForServerToStart(server);
}
@ -212,24 +206,6 @@ public class SessionTest extends ActiveMQTestBase {
clientSession.close();
}
@Test
public void testQueueQueryWithAddressPrefix() throws Exception {
String address = new String("testAddress");
cf = ActiveMQClient.createServerLocator("tcp://localhost:61617").createSessionFactory();
ClientSession clientSession = cf.createSession(false, true, true);
clientSession.createQueue(address, RoutingType.ANYCAST, queueName + "1", false);
clientSession.createQueue(address, RoutingType.MULTICAST, queueName + "2", false);
QueueQuery respA = clientSession.queueQuery(new SimpleString(queueName + "1"));
QueueQuery respM = clientSession.queueQuery(new SimpleString(queueName + "2"));
Assert.assertEquals(new SimpleString("anycast://" + address), respA.getAddress());
Assert.assertEquals(new SimpleString("multicast://" + address), respM.getAddress());
clientSession.close();
}
private void flushQueue() throws Exception {
Queue queue = server.locateQueue(SimpleString.toSimpleString(queueName));
assertNotNull(queue);