diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java index 96cde97edd..646eb28602 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.utils.DataConstants; @@ -24,6 +25,11 @@ import org.apache.activemq.artemis.utils.DataConstants; public class PacketImpl implements Packet { // Constants ------------------------------------------------------------------------- + public static final int ADDRESSING_CHANGE_VERSION = 129; + + public static final SimpleString OLD_QUEUE_PREFIX = new SimpleString("jms.queue."); + public static final SimpleString OLD_TOPIC_PREFIX = new SimpleString("jms.topic."); + // The minimal size for all the packets, Common data for all the packets (look at // PacketImpl.encode) public static final int PACKET_HEADERS_SIZE = DataConstants.SIZE_INT + DataConstants.SIZE_BYTE + @@ -267,6 +273,20 @@ public class PacketImpl implements Packet { // Public -------------------------------------------------------- + public SimpleString convertName(SimpleString name) { + if (name == null) { + return null; + } + + if (name.startsWith(OLD_QUEUE_PREFIX)) { + return name.subSeq(OLD_QUEUE_PREFIX.length(), name.length()); + } else if (name.startsWith(OLD_TOPIC_PREFIX)) { + return name.subSeq(OLD_TOPIC_PREFIX.length(), name.length()); + } else { + return name; + } + } + @Override public byte getType() { return type; @@ -376,4 +396,6 @@ public class PacketImpl implements Packet { protected int nullableStringEncodeSize(final String str) { return DataConstants.SIZE_BOOLEAN + (str != null ? stringEncodeSize(str) : 0); } + + } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QueueAbstractPacket.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QueueAbstractPacket.java new file mode 100644 index 0000000000..57b72cd5ad --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QueueAbstractPacket.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; + +public abstract class QueueAbstractPacket extends PacketImpl { + + protected SimpleString queueName; + protected SimpleString oldVersionQueueName; + + protected SimpleString address; + protected SimpleString oldVersionAddresseName; + + public SimpleString getQueueName(int clientVersion) { + + if (clientVersion < ADDRESSING_CHANGE_VERSION) { + if (oldVersionQueueName == null) { + oldVersionQueueName = convertName(queueName); + } + + return oldVersionQueueName; + } else { + return queueName; + } + } + + public SimpleString getAddress(int clientVersion) { + + if (clientVersion < ADDRESSING_CHANGE_VERSION) { + if (oldVersionAddresseName == null) { + oldVersionAddresseName = convertName(address); + } + + return oldVersionAddresseName; + } else { + return address; + } + } + + public QueueAbstractPacket(byte type) { + super(type); + } +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionBindingQueryMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionBindingQueryMessage.java index 0bb06e2f75..d33b884935 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionBindingQueryMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionBindingQueryMessage.java @@ -18,11 +18,8 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; -public class SessionBindingQueryMessage extends PacketImpl { - - private SimpleString address; +public class SessionBindingQueryMessage extends QueueAbstractPacket { public SessionBindingQueryMessage(final SimpleString address) { super(SESS_BINDINGQUERY); @@ -34,10 +31,6 @@ public class SessionBindingQueryMessage extends PacketImpl { super(SESS_BINDINGQUERY); } - public SimpleString getAddress() { - return address; - } - @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeSimpleString(address); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java index afff162a0f..f09beeb25b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java @@ -18,14 +18,11 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; -public class SessionCreateConsumerMessage extends PacketImpl { +public class SessionCreateConsumerMessage extends QueueAbstractPacket { private long id; - private SimpleString queueName; - private SimpleString filterString; private boolean browseOnly; @@ -66,10 +63,6 @@ public class SessionCreateConsumerMessage extends PacketImpl { return id; } - public SimpleString getQueueName() { - return queueName; - } - public SimpleString getFilterString() { return filterString; } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryMessage.java index 172f29fc80..1b8c526cf6 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryMessage.java @@ -18,11 +18,8 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; -public class SessionQueueQueryMessage extends PacketImpl { - - private SimpleString queueName; +public class SessionQueueQueryMessage extends QueueAbstractPacket { public SessionQueueQueryMessage(final SimpleString queueName) { super(SESS_QUEUEQUERY); @@ -34,10 +31,6 @@ public class SessionQueueQueryMessage extends PacketImpl { super(SESS_QUEUEQUERY); } - public SimpleString getQueueName() { - return queueName; - } - @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeSimpleString(queueName); diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/api/core/QueueAbstractTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/api/core/QueueAbstractTest.java new file mode 100644 index 0000000000..e75c18485c --- /dev/null +++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/api/core/QueueAbstractTest.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.api.core; + +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.QueueAbstractPacket; +import org.junit.Assert; +import org.junit.Test; + +public class QueueAbstractTest { + + class MyTest extends QueueAbstractPacket { + + MyTest(String name) { + super((byte)0); + this.queueName = SimpleString.toSimpleString(name); + } + } + + + @Test + public void testOldTopic() { + MyTest test = new MyTest("jms.topic.mytopic"); + + Assert.assertEquals("mytopic", test.getQueueName(127).toString()); + } + + @Test + public void testOldQueue() { + MyTest test = new MyTest("jms.queue.myQueue"); + + Assert.assertEquals("myQueue", test.getQueueName(127).toString()); + } +} diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java index 024b8a2226..4b898171fe 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java @@ -367,22 +367,29 @@ public class ActiveMQSession implements QueueSession, TopicSession { } try { - ActiveMQQueue queue = lookupQueue(queueName, false); - - if (queue == null) { - queue = lookupQueue(queueName, true); - } - - if (queue == null) { - throw new JMSException("There is no queue with name " + queueName); - } else { - return queue; - } + return internalCreateQueue(queueName, false); } catch (ActiveMQException e) { throw JMSExceptionHelper.convertFromActiveMQException(e); } } + protected Queue internalCreateQueue(String queueName, final boolean retry) throws ActiveMQException, JMSException { + ActiveMQQueue queue = lookupQueue(queueName, false); + + if (queue == null) { + queue = lookupQueue(queueName, true); + } + + if (queue == null) { + if (!retry) { + return internalCreateQueue("jms.queue." + queueName, true); + } + throw new JMSException("There is no queue with name " + queueName); + } else { + return queue; + } + } + @Override public Topic createTopic(final String topicName) throws JMSException { // As per spec. section 4.11 @@ -391,22 +398,29 @@ public class ActiveMQSession implements QueueSession, TopicSession { } try { - ActiveMQTopic topic = lookupTopic(topicName, false); - - if (topic == null) { - topic = lookupTopic(topicName, true); - } - - if (topic == null) { - throw new JMSException("There is no topic with name " + topicName); - } else { - return topic; - } + return internalCreateTopic(topicName, false); } catch (ActiveMQException e) { throw JMSExceptionHelper.convertFromActiveMQException(e); } } + protected Topic internalCreateTopic(String topicName, boolean retry) throws ActiveMQException, JMSException { + ActiveMQTopic topic = lookupTopic(topicName, false); + + if (topic == null) { + topic = lookupTopic(topicName, true); + } + + if (topic == null) { + if (!retry) { + return internalCreateTopic("jms.topic." + topicName, true); + } + throw new JMSException("There is no topic with name " + topicName); + } else { + return topic; + } + } + @Override public TopicSubscriber createDurableSubscriber(final Topic topic, final String name) throws JMSException { return createDurableSubscriber(topic, name, null, false); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index acb1c4c7a9..be2f04d092 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -150,7 +150,6 @@ public class ServerSessionPacketHandler implements ChannelHandler { this.remotingConnection = channel.getConnection(); - //TODO think of a better way of doing this Connection conn = remotingConnection.getTransportConnection(); if (conn instanceof NettyConnection) { @@ -215,11 +214,12 @@ public class ServerSessionPacketHandler implements ChannelHandler { case SESS_CREATECONSUMER: { SessionCreateConsumerMessage request = (SessionCreateConsumerMessage) packet; requiresResponse = request.isRequiresResponse(); - session.createConsumer(request.getID(), request.getQueueName(), request.getFilterString(), request.isBrowseOnly()); + session.createConsumer(request.getID(), request.getQueueName(remotingConnection.getClientVersion()), request.getFilterString(), request.isBrowseOnly()); if (requiresResponse) { // We send back queue information on the queue as a response- this allows the queue to // be automatically recreated on failover - QueueQueryResult queueQueryResult = session.executeQueueQuery(request.getQueueName()); + QueueQueryResult queueQueryResult = session.executeQueueQuery(request.getQueueName(remotingConnection.getClientVersion())); + if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V3)) { response = new SessionQueueQueryResponseMessage_V3(queueQueryResult); } else if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2)) { @@ -287,7 +287,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { case SESS_QUEUEQUERY: { requiresResponse = true; SessionQueueQueryMessage request = (SessionQueueQueryMessage) packet; - QueueQueryResult result = session.executeQueueQuery(request.getQueueName()); + QueueQueryResult result = session.executeQueueQuery(request.getQueueName(remotingConnection.getClientVersion())); if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V3)) { response = new SessionQueueQueryResponseMessage_V3(result); } else if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2)) { @@ -300,7 +300,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { case SESS_BINDINGQUERY: { requiresResponse = true; SessionBindingQueryMessage request = (SessionBindingQueryMessage) packet; - BindingQueryResult result = session.executeBindingQuery(request.getAddress()); + BindingQueryResult result = session.executeBindingQuery(request.getAddress(remotingConnection.getClientVersion())); if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V4)) { response = new SessionBindingQueryResponseMessage_V4(result.isExists(), result.getQueueNames(), result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultDeleteOnNoConsumers(), result.getDefaultMaxConsumers()); } else if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V3)) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java index 1034919255..31ab624296 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java @@ -16,6 +16,9 @@ */ package org.apache.activemq.artemis.core.protocol.core.impl; +import java.util.HashMap; +import java.util.Map; + import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException; @@ -38,6 +41,7 @@ import org.apache.activemq.artemis.core.security.ActiveMQPrincipal; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.version.Version; import org.jboss.logging.Logger; @@ -153,7 +157,15 @@ public class ActiveMQPacketHandler implements ChannelHandler { OperationContext sessionOperationContext = server.newOperationContext(); - ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, channel, connection), true, sessionOperationContext, protocolManager.getPrefixes()); + Map routingTypeMap = protocolManager.getPrefixes(); + + if (connection.getClientVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) { + routingTypeMap = new HashMap<>(); + routingTypeMap.put(PacketImpl.OLD_QUEUE_PREFIX, RoutingType.ANYCAST); + routingTypeMap.put(PacketImpl.OLD_TOPIC_PREFIX, RoutingType.MULTICAST); + } + + ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, channel, connection), true, sessionOperationContext, routingTypeMap); ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session, server.getStorageManager(), channel); channel.setHandler(handler); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 6d4c58836d..ea67c56b34 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -1330,6 +1330,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener { SimpleString address = removePrefix(message.getAddress()); + // In case the prefix was removed, we also need to update the message + if (address != message.getAddress()) { + message.setAddress(address); + } + if (defaultAddress == null && address != null) { defaultAddress = address; } @@ -1349,12 +1354,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener { logger.trace("send(message=" + message + ", direct=" + direct + ") being called"); } - if (address == null) { + if (message.getAddress() == null) { // This could happen with some tests that are ignoring messages throw ActiveMQMessageBundle.BUNDLE.noAddress(); } - if (address.equals(managementAddress)) { + if (message.getAddress().equals(managementAddress)) { // It's a management message handleManagementMessage(tx, message, direct); @@ -1733,7 +1738,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { @Override public SimpleString removePrefix(SimpleString address) { - if (prefixEnabled) { + if (prefixEnabled && address != null) { return PrefixUtil.getAddress(address, prefixes); } return address;