ARTEMIS-1178 backwards compatibility issues

This commit has 2 changes for backwards compatibility with older
clients:

1) A "bindings query" will now detect if the client is both JMS and
"old" (i.e. pre-2.0) and will prefix the returned queue names with the
old prefix (i.e. "jms.queue."). This will allow the old client to
properly detect whether or not a queue exists in its auto-creation
logic.

2) When messages are dispatched to a consumer there is logic to detect
if the consumer is both JMS and "old" and will prefix the "address"
on the message with "jms.queue." or "jms.topic." as appropriate
(if it's not already prefixed).
This commit is contained in:
Justin Bertram 2017-05-22 11:37:11 -05:00 committed by Clebert Suconic
parent 5a9830af17
commit b1ad8f3adf
2 changed files with 59 additions and 1 deletions

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core;
import javax.transaction.xa.XAResource; import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid; import javax.transaction.xa.Xid;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -27,6 +28,8 @@ import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached; import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.exception.ActiveMQXAException; import org.apache.activemq.artemis.core.exception.ActiveMQXAException;
import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.StorageManager;
@ -323,6 +326,19 @@ public class ServerSessionPacketHandler implements ChannelHandler {
requiresResponse = true; requiresResponse = true;
SessionBindingQueryMessage request = (SessionBindingQueryMessage) packet; SessionBindingQueryMessage request = (SessionBindingQueryMessage) packet;
BindingQueryResult result = session.executeBindingQuery(request.getAddress(remotingConnection.getClientVersion())); BindingQueryResult result = session.executeBindingQuery(request.getAddress(remotingConnection.getClientVersion()));
/* if the session is JMS and it's from an older client then we need to add the old prefix to the queue
* names otherwise the older client won't realize the queue exists and will try to create it and receive
* an error
*/
if (session.getMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY) != null && remotingConnection.getClientVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) {
List<SimpleString> queueNames = new ArrayList<>();
for (SimpleString queueName : result.getQueueNames()) {
queueNames.add(PacketImpl.OLD_QUEUE_PREFIX.concat(queueName));
}
result = new BindingQueryResult(result.isExists(), queueNames, result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers());
}
if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V4)) { if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V4)) {
response = new SessionBindingQueryResponseMessage_V4(result.isExists(), result.getQueueNames(), result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers()); response = new SessionBindingQueryResponseMessage_V4(result.isExists(), result.getQueueNames(), result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers());
} else if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V3)) { } else if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V3)) {

View File

@ -32,7 +32,9 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl; import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
@ -42,6 +44,8 @@ import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
@ -149,6 +153,10 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
private long acks; private long acks;
private boolean requiresLegacyPrefix = false;
private boolean anycast = false;
// Constructors --------------------------------------------------------------------------------- // Constructors ---------------------------------------------------------------------------------
public ServerConsumerImpl(final long id, public ServerConsumerImpl(final long id,
@ -226,6 +234,16 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
} }
this.server = server; this.server = server;
if (session.getRemotingConnection() instanceof CoreRemotingConnection) {
CoreRemotingConnection coreRemotingConnection = (CoreRemotingConnection) session.getRemotingConnection();
if (session.getMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY) != null && coreRemotingConnection.getClientVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) {
requiresLegacyPrefix = true;
if (getQueue().getRoutingType().equals(RoutingType.ANYCAST)) {
anycast = true;
}
}
}
} }
@Override @Override
@ -535,6 +553,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence); forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
forcedDeliveryMessage.setAddress(messageQueue.getName()); forcedDeliveryMessage.setAddress(messageQueue.getName());
applyPrefixForLegacyConsumer(forcedDeliveryMessage);
callback.sendMessage(null, forcedDeliveryMessage, ServerConsumerImpl.this, 0); callback.sendMessage(null, forcedDeliveryMessage, ServerConsumerImpl.this, 0);
}); });
@ -1053,7 +1072,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
* @param ref * @param ref
* @param message * @param message
*/ */
private void deliverStandardMessage(final MessageReference ref, final Message message) throws ActiveMQException { private void deliverStandardMessage(final MessageReference ref, Message message) throws ActiveMQException {
applyPrefixForLegacyConsumer(message);
int packetSize = callback.sendMessage(ref, message, ServerConsumerImpl.this, ref.getDeliveryCount()); int packetSize = callback.sendMessage(ref, message, ServerConsumerImpl.this, ref.getDeliveryCount());
if (availableCredits != null) { if (availableCredits != null) {
@ -1068,6 +1088,28 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
} }
} }
private void applyPrefixForLegacyConsumer(Message message) {
/**
* check to see if:
* 1) This is a "core" connection
* 2) The "core" connection belongs to a JMS client
* 3) The JMS client is an "old" client which needs address prefixes
*
* If 1, 2, & 3 are true then apply the "old" prefix for queues and topics as appropriate.
*/
if (requiresLegacyPrefix) {
if (anycast) {
if (!message.getAddress().startsWith(PacketImpl.OLD_QUEUE_PREFIX.toString())) {
message.setAddress(PacketImpl.OLD_QUEUE_PREFIX + message.getAddress());
}
} else {
if (!message.getAddress().startsWith(PacketImpl.OLD_TOPIC_PREFIX.toString())) {
message.setAddress(PacketImpl.OLD_TOPIC_PREFIX + message.getAddress());
}
}
}
}
// Inner classes // Inner classes
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------