From 1e35175a4d496486b221bb88e8871cd8d5f94cd4 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 17 Apr 2019 16:21:17 -0400 Subject: [PATCH] ARTEMIS-2311 Dealing with Protocol conversions and JMSReplyTo --- .../activemq/artemis/utils/PrefixUtil.java | 23 ++ .../jms/client/ActiveMQDestination.java | 31 ++- .../artemis/jms/client/ActiveMQMessage.java | 14 +- .../amqp/converter/AMQPMessageSupport.java | 65 +++++- .../amqp/converter/AmqpCoreConverter.java | 42 +++- .../amqp/converter/CoreAmqpConverter.java | 29 +-- .../amqp/converter/jms/ServerDestination.java | 43 ---- .../amqp/converter/jms/ServerJMSMessage.java | 14 +- .../JMSMappingOutboundTransformerTest.java | 29 ++- .../openwire/OpenWireMessageConverter.java | 25 ++- .../RequestReplyMultiProtocolTest.java | 209 ++++++++++++++++++ .../crossprotocol/RequestReplyNonJMSTest.java | 147 ++++++++++++ .../activemq/artemis/tests/util/CFUtil.java | 43 ++++ 13 files changed, 580 insertions(+), 134 deletions(-) delete mode 100644 artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerDestination.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/RequestReplyMultiProtocolTest.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/RequestReplyNonJMSTest.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/CFUtil.java diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/PrefixUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/PrefixUtil.java index 4066986bbd..90b1211592 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/PrefixUtil.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/PrefixUtil.java @@ -61,4 +61,27 @@ public class PrefixUtil { public static SimpleString removeAddress(SimpleString string, SimpleString prefix) { return string.subSeq(0, prefix.length()); } + + public static String removeAddress(String string, String prefix) { + return string.substring(0, prefix.length()); + } + + public static String removePrefix(String string, String prefix) { + return string.substring(prefix.length()); + } + + /** This will treat a prefix on the uri-type of queue://, topic://, temporaryTopic://, temporaryQueue. + * This is mostly used on conversions to treat JMSReplyTo or similar usages on core protocol */ + public static String getURIPrefix(String address) { + int index = address.toString().indexOf("://"); + if (index > 0) { + return address.substring(0, index + 3); + } else { + // SimpleString has a static EMPTY definition, however it's not safe to use it + // since SimpleString is a mutable object, and for that reason I can't leak EMPTY definition. + // We need to create a new one on this case. + return ""; + } + } + } diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java index c0f3cdcacc..0a1b721361 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java @@ -26,6 +26,7 @@ import java.util.UUID; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.ParameterisedAddress; import org.apache.activemq.artemis.api.core.QueueAttributes; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.jndi.JNDIStorable; @@ -58,6 +59,18 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se this.name = name; } + public static ActiveMQDestination createDestination(RoutingType routingType, SimpleString address) { + if (address == null) { + return null; + } else if (RoutingType.ANYCAST.equals(routingType)) { + return ActiveMQDestination.createQueue(address); + } else if (RoutingType.MULTICAST.equals(routingType)) { + return ActiveMQDestination.createTopic(address); + } else { + return ActiveMQDestination.fromPrefixedName(address.toString()); + } + } + /** * Static helper method for working with destinations. */ @@ -88,11 +101,11 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se } } - public static Destination fromPrefixedName(final String name) { + public static ActiveMQDestination fromPrefixedName(final String name) { return fromPrefixedName(name, name); } - public static Destination fromPrefixedName(final String addr, final String name) { + public static ActiveMQDestination fromPrefixedName(final String addr, final String name) { ActiveMQDestination destination; if (addr.startsWith(ActiveMQDestination.QUEUE_QUALIFIED_PREFIX)) { @@ -111,20 +124,6 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se destination = new ActiveMQDestination(addr, TYPE.DESTINATION, null); } - String unprefixedName = name; - - if (name.startsWith(ActiveMQDestination.QUEUE_QUALIFIED_PREFIX)) { - unprefixedName = name.substring(ActiveMQDestination.QUEUE_QUALIFIED_PREFIX.length()); - } else if (name.startsWith(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX)) { - unprefixedName = name.substring(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX.length()); - } else if (name.startsWith(ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX)) { - unprefixedName = name.substring(ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX.length()); - } else if (name.startsWith(ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX)) { - unprefixedName = name.substring(ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX.length()); - } - - destination.setName(unprefixedName); - return destination; } diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java index d332e6ccf0..6a6292b613 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java @@ -394,7 +394,7 @@ public class ActiveMQMessage implements javax.jms.Message { } } - protected static String prefixOf(Destination dest) { + public static String prefixOf(Destination dest) { String prefix = ""; if (dest instanceof ActiveMQTemporaryQueue) { prefix = TEMP_QUEUE_QUALIFED_PREFIX; @@ -423,15 +423,9 @@ public class ActiveMQMessage implements javax.jms.Message { SimpleString address = message.getAddressSimpleString(); SimpleString changedAddress = checkPrefix(address); - if (address == null) { - dest = null; - } else if (RoutingType.ANYCAST.equals(message.getRoutingType())) { - dest = ActiveMQDestination.createQueue(address); - } else if (RoutingType.MULTICAST.equals(message.getRoutingType())) { - dest = ActiveMQDestination.createTopic(address); - } else { - dest = (ActiveMQDestination) ActiveMQDestination.fromPrefixedName(address.toString()); - } + RoutingType routingType = message.getRoutingType(); + + dest = ActiveMQDestination.createDestination(routingType, address); if (changedAddress != null && dest != null) { ((ActiveMQDestination) dest).setName(changedAddress.toString()); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java index fc31fc21e8..5f739504e8 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java @@ -30,10 +30,18 @@ import java.util.Set; import javax.jms.Destination; import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.Topic; import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.jms.client.ActiveMQDestination; +import org.apache.activemq.artemis.jms.client.ActiveMQQueue; +import org.apache.activemq.artemis.jms.client.ActiveMQTemporaryQueue; +import org.apache.activemq.artemis.jms.client.ActiveMQTemporaryTopic; +import org.apache.activemq.artemis.jms.client.ActiveMQTopic; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; @@ -45,6 +53,7 @@ import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Data; import org.apache.qpid.proton.message.Message; +import org.jboss.logging.Logger; /** * Support class containing constant values and static methods that are used to map to / from @@ -52,6 +61,8 @@ import org.apache.qpid.proton.message.Message; */ public final class AMQPMessageSupport { + private static final Logger logger = Logger.getLogger(AMQPMessageSupport.class); + public static final String JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME = "x-opt-jms-reply-to"; // Message Properties used to map AMQP to JMS and back @@ -271,8 +282,23 @@ public final class AMQPMessageSupport { } public static String toAddress(Destination destination) { - if (destination instanceof ActiveMQDestination) { - return ((ActiveMQDestination) destination).getAddress(); + try { + if (destination instanceof ActiveMQDestination) { + return ((ActiveMQDestination) destination).getAddress(); + } else { + if (destination instanceof Queue) { + return ((Queue) destination).getQueueName(); + } else if (destination instanceof Topic) { + + return ((Topic) destination).getTopicName(); + } + } + } catch (JMSException e) { + // ActiveMQDestination (and most JMS implementations I know) will never throw an Exception here + // this is here for compilation support (as JMS declares it), and I don't want to propagate exceptions into + // the converter... + // and for the possibility of who knows in the future!!! + logger.warn(e.getMessage(), e); } return null; } @@ -345,4 +371,39 @@ public final class AMQPMessageSupport { // ((ResetLimitWrappedActiveMQBuffer) message.getBodyBuffer()).setMessage(null); return message; } + + + public static byte destinationType(Destination destination) { + if (destination instanceof Queue) { + if (destination instanceof TemporaryQueue) { + return TEMP_QUEUE_TYPE; + } else { + return QUEUE_TYPE; + } + } else if (destination instanceof Topic) { + if (destination instanceof TemporaryTopic) { + return TEMP_TOPIC_TYPE; + } else { + return TOPIC_TYPE; + } + } + + return QUEUE_TYPE; + } + + public static Destination destination(byte destinationType, String address) { + switch (destinationType) { + case TEMP_QUEUE_TYPE: + return new ActiveMQTemporaryQueue(address, null); + case TEMP_TOPIC_TYPE: + return new ActiveMQTemporaryTopic(address, null); + case TOPIC_TYPE: + return new ActiveMQTopic(address); + case QUEUE_TYPE: + return new ActiveMQQueue(address); + default: + return new ActiveMQQueue(address); + } + } + } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java index 8e854abc08..32d35966ae 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java @@ -61,8 +61,8 @@ import javax.jms.JMSException; import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; +import org.apache.activemq.artemis.jms.client.ActiveMQDestination; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage; import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; @@ -201,7 +201,7 @@ public class AmqpCoreConverter { processHeader(result, header); processMessageAnnotations(result, annotations); processApplicationProperties(result, applicationProperties); - processProperties(result, properties); + processProperties(result, properties, annotations); processFooter(result, footer); processExtraProperties(result, message.getExtraProperties()); @@ -220,7 +220,6 @@ public class AmqpCoreConverter { } } - result.getInnerMessage().setReplyTo(message.getReplyTo()); result.getInnerMessage().setDurable(message.isDurable()); result.getInnerMessage().setPriority(message.getPriority()); result.getInnerMessage().setAddress(message.getAddressSimpleString()); @@ -308,7 +307,7 @@ public class AmqpCoreConverter { return jms; } - private static ServerJMSMessage processProperties(ServerJMSMessage jms, Properties properties) throws Exception { + private static ServerJMSMessage processProperties(ServerJMSMessage jms, Properties properties, MessageAnnotations annotations) throws Exception { if (properties != null) { if (properties.getMessageId() != null) { jms.setJMSMessageID(AMQPMessageIdHelper.INSTANCE.toMessageIdString(properties.getMessageId())); @@ -318,13 +317,32 @@ public class AmqpCoreConverter { jms.setStringProperty("JMSXUserID", new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), StandardCharsets.UTF_8)); } if (properties.getTo() != null) { - jms.setJMSDestination(new ServerDestination(properties.getTo())); + byte queueType = parseQueueAnnotation(annotations, AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION); + jms.setJMSDestination(AMQPMessageSupport.destination(queueType, properties.getTo())); } if (properties.getSubject() != null) { jms.setJMSType(properties.getSubject()); } if (properties.getReplyTo() != null) { - jms.setJMSReplyTo(new ServerDestination(properties.getReplyTo())); + byte value = parseQueueAnnotation(annotations, AMQPMessageSupport.JMS_REPLY_TO_TYPE_MSG_ANNOTATION); + + switch (value) { + case AMQPMessageSupport.QUEUE_TYPE: + org.apache.activemq.artemis.reader.MessageUtil.setJMSReplyTo(jms.getInnerMessage(), ActiveMQDestination.QUEUE_QUALIFIED_PREFIX + properties.getReplyTo()); + break; + case AMQPMessageSupport.TEMP_QUEUE_TYPE: + org.apache.activemq.artemis.reader.MessageUtil.setJMSReplyTo(jms.getInnerMessage(), ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX + properties.getReplyTo()); + break; + case AMQPMessageSupport.TOPIC_TYPE: + org.apache.activemq.artemis.reader.MessageUtil.setJMSReplyTo(jms.getInnerMessage(), ActiveMQDestination.TOPIC_QUALIFIED_PREFIX + properties.getReplyTo()); + break; + case AMQPMessageSupport.TEMP_TOPIC_TYPE: + org.apache.activemq.artemis.reader.MessageUtil.setJMSReplyTo(jms.getInnerMessage(), ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX + properties.getReplyTo()); + break; + default: + org.apache.activemq.artemis.reader.MessageUtil.setJMSReplyTo(jms.getInnerMessage(), ActiveMQDestination.QUEUE_QUALIFIED_PREFIX + properties.getReplyTo()); + break; + } } Object correlationID = properties.getCorrelationId(); if (correlationID != null) { @@ -360,6 +378,18 @@ public class AmqpCoreConverter { return jms; } + private static byte parseQueueAnnotation(MessageAnnotations annotations, Symbol symbol) { + Object value = (annotations != null && annotations.getValue() != null ? annotations.getValue().get(symbol) : AMQPMessageSupport.QUEUE_TYPE); + + byte queueType; + if (value == null || !(value instanceof Number)) { + queueType = AMQPMessageSupport.QUEUE_TYPE; + } else { + queueType = ((Number)value).byteValue(); + } + return queueType; + } + @SuppressWarnings("unchecked") private static ServerJMSMessage processFooter(ServerJMSMessage jms, Footer footer) throws Exception { if (footer != null && footer.getValue() != null) { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java index af85c065bb..1099d51c4c 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java @@ -44,11 +44,7 @@ import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSup import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID; import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION; import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_REPLY_TO_TYPE_MSG_ANNOTATION; -import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.QUEUE_TYPE; import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE; -import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.TEMP_QUEUE_TYPE; -import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.TEMP_TOPIC_TYPE; -import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.TOPIC_TYPE; import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.toAddress; import java.nio.charset.StandardCharsets; @@ -63,11 +59,7 @@ import java.util.Set; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageEOFException; -import javax.jms.Queue; -import javax.jms.TemporaryQueue; -import javax.jms.TemporaryTopic; import javax.jms.TextMessage; -import javax.jms.Topic; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ICoreMessage; @@ -171,12 +163,12 @@ public class CoreAmqpConverter { Destination destination = message.getJMSDestination(); if (destination != null) { properties.setTo(toAddress(destination)); - maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(destination)); + maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, AMQPMessageSupport.destinationType(destination)); } Destination replyTo = message.getJMSReplyTo(); if (replyTo != null) { properties.setReplyTo(toAddress(replyTo)); - maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(replyTo)); + maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, AMQPMessageSupport.destinationType(replyTo)); } Object correlationID = message.getInnerMessage().getCorrelationID(); @@ -512,22 +504,5 @@ public class CoreAmqpConverter { return map; } - private static byte destinationType(Destination destination) { - if (destination instanceof Queue) { - if (destination instanceof TemporaryQueue) { - return TEMP_QUEUE_TYPE; - } else { - return QUEUE_TYPE; - } - } else if (destination instanceof Topic) { - if (destination instanceof TemporaryTopic) { - return TEMP_TOPIC_TYPE; - } else { - return TOPIC_TYPE; - } - } - - throw new IllegalArgumentException("Unknown Destination Type passed to JMS Transformer."); - } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerDestination.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerDestination.java deleted file mode 100644 index 5a2f55bb67..0000000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerDestination.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.protocol.amqp.converter.jms; - -import javax.jms.JMSException; -import javax.jms.Queue; - -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.jms.client.ActiveMQDestination; - -/** - * This is just here to avoid all the client checks we need with valid JMS destinations, protocol convertors don't need to - * adhere to the jms. semantics. - */ -public class ServerDestination extends ActiveMQDestination implements Queue { - - public ServerDestination(String address) { - super(address, TYPE.DESTINATION, null); - } - - public ServerDestination(SimpleString address) { - super(address, TYPE.DESTINATION, null); - } - - @Override - public String getQueueName() throws JMSException { - return getName(); - } -} diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java index 2ca589aa4f..ea719f4622 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java @@ -146,7 +146,7 @@ public class ServerJMSMessage implements Message { public final Destination getJMSReplyTo() throws JMSException { SimpleString reply = MessageUtil.getJMSReplyTo(message); if (reply != null) { - return new ServerDestination(reply); + return ActiveMQDestination.fromPrefixedName(reply.toString()); } else { return null; } @@ -158,20 +158,14 @@ public class ServerJMSMessage implements Message { } @Override - public final Destination getJMSDestination() throws JMSException { - SimpleString sdest = message.getAddressSimpleString(); - - if (sdest == null) { - return null; - } else { - return new ServerDestination(sdest); - } + public Destination getJMSDestination() throws JMSException { + return ActiveMQDestination.createDestination(message.getRoutingType(), message.getAddressSimpleString()); } @Override public final void setJMSDestination(Destination destination) throws JMSException { if (destination == null) { - message.setAddress((SimpleString)null); + message.setAddress((SimpleString) null); } else { message.setAddress(((ActiveMQDestination) destination).getSimpleAddress()); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java index 7d573eddf3..062e0ddab1 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java @@ -40,20 +40,25 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import javax.jms.Destination; import javax.jms.JMSException; import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer; import org.apache.activemq.artemis.core.message.impl.CoreMessage; +import org.apache.activemq.artemis.jms.client.ActiveMQQueue; +import org.apache.activemq.artemis.jms.client.ActiveMQTemporaryQueue; +import org.apache.activemq.artemis.jms.client.ActiveMQTemporaryTopic; +import org.apache.activemq.artemis.jms.client.ActiveMQTopic; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter; import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage; +import org.apache.activemq.artemis.utils.PrefixUtil; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.AmqpSequence; @@ -468,7 +473,7 @@ public class JMSMappingOutboundTransformerTest { doTestConvertMessageWithJMSDestination(createDestination(QUEUE_TYPE), QUEUE_TYPE); } - private void doTestConvertMessageWithJMSDestination(ServerDestination jmsDestination, Object expectedAnnotationValue) throws Exception { + private void doTestConvertMessageWithJMSDestination(Destination jmsDestination, Object expectedAnnotationValue) throws Exception { ServerJMSTextMessage textMessage = createTextMessage(); textMessage.setText("myTextMessageContent"); textMessage.setJMSDestination(jmsDestination); @@ -485,7 +490,7 @@ public class JMSMappingOutboundTransformerTest { } if (jmsDestination != null) { - assertEquals("Unexpected 'to' address", jmsDestination.getAddress(), amqp.getAddress()); + assertEquals("Unexpected 'to' address", AMQPMessageSupport.toAddress(jmsDestination), amqp.getAddress()); } } @@ -501,7 +506,7 @@ public class JMSMappingOutboundTransformerTest { doTestConvertMessageWithJMSReplyTo(createDestination(QUEUE_TYPE), QUEUE_TYPE); } - private void doTestConvertMessageWithJMSReplyTo(ServerDestination jmsReplyTo, Object expectedAnnotationValue) throws Exception { + private void doTestConvertMessageWithJMSReplyTo(Destination jmsReplyTo, Object expectedAnnotationValue) throws Exception { ServerJMSTextMessage textMessage = createTextMessage(); textMessage.setText("myTextMessageContent"); textMessage.setJMSReplyTo(jmsReplyTo); @@ -518,26 +523,28 @@ public class JMSMappingOutboundTransformerTest { } if (jmsReplyTo != null) { - assertEquals("Unexpected 'reply-to' address", jmsReplyTo.getSimpleAddress(), amqp.getReplyTo()); + assertEquals("Unexpected 'reply-to' address", AMQPMessageSupport.toAddress(jmsReplyTo).toString(), amqp.getReplyTo().toString()); } } // ----- Utility Methods used for this Test -------------------------------// - private ServerDestination createDestination(byte destType) { - ServerDestination destination = null; + private Destination createDestination(byte destType) { + Destination destination = null; + String prefix = PrefixUtil.getURIPrefix(TEST_ADDRESS); + String address = PrefixUtil.removePrefix(TEST_ADDRESS, prefix); switch (destType) { case QUEUE_TYPE: - destination = new ServerDestination(TEST_ADDRESS); + destination = new ActiveMQQueue(address); break; case TOPIC_TYPE: - destination = new ServerDestination(TEST_ADDRESS); + destination = new ActiveMQTopic(address); break; case TEMP_QUEUE_TYPE: - destination = new ServerDestination(TEST_ADDRESS); + destination = new ActiveMQTemporaryQueue(address, null); break; case TEMP_TOPIC_TYPE: - destination = new ServerDestination(TEST_ADDRESS); + destination = new ActiveMQTemporaryTopic(address, null); break; default: throw new IllegalArgumentException("Invliad Destination Type given/"); diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index db74696388..c6c91f303c 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -17,6 +17,10 @@ package org.apache.activemq.artemis.core.protocol.openwire; import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.Topic; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; @@ -196,7 +200,18 @@ public final class OpenWireMessageConverter { final ActiveMQDestination replyTo = messageSend.getReplyTo(); if (replyTo != null) { - putMsgReplyTo(replyTo, marshaller, coreMessage); + if (replyTo instanceof TemporaryQueue) { + MessageUtil.setJMSReplyTo(coreMessage, org.apache.activemq.artemis.jms.client.ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX + (((TemporaryQueue) replyTo).getQueueName())); + } else if (replyTo instanceof TemporaryTopic) { + MessageUtil.setJMSReplyTo(coreMessage, org.apache.activemq.artemis.jms.client.ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX + (((TemporaryTopic) replyTo).getTopicName())); + } else if (replyTo instanceof Queue) { + MessageUtil.setJMSReplyTo(coreMessage, org.apache.activemq.artemis.jms.client.ActiveMQDestination.QUEUE_QUALIFIED_PREFIX + (((Queue) replyTo).getQueueName())); + } else if (replyTo instanceof Topic) { + MessageUtil.setJMSReplyTo(coreMessage, org.apache.activemq.artemis.jms.client.ActiveMQDestination.TOPIC_QUALIFIED_PREFIX + (((Topic) replyTo).getTopicName())); + } else { + // it should not happen + MessageUtil.setJMSReplyTo(coreMessage, org.apache.activemq.artemis.jms.client.ActiveMQDestination.QUEUE_QUALIFIED_PREFIX + (((Queue) replyTo).getQueueName())); + } } final String userId = messageSend.getUserID(); @@ -437,14 +452,6 @@ public final class OpenWireMessageConverter { } } - private static void putMsgReplyTo(final ActiveMQDestination replyTo, - final WireFormat marshaller, - final CoreMessage coreMessage) throws IOException { - final ByteSequence replyToBytes = marshaller.marshal(replyTo); - replyToBytes.compact(); - coreMessage.putBytesProperty(AMQ_MSG_REPLY_TO, replyToBytes.data); - } - private static void putMsgOriginalDestination(final ActiveMQDestination origDest, final WireFormat marshaller, final CoreMessage coreMessage) throws IOException { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/RequestReplyMultiProtocolTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/RequestReplyMultiProtocolTest.java new file mode 100644 index 0000000000..b842942ea3 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/RequestReplyMultiProtocolTest.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.crossprotocol; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.tests.integration.openwire.OpenWireTestBase; +import org.apache.activemq.artemis.tests.util.Wait; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import static org.apache.activemq.artemis.tests.util.CFUtil.createConnectionFactory; + +@RunWith(Parameterized.class) +public class RequestReplyMultiProtocolTest extends OpenWireTestBase { + + String protocolSender; + String protocolConsumer; + ConnectionFactory senderCF; + ConnectionFactory consumerCF; + private static final SimpleString queueName = SimpleString.toSimpleString("RequestReplyQueueTest"); + private static final SimpleString topicName = SimpleString.toSimpleString("RequestReplyTopicTest"); + private static final SimpleString replyQueue = SimpleString.toSimpleString("ReplyOnRequestReplyQueueTest"); + + public RequestReplyMultiProtocolTest(String protocolSender, String protocolConsumer) { + this.protocolSender = protocolSender; + this.protocolConsumer = protocolConsumer; + } + + @Parameterized.Parameters(name = "openWireOnSender={0},openWireOnConsumer={1}") + public static Iterable data() { + return Arrays.asList(new Object[][] { + {"OPENWIRE", "OPENWIRE"}, + {"OPENWIRE", "CORE"}, + {"OPENWIRE", "AMQP"}, + {"CORE", "OPENWIRE"}, + {"CORE", "CORE"}, + {"CORE", "AMQP"}, + {"AMQP", "OPENWIRE"}, + {"AMQP", "CORE"}, + {"AMQP", "AMQP"}, + }); + } + + + + @Before + public void setupCF() { + senderCF = createConnectionFactory(protocolSender, urlString); + consumerCF = createConnectionFactory(protocolConsumer, urlString); + } + + @Before + public void setupQueue() throws Exception { + Wait.assertTrue(server::isStarted); + Wait.assertTrue(server::isActive); + this.server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false, -1, false, true); + this.server.createQueue(replyQueue, RoutingType.ANYCAST, replyQueue, null, true, false, -1, false, true); + AddressInfo info = new AddressInfo(topicName, RoutingType.MULTICAST); + ((PostOfficeImpl)this.server.getPostOffice()).getAddressManager().addAddressInfo(info); + } + + + @Test + public void testReplyToUsingQueue() throws Throwable { + testReplyTo(false); + } + + @Test + public void testReplyToUsingTopic() throws Throwable { + testReplyTo(true); + } + + private void testReplyTo(boolean useTopic) throws Throwable { + Connection senderConn = senderCF.createConnection(); + Connection consumerConn = consumerCF.createConnection(); + consumerConn.setClientID("consumer"); + try { + Session consumerSess = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination consumerDestination; + if (useTopic) { + consumerDestination = consumerSess.createTopic(topicName.toString()); + } else { + consumerDestination = consumerSess.createQueue(queueName.toString()); + } + MessageConsumer consumer; + + if (useTopic) { + consumer = consumerSess.createDurableSubscriber((Topic) consumerDestination, "test"); + } else { + consumer = consumerSess.createConsumer(consumerDestination); + } + consumerConn.start(); + + + Session senderSess = senderConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + List replyToDestinations = new LinkedList<>(); + replyToDestinations.add(senderSess.createQueue(replyQueue.toString())); + replyToDestinations.add(senderSess.createTopic(topicName.toString())); + replyToDestinations.add(senderSess.createTemporaryQueue()); + replyToDestinations.add(senderSess.createTemporaryTopic()); + Destination senderDestination; + + if (useTopic) { + senderDestination = senderSess.createTopic(topicName.toString()); + } else { + senderDestination = senderSess.createQueue(queueName.toString()); + } + MessageProducer sender = senderSess.createProducer(senderDestination); + + int i = 0; + for (Destination destination : replyToDestinations) { + TextMessage message = senderSess.createTextMessage("hello " + (i++)); + message.setJMSReplyTo(destination); + sender.send(message); + } + + + i = 0; + for (Destination destination : replyToDestinations) { + TextMessage received = (TextMessage)consumer.receive(5000); + + Assert.assertNotNull(received); + System.out.println("Destination::" + received.getJMSDestination()); + + if (useTopic) { + Assert.assertTrue("JMSDestination type is " + received.getJMSDestination().getClass(), received.getJMSDestination() instanceof Topic); + } else { + Assert.assertTrue("JMSDestination type is " + received.getJMSDestination().getClass(), received.getJMSDestination() instanceof Queue); + } + + Assert.assertNotNull(received.getJMSReplyTo()); + Assert.assertEquals("hello " + (i++), received.getText()); + + System.out.println("received " + received.getText() + " and " + received.getJMSReplyTo()); + + if (destination instanceof Queue) { + Assert.assertTrue("Type is " + received.getJMSReplyTo().getClass().toString(), received.getJMSReplyTo() instanceof Queue); + Assert.assertEquals(((Queue) destination).getQueueName(), ((Queue)received.getJMSReplyTo()).getQueueName()); + } + if (destination instanceof Topic) { + Assert.assertTrue("Type is " + received.getJMSReplyTo().getClass().toString(), received.getJMSReplyTo() instanceof Topic); + Assert.assertEquals(((Topic) destination).getTopicName(), ((Topic)received.getJMSReplyTo()).getTopicName()); + } + if (destination instanceof TemporaryQueue) { + Assert.assertTrue("Type is " + received.getJMSReplyTo().getClass().toString(), received.getJMSReplyTo() instanceof TemporaryQueue); + Assert.assertEquals(((TemporaryQueue) destination).getQueueName(), ((TemporaryQueue)received.getJMSReplyTo()).getQueueName()); + } + if (destination instanceof TemporaryTopic) { + Assert.assertTrue("Type is " + received.getJMSReplyTo().getClass().toString(), received.getJMSReplyTo() instanceof TemporaryTopic); + Assert.assertEquals(((TemporaryTopic) destination).getTopicName(), ((TemporaryTopic)received.getJMSReplyTo()).getTopicName()); + } + } + } catch (Throwable e) { + e.printStackTrace(); + throw e; + } finally { + try { + senderConn.close(); + } catch (Throwable e) { + e.printStackTrace(); + } + try { + consumerConn.close(); + } catch (Throwable e) { + e.printStackTrace(); + } + } + + } + +} + + diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/RequestReplyNonJMSTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/RequestReplyNonJMSTest.java new file mode 100644 index 0000000000..7c7852c164 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/RequestReplyNonJMSTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.crossprotocol; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; +import java.net.URI; +import java.util.Arrays; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.tests.integration.openwire.OpenWireTestBase; +import org.apache.activemq.artemis.tests.util.Wait; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.activemq.artemis.tests.util.CFUtil.createConnectionFactory; + +@RunWith(Parameterized.class) +public class RequestReplyNonJMSTest extends OpenWireTestBase { + + String protocolConsumer; + ConnectionFactory consumerCF; + private static final SimpleString queueName = SimpleString.toSimpleString("RequestReplyQueueTest"); + private static final SimpleString topicName = SimpleString.toSimpleString("RequestReplyTopicTest"); + private static final SimpleString replyQueue = SimpleString.toSimpleString("ReplyOnRequestReplyQueueTest"); + + public RequestReplyNonJMSTest(String protocolConsumer) { + this.protocolConsumer = protocolConsumer; + } + + @Parameterized.Parameters(name = "openWireOnSender={0}") + public static Iterable data() { + return Arrays.asList(new Object[][] { + {"OPENWIRE"}, + {"CORE"}, + {"AMQP"} + }); + } + + + + @Before + public void setupCF() { + consumerCF = createConnectionFactory(protocolConsumer, urlString); + } + + @Before + public void setupQueue() throws Exception { + Wait.assertTrue(server::isStarted); + Wait.assertTrue(server::isActive); + this.server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false, -1, false, true); + this.server.createQueue(replyQueue, RoutingType.ANYCAST, replyQueue, null, true, false, -1, false, true); + AddressInfo info = new AddressInfo(topicName, RoutingType.MULTICAST); + ((PostOfficeImpl)this.server.getPostOffice()).getAddressManager().addAddressInfo(info); + } + + + @Test + public void testReplyToSourceAMQP() throws Throwable { + + AmqpClient directClient = new AmqpClient(new URI("tcp://localhost:61616"), null, null); + AmqpConnection connection = null; + AmqpSession session = null; + AmqpSender sender = null; + Connection consumerConn = null; + try { + connection = directClient.connect(true); + session = connection.createSession(); + sender = session.createSender(queueName.toString()); + + AmqpMessage message = new AmqpMessage(); + message.setReplyToAddress(replyQueue.toString()); + message.setMessageId("msg-1"); + message.setText("Test-Message"); + sender.send(message); + + message = new AmqpMessage(); + message.setReplyToAddress(replyQueue.toString()); + message.setMessageAnnotation("x-opt-jms-reply-to", new Byte((byte)10)); // that's invalid on the conversion, lets hope it doesn't fail + message.setMessageId("msg-2"); + sender.send(message); + + consumerConn = consumerCF.createConnection(); + Session consumerSess = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = consumerSess.createQueue(queueName.toString()); + Queue replyQueue = consumerSess.createQueue(RequestReplyNonJMSTest.replyQueue.toString()); + + MessageConsumer consumer = consumerSess.createConsumer(queue); + consumerConn.start(); + javax.jms.Message receivedMessage = consumer.receive(5000); + Assert.assertNotNull(receivedMessage); + Assert.assertEquals(replyQueue, receivedMessage.getJMSReplyTo()); + + receivedMessage = consumer.receive(5000); + Assert.assertNotNull(receivedMessage); + Assert.assertEquals(replyQueue, receivedMessage.getJMSReplyTo()); + + Assert.assertNull(consumer.receiveNoWait()); + } catch (Throwable e) { + e.printStackTrace(); + throw e; + } finally { + try { + connection.close(); + } catch (Throwable e) { + e.printStackTrace(); + } + try { + consumerConn.close(); + } catch (Throwable dontcare) { + dontcare.printStackTrace(); + } + } + } + +} + + diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/CFUtil.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/CFUtil.java new file mode 100644 index 0000000000..923ddb7a11 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/CFUtil.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.util; + +import javax.jms.ConnectionFactory; + +import org.apache.qpid.jms.JmsConnectionFactory; + +public class CFUtil { + + public static ConnectionFactory createConnectionFactory(String protocol, String uri) { + if (protocol.toUpperCase().equals("OPENWIRE")) { + return new org.apache.activemq.ActiveMQConnectionFactory(uri); + } else if (protocol.toUpperCase().equals("AMQP")) { + + if (uri.startsWith("tcp://")) { + // replacing tcp:// by amqp:// + uri = "amqp" + uri.substring(3); + } + return new JmsConnectionFactory(uri); + } else if (protocol.toUpperCase().equals("CORE") || protocol.toUpperCase().equals("ARTEMIS")) { + return new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory(uri); + } else { + throw new IllegalStateException("Unkown:" + protocol); + } + } + +}