diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java index 54178c014d..9bdc486428 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java @@ -24,10 +24,11 @@ import java.util.Properties; import java.util.UUID; import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.api.core.ParameterisedAddress; import org.apache.activemq.artemis.api.core.QueueAttributes; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.jndi.JNDIStorable; -import org.apache.activemq.artemis.api.core.ParameterisedAddress; /** * ActiveMQ Artemis implementation of a JMS Destination. @@ -127,6 +128,40 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se return destination; } + public static Destination fromPrefixed1XName(final String addr, final String name) { + ActiveMQDestination destination; + if (addr.startsWith(PacketImpl.OLD_QUEUE_PREFIX.toString())) { + destination = createQueue(addr); + } else if (addr.startsWith(PacketImpl.OLD_TOPIC_PREFIX.toString())) { + destination = createTopic(addr); + } else if (addr.startsWith(PacketImpl.OLD_TEMP_QUEUE_PREFIX.toString())) { + destination = new ActiveMQTemporaryQueue(addr, null); + } else if (addr.startsWith(PacketImpl.OLD_TEMP_TOPIC_PREFIX.toString())) { + destination = new ActiveMQTemporaryTopic(addr, null); + } else { + destination = new ActiveMQDestination(addr, TYPE.DESTINATION, null); + } + + String unprefixedName = name; + + if (name.startsWith(PacketImpl.OLD_QUEUE_PREFIX.toString())) { + unprefixedName = name.substring(PacketImpl.OLD_QUEUE_PREFIX.length()); + } else if (name.startsWith(PacketImpl.OLD_TOPIC_PREFIX.toString())) { + unprefixedName = name.substring(PacketImpl.OLD_TOPIC_PREFIX.length()); + } else if (name.startsWith(PacketImpl.OLD_TEMP_QUEUE_PREFIX.toString())) { + unprefixedName = name.substring(PacketImpl.OLD_TEMP_QUEUE_PREFIX.length()); + } else if (name.startsWith(PacketImpl.OLD_TEMP_TOPIC_PREFIX.toString())) { + unprefixedName = name.substring(PacketImpl.OLD_TEMP_TOPIC_PREFIX.length()); + } + + destination.setName(unprefixedName); + + return destination; + } + + + + public static SimpleString createQueueNameForSubscription(final boolean isDurable, final String clientID, final String subscriptionName) { diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java index af74a3e94f..bb210dee46 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java @@ -385,16 +385,7 @@ public class ActiveMQMessage implements javax.jms.Message { throw new InvalidDestinationException("Foreign destination " + dest); } - String prefix = ""; - if (dest instanceof ActiveMQTemporaryQueue) { - prefix = TEMP_QUEUE_QUALIFED_PREFIX; - } else if (dest instanceof ActiveMQQueue) { - prefix = QUEUE_QUALIFIED_PREFIX; - } else if (dest instanceof ActiveMQTemporaryTopic) { - prefix = TEMP_TOPIC_QUALIFED_PREFIX; - } else if (dest instanceof ActiveMQTopic) { - prefix = TOPIC_QUALIFIED_PREFIX; - } + String prefix = prefixOf(dest); ActiveMQDestination jbd = (ActiveMQDestination) dest; MessageUtil.setJMSReplyTo(message, prefix + jbd.getAddress()); @@ -403,6 +394,20 @@ public class ActiveMQMessage implements javax.jms.Message { } } + protected static String prefixOf(Destination dest) { + String prefix = ""; + if (dest instanceof ActiveMQTemporaryQueue) { + prefix = TEMP_QUEUE_QUALIFED_PREFIX; + } else if (dest instanceof ActiveMQQueue) { + prefix = QUEUE_QUALIFIED_PREFIX; + } else if (dest instanceof ActiveMQTemporaryTopic) { + prefix = TEMP_TOPIC_QUALIFED_PREFIX; + } else if (dest instanceof ActiveMQTopic) { + prefix = TOPIC_QUALIFIED_PREFIX; + } + return prefix; + } + protected SimpleString checkPrefix(SimpleString address) { return address; } diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java index ae6828365e..693ab84ead 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java @@ -219,9 +219,9 @@ public class ActiveMQSession implements QueueSession, TopicSession { ActiveMQStreamMessage message; if (enable1xPrefixes) { - message = new ActiveMQStreamMessage(session); - } else { message = new ActiveMQStreamCompatibleMessage(session); + } else { + message = new ActiveMQStreamMessage(session); } return message; } diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQBytesCompatibleMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQBytesCompatibleMessage.java index 626b5a5357..a52d0d8559 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQBytesCompatibleMessage.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQBytesCompatibleMessage.java @@ -33,6 +33,10 @@ public class ActiveMQBytesCompatibleMessage extends ActiveMQBytesMessage { return ActiveMQCompatibleMessage.checkPrefix1X(address); } + @Override + public void setJMSReplyTo(Destination dest) throws JMSException { + replyTo = ActiveMQCompatibleMessage.setCompatibleReplyTo(dest, message); + } @Override public Destination getJMSReplyTo() throws JMSException { diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQCompatibleMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQCompatibleMessage.java index 1b21cbfa22..ec4e720d07 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQCompatibleMessage.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQCompatibleMessage.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.jms.client.compatible1X; import javax.jms.Destination; +import javax.jms.InvalidDestinationException; import javax.jms.JMSException; import javax.jms.JMSRuntimeException; import javax.jms.Message; @@ -69,11 +70,39 @@ public class ActiveMQCompatibleMessage extends ActiveMQMessage { return replyTo; } - public static Destination findCompatibleReplyTo(ClientMessage message) { + @Override + public void setJMSReplyTo(Destination dest) throws JMSException { + replyTo = setCompatibleReplyTo(dest, message); + } + + static Destination setCompatibleReplyTo(Destination dest, ClientMessage message) throws InvalidDestinationException { + if (dest == null) { + MessageUtil.setJMSReplyTo(message, (String) null); + return null; + } else { + if (dest instanceof ActiveMQDestination == false) { + throw new InvalidDestinationException("Foreign destination " + dest); + } + ActiveMQDestination jbd = (ActiveMQDestination) dest; + final String address = jbd.getAddress(); + if (hasPrefix1X(address)) { + MessageUtil.setJMSReplyTo(message, jbd.getAddress()); + } else { + String prefix = prefixOf(dest); + MessageUtil.setJMSReplyTo(message, prefix + jbd.getAddress()); + } + return jbd; + } + } + + static Destination findCompatibleReplyTo(ClientMessage message) { SimpleString address = MessageUtil.getJMSReplyTo(message); if (address != null) { + final SimpleString checkedAddress = checkPrefix1X(address); + if (checkedAddress != null) { + return ActiveMQDestination.fromPrefixed1XName(address.toString(), checkedAddress.toString()); + } String name = address.toString(); - // swap the old prefixes for the new ones so the proper destination type gets created if (address.startsWith(OLD_QUEUE_QUALIFIED_PREFIX)) { name = address.subSeq(OLD_QUEUE_QUALIFIED_PREFIX.length(), address.length()).toString(); @@ -95,6 +124,22 @@ public class ActiveMQCompatibleMessage extends ActiveMQMessage { return checkPrefix1X(address); } + private static boolean hasPrefix1X(String address) { + if (address != null) { + if (address.startsWith(PacketImpl.OLD_QUEUE_PREFIX.toString())) { + return true; + } else if (address.startsWith(PacketImpl.OLD_TEMP_QUEUE_PREFIX.toString())) { + return true; + } else if (address.startsWith(PacketImpl.OLD_TOPIC_PREFIX.toString())) { + return true; + } else if (address.startsWith(PacketImpl.OLD_TEMP_TOPIC_PREFIX.toString())) { + return true; + } + } + + return false; + } + protected static SimpleString checkPrefix1X(SimpleString address) { if (address != null) { if (address.startsWith(PacketImpl.OLD_QUEUE_PREFIX)) { diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQMapCompatibleMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQMapCompatibleMessage.java index 2d6e576332..1fd2caed09 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQMapCompatibleMessage.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQMapCompatibleMessage.java @@ -33,6 +33,11 @@ public class ActiveMQMapCompatibleMessage extends ActiveMQMapMessage { return ActiveMQCompatibleMessage.checkPrefix1X(address); } + @Override + public void setJMSReplyTo(Destination dest) throws JMSException { + replyTo = ActiveMQCompatibleMessage.setCompatibleReplyTo(dest, message); + } + @Override public Destination getJMSReplyTo() throws JMSException { if (replyTo == null) { diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQObjectCompatibleMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQObjectCompatibleMessage.java index 13a9d7dc7b..0a1a802554 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQObjectCompatibleMessage.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQObjectCompatibleMessage.java @@ -34,6 +34,10 @@ public class ActiveMQObjectCompatibleMessage extends ActiveMQObjectMessage { return ActiveMQCompatibleMessage.checkPrefix1X(address); } + @Override + public void setJMSReplyTo(Destination dest) throws JMSException { + replyTo = ActiveMQCompatibleMessage.setCompatibleReplyTo(dest, message); + } @Override public Destination getJMSReplyTo() throws JMSException { diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQStreamCompatibleMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQStreamCompatibleMessage.java index bb2fda6d2b..3b176cf7c6 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQStreamCompatibleMessage.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQStreamCompatibleMessage.java @@ -33,6 +33,10 @@ public class ActiveMQStreamCompatibleMessage extends ActiveMQStreamMessage { return ActiveMQCompatibleMessage.checkPrefix1X(address); } + @Override + public void setJMSReplyTo(Destination dest) throws JMSException { + replyTo = ActiveMQCompatibleMessage.setCompatibleReplyTo(dest, message); + } @Override public Destination getJMSReplyTo() throws JMSException { diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQTextCompabileMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQTextCompabileMessage.java index 451c582806..ae8aa52213 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQTextCompabileMessage.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQTextCompabileMessage.java @@ -27,6 +27,10 @@ import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage; public class ActiveMQTextCompabileMessage extends ActiveMQTextMessage { + @Override + public void setJMSReplyTo(Destination dest) throws JMSException { + replyTo = ActiveMQCompatibleMessage.setCompatibleReplyTo(dest, message); + } @Override public Destination getJMSReplyTo() throws JMSException { diff --git a/tests/compatibility-tests/src/main/resources/ReplyToTest/replyToReceive.groovy b/tests/compatibility-tests/src/main/resources/ReplyToTest/replyToReceive.groovy index 156cbdb068..7cfa673abc 100644 --- a/tests/compatibility-tests/src/main/resources/ReplyToTest/replyToReceive.groovy +++ b/tests/compatibility-tests/src/main/resources/ReplyToTest/replyToReceive.groovy @@ -76,7 +76,7 @@ void check(List messages) { checkMessage(streamMessage); TextMessage textMessage = iterator.next(); - checkMessage(objectMessage); + checkMessage(textMessage); } diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/ReplyToTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/ReplyToTest.java index fb7846cb19..1f1699f57e 100644 --- a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/ReplyToTest.java +++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/ReplyToTest.java @@ -113,8 +113,9 @@ public class ReplyToTest extends ServerBase { combinations.add(new Object[]{SNAPSHOT, ONE_FIVE, ONE_FIVE}); combinations.add(new Object[]{ONE_FIVE, SNAPSHOT, SNAPSHOT}); - // TODO: It's not currently possible to mix reply to between 1.x and SNAPSHOT. Both sides need to be on the same version! - // combinations.addAll(combinatory(SNAPSHOT, new Object[]{SNAPSHOT, ONE_FIVE}, new Object[]{SNAPSHOT, ONE_FIVE}, new Object[]{SNAPSHOT, ONE_FIVE})); + combinations.add(new Object[]{ONE_FIVE, SNAPSHOT, ONE_FIVE}); + combinations.add(new Object[]{ONE_FIVE, ONE_FIVE, SNAPSHOT}); + return combinations; }