From a8356fb057eb45f3a560e3841ce3fec220155153 Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Wed, 21 Jun 2017 14:42:34 +0200 Subject: [PATCH] ARTEMIS-1246 Fixing compatibility issue It fixes compatibility issues with JMS Core clients using the old address model, allowing the client to query JMS temporary queues too. you would eventually see this issue when using older clients: AMQ119019: Queue already exists --- .../impl/wireformat/QueueAbstractPacket.java | 51 +++++++++++++++++++ .../core/ServerSessionPacketHandler.java | 16 +++--- 2 files changed, 60 insertions(+), 7 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QueueAbstractPacket.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QueueAbstractPacket.java index 57b72cd5ad..767cd0cfce 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QueueAbstractPacket.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QueueAbstractPacket.java @@ -17,6 +17,10 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; @@ -54,6 +58,53 @@ public abstract class QueueAbstractPacket extends PacketImpl { } } + /** + * It converts the given {@code queueNames} using the JMS prefix found on {@link #address} when {@code clientVersion < }{@link #ADDRESSING_CHANGE_VERSION}. + * If no conversion has occurred, it returns {@code queueNames}. + * + * @param clientVersion version of the client + * @param queueNames names of the queues to be converted + * @return the converted queues names or {@code queueNames} when no conversion has occurred + */ + public final List convertQueueNames(int clientVersion, List queueNames) { + if (clientVersion < ADDRESSING_CHANGE_VERSION) { + return applyAddressPrefixTo(queueNames); + } else { + return queueNames; + } + } + + private List applyAddressPrefixTo(List queueNames) { + final int names = queueNames.size(); + if (names == 0) { + return Collections.emptyList(); + } else { + final SimpleString address = this.address; + final SimpleString prefix = jmsPrefixOf(address); + if (prefix != null) { + final List prefixedQueueNames = new ArrayList<>(names); + for (int i = 0; i < names; i++) { + final SimpleString oldQueueNames = queueNames.get(i); + final SimpleString prefixedQueueName = prefix.concat(oldQueueNames); + prefixedQueueNames.add(prefixedQueueName); + } + return prefixedQueueNames; + } else { + return queueNames; + } + } + } + + private static SimpleString jmsPrefixOf(SimpleString address) { + if (address.startsWith(OLD_QUEUE_PREFIX)) { + return OLD_QUEUE_PREFIX; + } else if (address.startsWith(OLD_TOPIC_PREFIX)) { + return OLD_TOPIC_PREFIX; + } else { + return null; + } + } + public QueueAbstractPacket(byte type) { super(type); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index 628312acef..8e3c3ed378 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.protocol.core; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; -import java.util.ArrayList; import java.util.List; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -325,18 +324,21 @@ public class ServerSessionPacketHandler implements ChannelHandler { case SESS_BINDINGQUERY: { requiresResponse = true; SessionBindingQueryMessage request = (SessionBindingQueryMessage) packet; - BindingQueryResult result = session.executeBindingQuery(request.getAddress(remotingConnection.getClientVersion())); + final int clientVersion = remotingConnection.getClientVersion(); + BindingQueryResult result = session.executeBindingQuery(request.getAddress(clientVersion)); /* if the session is JMS and it's from an older client then we need to add the old prefix to the queue * names otherwise the older client won't realize the queue exists and will try to create it and receive * an error */ - if (session.getMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY) != null && remotingConnection.getClientVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) { - List queueNames = new ArrayList<>(); - for (SimpleString queueName : result.getQueueNames()) { - queueNames.add(PacketImpl.OLD_QUEUE_PREFIX.concat(queueName)); + if (clientVersion < PacketImpl.ADDRESSING_CHANGE_VERSION && session.getMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY) != null) { + final List queueNames = result.getQueueNames(); + if (!queueNames.isEmpty()) { + final List convertedQueueNames = request.convertQueueNames(clientVersion, queueNames); + if (convertedQueueNames != queueNames) { + result = new BindingQueryResult(result.isExists(), convertedQueueNames, result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers()); + } } - result = new BindingQueryResult(result.isExists(), queueNames, result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers()); } if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V4)) {