This closes #849

This commit is contained in:
Clebert Suconic 2016-10-19 13:22:56 -04:00
commit 9899f5243f
4 changed files with 29 additions and 4 deletions

View File

@ -52,6 +52,11 @@ public interface Channel {
*/
boolean supports(byte packetID);
/**
* For protocol check
*/
boolean supports(byte packetID, int version);
/**
* Sends a packet on this channel.
*

View File

@ -61,6 +61,8 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RollbackMe
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAddMetaDataMessageV2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V3;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCloseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage;
@ -290,9 +292,19 @@ public class ActiveMQSessionContext extends SessionContext {
@Override
public ClientSession.AddressQuery addressQuery(final SimpleString address) throws ActiveMQException {
SessionBindingQueryResponseMessage_V3 response = (SessionBindingQueryResponseMessage_V3) sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP_V3);
return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateJmsQueues(), response.isAutoCreateJmsTopics());
if (sessionChannel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V3, getServerVersion())) {
Packet packet = sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP_V3);
SessionBindingQueryResponseMessage_V3 response = (SessionBindingQueryResponseMessage_V3) packet;
return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateJmsQueues(), response.isAutoCreateJmsTopics());
} else if (sessionChannel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V2, getServerVersion())) {
Packet packet = sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP_V2);
SessionBindingQueryResponseMessage_V2 response = (SessionBindingQueryResponseMessage_V2) packet;
return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateJmsQueues(), false);
} else {
Packet packet = sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP);
SessionBindingQueryResponseMessage response = (SessionBindingQueryResponseMessage) packet;
return new AddressQueryImpl(response.isExists(), response.getQueueNames(), false, false);
}
}
@Override

View File

@ -152,8 +152,11 @@ public final class ChannelImpl implements Channel {
@Override
public boolean supports(final byte packetType) {
int version = connection.getClientVersion();
return supports(packetType, connection.getClientVersion());
}
@Override
public boolean supports(final byte packetType, int version) {
switch (packetType) {
case PacketImpl.CLUSTER_TOPOLOGY_V2:
return version >= 122;

View File

@ -345,5 +345,10 @@ public class BackupSyncDelay implements Interceptor {
return true;
}
@Override
public boolean supports(byte packetID, int version) {
return true;
}
}
}