ARTEMIS-1290 QueueQuery add prefix on address 2
This commit is contained in:
parent
34df3afe0c
commit
e742de7d6e
|
@ -21,6 +21,7 @@ import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
||||||
|
|
||||||
|
@ -108,4 +109,12 @@ public abstract class QueueAbstractPacket extends PacketImpl {
|
||||||
public QueueAbstractPacket(byte type) {
|
public QueueAbstractPacket(byte type) {
|
||||||
super(type);
|
super(type);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static SimpleString getOldPrefixedAddress(SimpleString address, RoutingType routingType) {
|
||||||
|
switch (routingType) {
|
||||||
|
case MULTICAST: return OLD_TOPIC_PREFIX.concat(address);
|
||||||
|
case ANYCAST: return OLD_QUEUE_PREFIX.concat(address);
|
||||||
|
default: return address;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -138,4 +138,8 @@ public class QueueQueryResult {
|
||||||
public int getMaxConsumers() {
|
public int getMaxConsumers() {
|
||||||
return maxConsumers;
|
return maxConsumers;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setAddress(SimpleString address) {
|
||||||
|
this.address = address;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -92,10 +92,10 @@ import org.apache.activemq.artemis.core.server.LargeServerMessage;
|
||||||
import org.apache.activemq.artemis.core.server.QueueQueryResult;
|
import org.apache.activemq.artemis.core.server.QueueQueryResult;
|
||||||
import org.apache.activemq.artemis.core.server.ServerSession;
|
import org.apache.activemq.artemis.core.server.ServerSession;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||||
import org.apache.activemq.artemis.utils.actors.Actor;
|
|
||||||
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
|
|
||||||
import org.apache.activemq.artemis.utils.SimpleFuture;
|
import org.apache.activemq.artemis.utils.SimpleFuture;
|
||||||
import org.apache.activemq.artemis.utils.SimpleFutureImpl;
|
import org.apache.activemq.artemis.utils.SimpleFutureImpl;
|
||||||
|
import org.apache.activemq.artemis.utils.actors.Actor;
|
||||||
|
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
|
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_ADDRESS;
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_ADDRESS;
|
||||||
|
@ -387,6 +387,11 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
||||||
requiresResponse = true;
|
requiresResponse = true;
|
||||||
SessionQueueQueryMessage request = (SessionQueueQueryMessage) packet;
|
SessionQueueQueryMessage request = (SessionQueueQueryMessage) packet;
|
||||||
QueueQueryResult result = session.executeQueueQuery(request.getQueueName(remotingConnection.getClientVersion()));
|
QueueQueryResult result = session.executeQueueQuery(request.getQueueName(remotingConnection.getClientVersion()));
|
||||||
|
|
||||||
|
if (remotingConnection.getClientVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) {
|
||||||
|
result.setAddress(SessionQueueQueryMessage.getOldPrefixedAddress(result.getAddress(), result.getRoutingType()));
|
||||||
|
}
|
||||||
|
|
||||||
if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V3)) {
|
if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V3)) {
|
||||||
response = new SessionQueueQueryResponseMessage_V3(result);
|
response = new SessionQueueQueryResponseMessage_V3(result);
|
||||||
} else if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2)) {
|
} else if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2)) {
|
||||||
|
|
Loading…
Reference in New Issue