diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java index bb7b3817f6..9771795177 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java @@ -52,6 +52,11 @@ public interface Channel { */ boolean supports(byte packetID); + /** + * For protocol check + */ + boolean supports(byte packetID, int version); + /** * Sends a packet on this channel. * diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index c03f76ceb8..c72e19bff6 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -61,6 +61,8 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RollbackMe import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAddMetaDataMessageV2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V3; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCloseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage; @@ -290,9 +292,19 @@ public class ActiveMQSessionContext extends SessionContext { @Override public ClientSession.AddressQuery addressQuery(final SimpleString address) throws ActiveMQException { - SessionBindingQueryResponseMessage_V3 response = (SessionBindingQueryResponseMessage_V3) sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP_V3); - - return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateJmsQueues(), response.isAutoCreateJmsTopics()); + if (sessionChannel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V3, getServerVersion())) { + Packet packet = sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP_V3); + SessionBindingQueryResponseMessage_V3 response = (SessionBindingQueryResponseMessage_V3) packet; + return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateJmsQueues(), response.isAutoCreateJmsTopics()); + } else if (sessionChannel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V2, getServerVersion())) { + Packet packet = sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP_V2); + SessionBindingQueryResponseMessage_V2 response = (SessionBindingQueryResponseMessage_V2) packet; + return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateJmsQueues(), false); + } else { + Packet packet = sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP); + SessionBindingQueryResponseMessage response = (SessionBindingQueryResponseMessage) packet; + return new AddressQueryImpl(response.isExists(), response.getQueueNames(), false, false); + } } @Override diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java index a51b7b9afe..41be0804c4 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java @@ -152,8 +152,11 @@ public final class ChannelImpl implements Channel { @Override public boolean supports(final byte packetType) { - int version = connection.getClientVersion(); + return supports(packetType, connection.getClientVersion()); + } + @Override + public boolean supports(final byte packetType, int version) { switch (packetType) { case PacketImpl.CLUSTER_TOPOLOGY_V2: return version >= 122; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java index 7247ede4f5..e4afb5be51 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java @@ -345,5 +345,10 @@ public class BackupSyncDelay implements Interceptor { return true; } + @Override + public boolean supports(byte packetID, int version) { + return true; + } + } }