From b1ad8f3adf51fa96069e1eccd7b87e9dff85223d Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Mon, 22 May 2017 11:37:11 -0500 Subject: [PATCH] ARTEMIS-1178 backwards compatibility issues This commit has 2 changes for backwards compatibility with older clients: 1) A "bindings query" will now detect if the client is both JMS and "old" (i.e. pre-2.0) and will prefix the returned queue names with the old prefix (i.e. "jms.queue."). This will allow the old client to properly detect whether or not a queue exists in its auto-creation logic. 2) When messages are dispatched to a consumer there is logic to detect if the consumer is both JMS and "old" and will prefix the "address" on the message with "jms.queue." or "jms.topic." as appropriate (if it's not already prefixed). --- .../core/ServerSessionPacketHandler.java | 16 +++++++ .../core/server/impl/ServerConsumerImpl.java | 44 ++++++++++++++++++- 2 files changed, 59 insertions(+), 1 deletion(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index 385376e123..628312acef 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; +import java.util.ArrayList; import java.util.List; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -27,6 +28,8 @@ import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException; import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.core.exception.ActiveMQXAException; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.persistence.StorageManager; @@ -323,6 +326,19 @@ public class ServerSessionPacketHandler implements ChannelHandler { requiresResponse = true; SessionBindingQueryMessage request = (SessionBindingQueryMessage) packet; BindingQueryResult result = session.executeBindingQuery(request.getAddress(remotingConnection.getClientVersion())); + + /* if the session is JMS and it's from an older client then we need to add the old prefix to the queue + * names otherwise the older client won't realize the queue exists and will try to create it and receive + * an error + */ + if (session.getMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY) != null && remotingConnection.getClientVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) { + List queueNames = new ArrayList<>(); + for (SimpleString queueName : result.getQueueNames()) { + queueNames.add(PacketImpl.OLD_QUEUE_PREFIX.concat(queueName)); + } + result = new BindingQueryResult(result.isExists(), queueNames, result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers()); + } + if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V4)) { response = new SessionBindingQueryResponseMessage_V4(result.isExists(), result.getQueueNames(), result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers()); } else if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V3)) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index b18bb263c3..4ace4d4d40 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -32,7 +32,9 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl; @@ -42,6 +44,8 @@ import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.QueueBinding; +import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; +import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; @@ -149,6 +153,10 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { private long acks; + private boolean requiresLegacyPrefix = false; + + private boolean anycast = false; + // Constructors --------------------------------------------------------------------------------- public ServerConsumerImpl(final long id, @@ -226,6 +234,16 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } this.server = server; + + if (session.getRemotingConnection() instanceof CoreRemotingConnection) { + CoreRemotingConnection coreRemotingConnection = (CoreRemotingConnection) session.getRemotingConnection(); + if (session.getMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY) != null && coreRemotingConnection.getClientVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) { + requiresLegacyPrefix = true; + if (getQueue().getRoutingType().equals(RoutingType.ANYCAST)) { + anycast = true; + } + } + } } @Override @@ -535,6 +553,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence); forcedDeliveryMessage.setAddress(messageQueue.getName()); + applyPrefixForLegacyConsumer(forcedDeliveryMessage); callback.sendMessage(null, forcedDeliveryMessage, ServerConsumerImpl.this, 0); }); @@ -1053,7 +1072,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { * @param ref * @param message */ - private void deliverStandardMessage(final MessageReference ref, final Message message) throws ActiveMQException { + private void deliverStandardMessage(final MessageReference ref, Message message) throws ActiveMQException { + applyPrefixForLegacyConsumer(message); int packetSize = callback.sendMessage(ref, message, ServerConsumerImpl.this, ref.getDeliveryCount()); if (availableCredits != null) { @@ -1068,6 +1088,28 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } } + private void applyPrefixForLegacyConsumer(Message message) { + /** + * check to see if: + * 1) This is a "core" connection + * 2) The "core" connection belongs to a JMS client + * 3) The JMS client is an "old" client which needs address prefixes + * + * If 1, 2, & 3 are true then apply the "old" prefix for queues and topics as appropriate. + */ + if (requiresLegacyPrefix) { + if (anycast) { + if (!message.getAddress().startsWith(PacketImpl.OLD_QUEUE_PREFIX.toString())) { + message.setAddress(PacketImpl.OLD_QUEUE_PREFIX + message.getAddress()); + } + } else { + if (!message.getAddress().startsWith(PacketImpl.OLD_TOPIC_PREFIX.toString())) { + message.setAddress(PacketImpl.OLD_TOPIC_PREFIX + message.getAddress()); + } + } + } + } + // Inner classes // ------------------------------------------------------------------------