ARTEMIS-2311 Dealing with Protocol conversions and JMSReplyTo

This commit is contained in:
Clebert Suconic 2019-04-17 16:21:17 -04:00
parent 45121eade2
commit 1e35175a4d
13 changed files with 580 additions and 134 deletions

View File

@ -61,4 +61,27 @@ public class PrefixUtil {
public static SimpleString removeAddress(SimpleString string, SimpleString prefix) {
return string.subSeq(0, prefix.length());
}
public static String removeAddress(String string, String prefix) {
return string.substring(0, prefix.length());
}
public static String removePrefix(String string, String prefix) {
return string.substring(prefix.length());
}
/** This will treat a prefix on the uri-type of queue://, topic://, temporaryTopic://, temporaryQueue.
* This is mostly used on conversions to treat JMSReplyTo or similar usages on core protocol */
public static String getURIPrefix(String address) {
int index = address.toString().indexOf("://");
if (index > 0) {
return address.substring(0, index + 3);
} else {
// SimpleString has a static EMPTY definition, however it's not safe to use it
// since SimpleString is a mutable object, and for that reason I can't leak EMPTY definition.
// We need to create a new one on this case.
return "";
}
}
}

View File

@ -26,6 +26,7 @@ import java.util.UUID;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.ParameterisedAddress;
import org.apache.activemq.artemis.api.core.QueueAttributes;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.jndi.JNDIStorable;
@ -58,6 +59,18 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se
this.name = name;
}
public static ActiveMQDestination createDestination(RoutingType routingType, SimpleString address) {
if (address == null) {
return null;
} else if (RoutingType.ANYCAST.equals(routingType)) {
return ActiveMQDestination.createQueue(address);
} else if (RoutingType.MULTICAST.equals(routingType)) {
return ActiveMQDestination.createTopic(address);
} else {
return ActiveMQDestination.fromPrefixedName(address.toString());
}
}
/**
* Static helper method for working with destinations.
*/
@ -88,11 +101,11 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se
}
}
public static Destination fromPrefixedName(final String name) {
public static ActiveMQDestination fromPrefixedName(final String name) {
return fromPrefixedName(name, name);
}
public static Destination fromPrefixedName(final String addr, final String name) {
public static ActiveMQDestination fromPrefixedName(final String addr, final String name) {
ActiveMQDestination destination;
if (addr.startsWith(ActiveMQDestination.QUEUE_QUALIFIED_PREFIX)) {
@ -111,20 +124,6 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se
destination = new ActiveMQDestination(addr, TYPE.DESTINATION, null);
}
String unprefixedName = name;
if (name.startsWith(ActiveMQDestination.QUEUE_QUALIFIED_PREFIX)) {
unprefixedName = name.substring(ActiveMQDestination.QUEUE_QUALIFIED_PREFIX.length());
} else if (name.startsWith(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX)) {
unprefixedName = name.substring(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX.length());
} else if (name.startsWith(ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX)) {
unprefixedName = name.substring(ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX.length());
} else if (name.startsWith(ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX)) {
unprefixedName = name.substring(ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX.length());
}
destination.setName(unprefixedName);
return destination;
}

View File

@ -394,7 +394,7 @@ public class ActiveMQMessage implements javax.jms.Message {
}
}
protected static String prefixOf(Destination dest) {
public static String prefixOf(Destination dest) {
String prefix = "";
if (dest instanceof ActiveMQTemporaryQueue) {
prefix = TEMP_QUEUE_QUALIFED_PREFIX;
@ -423,15 +423,9 @@ public class ActiveMQMessage implements javax.jms.Message {
SimpleString address = message.getAddressSimpleString();
SimpleString changedAddress = checkPrefix(address);
if (address == null) {
dest = null;
} else if (RoutingType.ANYCAST.equals(message.getRoutingType())) {
dest = ActiveMQDestination.createQueue(address);
} else if (RoutingType.MULTICAST.equals(message.getRoutingType())) {
dest = ActiveMQDestination.createTopic(address);
} else {
dest = (ActiveMQDestination) ActiveMQDestination.fromPrefixedName(address.toString());
}
RoutingType routingType = message.getRoutingType();
dest = ActiveMQDestination.createDestination(routingType, address);
if (changedAddress != null && dest != null) {
((ActiveMQDestination) dest).setName(changedAddress.toString());

View File

@ -30,10 +30,18 @@ import java.util.Set;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.Topic;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
import org.apache.activemq.artemis.jms.client.ActiveMQTemporaryQueue;
import org.apache.activemq.artemis.jms.client.ActiveMQTemporaryTopic;
import org.apache.activemq.artemis.jms.client.ActiveMQTopic;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
@ -45,6 +53,7 @@ import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.message.Message;
import org.jboss.logging.Logger;
/**
* Support class containing constant values and static methods that are used to map to / from
@ -52,6 +61,8 @@ import org.apache.qpid.proton.message.Message;
*/
public final class AMQPMessageSupport {
private static final Logger logger = Logger.getLogger(AMQPMessageSupport.class);
public static final String JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME = "x-opt-jms-reply-to";
// Message Properties used to map AMQP to JMS and back
@ -271,8 +282,23 @@ public final class AMQPMessageSupport {
}
public static String toAddress(Destination destination) {
if (destination instanceof ActiveMQDestination) {
return ((ActiveMQDestination) destination).getAddress();
try {
if (destination instanceof ActiveMQDestination) {
return ((ActiveMQDestination) destination).getAddress();
} else {
if (destination instanceof Queue) {
return ((Queue) destination).getQueueName();
} else if (destination instanceof Topic) {
return ((Topic) destination).getTopicName();
}
}
} catch (JMSException e) {
// ActiveMQDestination (and most JMS implementations I know) will never throw an Exception here
// this is here for compilation support (as JMS declares it), and I don't want to propagate exceptions into
// the converter...
// and for the possibility of who knows in the future!!!
logger.warn(e.getMessage(), e);
}
return null;
}
@ -345,4 +371,39 @@ public final class AMQPMessageSupport {
// ((ResetLimitWrappedActiveMQBuffer) message.getBodyBuffer()).setMessage(null);
return message;
}
public static byte destinationType(Destination destination) {
if (destination instanceof Queue) {
if (destination instanceof TemporaryQueue) {
return TEMP_QUEUE_TYPE;
} else {
return QUEUE_TYPE;
}
} else if (destination instanceof Topic) {
if (destination instanceof TemporaryTopic) {
return TEMP_TOPIC_TYPE;
} else {
return TOPIC_TYPE;
}
}
return QUEUE_TYPE;
}
public static Destination destination(byte destinationType, String address) {
switch (destinationType) {
case TEMP_QUEUE_TYPE:
return new ActiveMQTemporaryQueue(address, null);
case TEMP_TOPIC_TYPE:
return new ActiveMQTemporaryTopic(address, null);
case TOPIC_TYPE:
return new ActiveMQTopic(address);
case QUEUE_TYPE:
return new ActiveMQQueue(address);
default:
return new ActiveMQQueue(address);
}
}
}

View File

@ -61,8 +61,8 @@ import javax.jms.JMSException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
@ -201,7 +201,7 @@ public class AmqpCoreConverter {
processHeader(result, header);
processMessageAnnotations(result, annotations);
processApplicationProperties(result, applicationProperties);
processProperties(result, properties);
processProperties(result, properties, annotations);
processFooter(result, footer);
processExtraProperties(result, message.getExtraProperties());
@ -220,7 +220,6 @@ public class AmqpCoreConverter {
}
}
result.getInnerMessage().setReplyTo(message.getReplyTo());
result.getInnerMessage().setDurable(message.isDurable());
result.getInnerMessage().setPriority(message.getPriority());
result.getInnerMessage().setAddress(message.getAddressSimpleString());
@ -308,7 +307,7 @@ public class AmqpCoreConverter {
return jms;
}
private static ServerJMSMessage processProperties(ServerJMSMessage jms, Properties properties) throws Exception {
private static ServerJMSMessage processProperties(ServerJMSMessage jms, Properties properties, MessageAnnotations annotations) throws Exception {
if (properties != null) {
if (properties.getMessageId() != null) {
jms.setJMSMessageID(AMQPMessageIdHelper.INSTANCE.toMessageIdString(properties.getMessageId()));
@ -318,13 +317,32 @@ public class AmqpCoreConverter {
jms.setStringProperty("JMSXUserID", new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), StandardCharsets.UTF_8));
}
if (properties.getTo() != null) {
jms.setJMSDestination(new ServerDestination(properties.getTo()));
byte queueType = parseQueueAnnotation(annotations, AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION);
jms.setJMSDestination(AMQPMessageSupport.destination(queueType, properties.getTo()));
}
if (properties.getSubject() != null) {
jms.setJMSType(properties.getSubject());
}
if (properties.getReplyTo() != null) {
jms.setJMSReplyTo(new ServerDestination(properties.getReplyTo()));
byte value = parseQueueAnnotation(annotations, AMQPMessageSupport.JMS_REPLY_TO_TYPE_MSG_ANNOTATION);
switch (value) {
case AMQPMessageSupport.QUEUE_TYPE:
org.apache.activemq.artemis.reader.MessageUtil.setJMSReplyTo(jms.getInnerMessage(), ActiveMQDestination.QUEUE_QUALIFIED_PREFIX + properties.getReplyTo());
break;
case AMQPMessageSupport.TEMP_QUEUE_TYPE:
org.apache.activemq.artemis.reader.MessageUtil.setJMSReplyTo(jms.getInnerMessage(), ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX + properties.getReplyTo());
break;
case AMQPMessageSupport.TOPIC_TYPE:
org.apache.activemq.artemis.reader.MessageUtil.setJMSReplyTo(jms.getInnerMessage(), ActiveMQDestination.TOPIC_QUALIFIED_PREFIX + properties.getReplyTo());
break;
case AMQPMessageSupport.TEMP_TOPIC_TYPE:
org.apache.activemq.artemis.reader.MessageUtil.setJMSReplyTo(jms.getInnerMessage(), ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX + properties.getReplyTo());
break;
default:
org.apache.activemq.artemis.reader.MessageUtil.setJMSReplyTo(jms.getInnerMessage(), ActiveMQDestination.QUEUE_QUALIFIED_PREFIX + properties.getReplyTo());
break;
}
}
Object correlationID = properties.getCorrelationId();
if (correlationID != null) {
@ -360,6 +378,18 @@ public class AmqpCoreConverter {
return jms;
}
private static byte parseQueueAnnotation(MessageAnnotations annotations, Symbol symbol) {
Object value = (annotations != null && annotations.getValue() != null ? annotations.getValue().get(symbol) : AMQPMessageSupport.QUEUE_TYPE);
byte queueType;
if (value == null || !(value instanceof Number)) {
queueType = AMQPMessageSupport.QUEUE_TYPE;
} else {
queueType = ((Number)value).byteValue();
}
return queueType;
}
@SuppressWarnings("unchecked")
private static ServerJMSMessage processFooter(ServerJMSMessage jms, Footer footer) throws Exception {
if (footer != null && footer.getValue() != null) {

View File

@ -44,11 +44,7 @@ import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSup
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_REPLY_TO_TYPE_MSG_ANNOTATION;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.QUEUE_TYPE;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.TEMP_QUEUE_TYPE;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.TEMP_TOPIC_TYPE;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.TOPIC_TYPE;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.toAddress;
import java.nio.charset.StandardCharsets;
@ -63,11 +59,7 @@ import java.util.Set;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageEOFException;
import javax.jms.Queue;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ICoreMessage;
@ -171,12 +163,12 @@ public class CoreAmqpConverter {
Destination destination = message.getJMSDestination();
if (destination != null) {
properties.setTo(toAddress(destination));
maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(destination));
maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, AMQPMessageSupport.destinationType(destination));
}
Destination replyTo = message.getJMSReplyTo();
if (replyTo != null) {
properties.setReplyTo(toAddress(replyTo));
maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(replyTo));
maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, AMQPMessageSupport.destinationType(replyTo));
}
Object correlationID = message.getInnerMessage().getCorrelationID();
@ -512,22 +504,5 @@ public class CoreAmqpConverter {
return map;
}
private static byte destinationType(Destination destination) {
if (destination instanceof Queue) {
if (destination instanceof TemporaryQueue) {
return TEMP_QUEUE_TYPE;
} else {
return QUEUE_TYPE;
}
} else if (destination instanceof Topic) {
if (destination instanceof TemporaryTopic) {
return TEMP_TOPIC_TYPE;
} else {
return TOPIC_TYPE;
}
}
throw new IllegalArgumentException("Unknown Destination Type passed to JMS Transformer.");
}
}

View File

@ -1,43 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.converter.jms;
import javax.jms.JMSException;
import javax.jms.Queue;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
/**
* This is just here to avoid all the client checks we need with valid JMS destinations, protocol convertors don't need to
* adhere to the jms. semantics.
*/
public class ServerDestination extends ActiveMQDestination implements Queue {
public ServerDestination(String address) {
super(address, TYPE.DESTINATION, null);
}
public ServerDestination(SimpleString address) {
super(address, TYPE.DESTINATION, null);
}
@Override
public String getQueueName() throws JMSException {
return getName();
}
}

View File

@ -146,7 +146,7 @@ public class ServerJMSMessage implements Message {
public final Destination getJMSReplyTo() throws JMSException {
SimpleString reply = MessageUtil.getJMSReplyTo(message);
if (reply != null) {
return new ServerDestination(reply);
return ActiveMQDestination.fromPrefixedName(reply.toString());
} else {
return null;
}
@ -158,20 +158,14 @@ public class ServerJMSMessage implements Message {
}
@Override
public final Destination getJMSDestination() throws JMSException {
SimpleString sdest = message.getAddressSimpleString();
if (sdest == null) {
return null;
} else {
return new ServerDestination(sdest);
}
public Destination getJMSDestination() throws JMSException {
return ActiveMQDestination.createDestination(message.getRoutingType(), message.getAddressSimpleString());
}
@Override
public final void setJMSDestination(Destination destination) throws JMSException {
if (destination == null) {
message.setAddress((SimpleString)null);
message.setAddress((SimpleString) null);
} else {
message.setAddress(((ActiveMQDestination) destination).getSimpleAddress());
}

View File

@ -40,20 +40,25 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import javax.jms.Destination;
import javax.jms.JMSException;
import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
import org.apache.activemq.artemis.jms.client.ActiveMQTemporaryQueue;
import org.apache.activemq.artemis.jms.client.ActiveMQTemporaryTopic;
import org.apache.activemq.artemis.jms.client.ActiveMQTopic;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage;
import org.apache.activemq.artemis.utils.PrefixUtil;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
@ -468,7 +473,7 @@ public class JMSMappingOutboundTransformerTest {
doTestConvertMessageWithJMSDestination(createDestination(QUEUE_TYPE), QUEUE_TYPE);
}
private void doTestConvertMessageWithJMSDestination(ServerDestination jmsDestination, Object expectedAnnotationValue) throws Exception {
private void doTestConvertMessageWithJMSDestination(Destination jmsDestination, Object expectedAnnotationValue) throws Exception {
ServerJMSTextMessage textMessage = createTextMessage();
textMessage.setText("myTextMessageContent");
textMessage.setJMSDestination(jmsDestination);
@ -485,7 +490,7 @@ public class JMSMappingOutboundTransformerTest {
}
if (jmsDestination != null) {
assertEquals("Unexpected 'to' address", jmsDestination.getAddress(), amqp.getAddress());
assertEquals("Unexpected 'to' address", AMQPMessageSupport.toAddress(jmsDestination), amqp.getAddress());
}
}
@ -501,7 +506,7 @@ public class JMSMappingOutboundTransformerTest {
doTestConvertMessageWithJMSReplyTo(createDestination(QUEUE_TYPE), QUEUE_TYPE);
}
private void doTestConvertMessageWithJMSReplyTo(ServerDestination jmsReplyTo, Object expectedAnnotationValue) throws Exception {
private void doTestConvertMessageWithJMSReplyTo(Destination jmsReplyTo, Object expectedAnnotationValue) throws Exception {
ServerJMSTextMessage textMessage = createTextMessage();
textMessage.setText("myTextMessageContent");
textMessage.setJMSReplyTo(jmsReplyTo);
@ -518,26 +523,28 @@ public class JMSMappingOutboundTransformerTest {
}
if (jmsReplyTo != null) {
assertEquals("Unexpected 'reply-to' address", jmsReplyTo.getSimpleAddress(), amqp.getReplyTo());
assertEquals("Unexpected 'reply-to' address", AMQPMessageSupport.toAddress(jmsReplyTo).toString(), amqp.getReplyTo().toString());
}
}
// ----- Utility Methods used for this Test -------------------------------//
private ServerDestination createDestination(byte destType) {
ServerDestination destination = null;
private Destination createDestination(byte destType) {
Destination destination = null;
String prefix = PrefixUtil.getURIPrefix(TEST_ADDRESS);
String address = PrefixUtil.removePrefix(TEST_ADDRESS, prefix);
switch (destType) {
case QUEUE_TYPE:
destination = new ServerDestination(TEST_ADDRESS);
destination = new ActiveMQQueue(address);
break;
case TOPIC_TYPE:
destination = new ServerDestination(TEST_ADDRESS);
destination = new ActiveMQTopic(address);
break;
case TEMP_QUEUE_TYPE:
destination = new ServerDestination(TEST_ADDRESS);
destination = new ActiveMQTemporaryQueue(address, null);
break;
case TEMP_TOPIC_TYPE:
destination = new ServerDestination(TEST_ADDRESS);
destination = new ActiveMQTemporaryTopic(address, null);
break;
default:
throw new IllegalArgumentException("Invliad Destination Type given/");

View File

@ -17,6 +17,10 @@
package org.apache.activemq.artemis.core.protocol.openwire;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.Topic;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
@ -196,7 +200,18 @@ public final class OpenWireMessageConverter {
final ActiveMQDestination replyTo = messageSend.getReplyTo();
if (replyTo != null) {
putMsgReplyTo(replyTo, marshaller, coreMessage);
if (replyTo instanceof TemporaryQueue) {
MessageUtil.setJMSReplyTo(coreMessage, org.apache.activemq.artemis.jms.client.ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX + (((TemporaryQueue) replyTo).getQueueName()));
} else if (replyTo instanceof TemporaryTopic) {
MessageUtil.setJMSReplyTo(coreMessage, org.apache.activemq.artemis.jms.client.ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX + (((TemporaryTopic) replyTo).getTopicName()));
} else if (replyTo instanceof Queue) {
MessageUtil.setJMSReplyTo(coreMessage, org.apache.activemq.artemis.jms.client.ActiveMQDestination.QUEUE_QUALIFIED_PREFIX + (((Queue) replyTo).getQueueName()));
} else if (replyTo instanceof Topic) {
MessageUtil.setJMSReplyTo(coreMessage, org.apache.activemq.artemis.jms.client.ActiveMQDestination.TOPIC_QUALIFIED_PREFIX + (((Topic) replyTo).getTopicName()));
} else {
// it should not happen
MessageUtil.setJMSReplyTo(coreMessage, org.apache.activemq.artemis.jms.client.ActiveMQDestination.QUEUE_QUALIFIED_PREFIX + (((Queue) replyTo).getQueueName()));
}
}
final String userId = messageSend.getUserID();
@ -437,14 +452,6 @@ public final class OpenWireMessageConverter {
}
}
private static void putMsgReplyTo(final ActiveMQDestination replyTo,
final WireFormat marshaller,
final CoreMessage coreMessage) throws IOException {
final ByteSequence replyToBytes = marshaller.marshal(replyTo);
replyToBytes.compact();
coreMessage.putBytesProperty(AMQ_MSG_REPLY_TO, replyToBytes.data);
}
private static void putMsgOriginalDestination(final ActiveMQDestination origDest,
final WireFormat marshaller,
final CoreMessage coreMessage) throws IOException {

View File

@ -0,0 +1,209 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.crossprotocol;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.integration.openwire.OpenWireTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.activemq.artemis.tests.util.CFUtil.createConnectionFactory;
@RunWith(Parameterized.class)
public class RequestReplyMultiProtocolTest extends OpenWireTestBase {
String protocolSender;
String protocolConsumer;
ConnectionFactory senderCF;
ConnectionFactory consumerCF;
private static final SimpleString queueName = SimpleString.toSimpleString("RequestReplyQueueTest");
private static final SimpleString topicName = SimpleString.toSimpleString("RequestReplyTopicTest");
private static final SimpleString replyQueue = SimpleString.toSimpleString("ReplyOnRequestReplyQueueTest");
public RequestReplyMultiProtocolTest(String protocolSender, String protocolConsumer) {
this.protocolSender = protocolSender;
this.protocolConsumer = protocolConsumer;
}
@Parameterized.Parameters(name = "openWireOnSender={0},openWireOnConsumer={1}")
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[][] {
{"OPENWIRE", "OPENWIRE"},
{"OPENWIRE", "CORE"},
{"OPENWIRE", "AMQP"},
{"CORE", "OPENWIRE"},
{"CORE", "CORE"},
{"CORE", "AMQP"},
{"AMQP", "OPENWIRE"},
{"AMQP", "CORE"},
{"AMQP", "AMQP"},
});
}
@Before
public void setupCF() {
senderCF = createConnectionFactory(protocolSender, urlString);
consumerCF = createConnectionFactory(protocolConsumer, urlString);
}
@Before
public void setupQueue() throws Exception {
Wait.assertTrue(server::isStarted);
Wait.assertTrue(server::isActive);
this.server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false, -1, false, true);
this.server.createQueue(replyQueue, RoutingType.ANYCAST, replyQueue, null, true, false, -1, false, true);
AddressInfo info = new AddressInfo(topicName, RoutingType.MULTICAST);
((PostOfficeImpl)this.server.getPostOffice()).getAddressManager().addAddressInfo(info);
}
@Test
public void testReplyToUsingQueue() throws Throwable {
testReplyTo(false);
}
@Test
public void testReplyToUsingTopic() throws Throwable {
testReplyTo(true);
}
private void testReplyTo(boolean useTopic) throws Throwable {
Connection senderConn = senderCF.createConnection();
Connection consumerConn = consumerCF.createConnection();
consumerConn.setClientID("consumer");
try {
Session consumerSess = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination consumerDestination;
if (useTopic) {
consumerDestination = consumerSess.createTopic(topicName.toString());
} else {
consumerDestination = consumerSess.createQueue(queueName.toString());
}
MessageConsumer consumer;
if (useTopic) {
consumer = consumerSess.createDurableSubscriber((Topic) consumerDestination, "test");
} else {
consumer = consumerSess.createConsumer(consumerDestination);
}
consumerConn.start();
Session senderSess = senderConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
List<Destination> replyToDestinations = new LinkedList<>();
replyToDestinations.add(senderSess.createQueue(replyQueue.toString()));
replyToDestinations.add(senderSess.createTopic(topicName.toString()));
replyToDestinations.add(senderSess.createTemporaryQueue());
replyToDestinations.add(senderSess.createTemporaryTopic());
Destination senderDestination;
if (useTopic) {
senderDestination = senderSess.createTopic(topicName.toString());
} else {
senderDestination = senderSess.createQueue(queueName.toString());
}
MessageProducer sender = senderSess.createProducer(senderDestination);
int i = 0;
for (Destination destination : replyToDestinations) {
TextMessage message = senderSess.createTextMessage("hello " + (i++));
message.setJMSReplyTo(destination);
sender.send(message);
}
i = 0;
for (Destination destination : replyToDestinations) {
TextMessage received = (TextMessage)consumer.receive(5000);
Assert.assertNotNull(received);
System.out.println("Destination::" + received.getJMSDestination());
if (useTopic) {
Assert.assertTrue("JMSDestination type is " + received.getJMSDestination().getClass(), received.getJMSDestination() instanceof Topic);
} else {
Assert.assertTrue("JMSDestination type is " + received.getJMSDestination().getClass(), received.getJMSDestination() instanceof Queue);
}
Assert.assertNotNull(received.getJMSReplyTo());
Assert.assertEquals("hello " + (i++), received.getText());
System.out.println("received " + received.getText() + " and " + received.getJMSReplyTo());
if (destination instanceof Queue) {
Assert.assertTrue("Type is " + received.getJMSReplyTo().getClass().toString(), received.getJMSReplyTo() instanceof Queue);
Assert.assertEquals(((Queue) destination).getQueueName(), ((Queue)received.getJMSReplyTo()).getQueueName());
}
if (destination instanceof Topic) {
Assert.assertTrue("Type is " + received.getJMSReplyTo().getClass().toString(), received.getJMSReplyTo() instanceof Topic);
Assert.assertEquals(((Topic) destination).getTopicName(), ((Topic)received.getJMSReplyTo()).getTopicName());
}
if (destination instanceof TemporaryQueue) {
Assert.assertTrue("Type is " + received.getJMSReplyTo().getClass().toString(), received.getJMSReplyTo() instanceof TemporaryQueue);
Assert.assertEquals(((TemporaryQueue) destination).getQueueName(), ((TemporaryQueue)received.getJMSReplyTo()).getQueueName());
}
if (destination instanceof TemporaryTopic) {
Assert.assertTrue("Type is " + received.getJMSReplyTo().getClass().toString(), received.getJMSReplyTo() instanceof TemporaryTopic);
Assert.assertEquals(((TemporaryTopic) destination).getTopicName(), ((TemporaryTopic)received.getJMSReplyTo()).getTopicName());
}
}
} catch (Throwable e) {
e.printStackTrace();
throw e;
} finally {
try {
senderConn.close();
} catch (Throwable e) {
e.printStackTrace();
}
try {
consumerConn.close();
} catch (Throwable e) {
e.printStackTrace();
}
}
}
}

View File

@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.crossprotocol;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import java.net.URI;
import java.util.Arrays;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.integration.openwire.OpenWireTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.activemq.artemis.tests.util.CFUtil.createConnectionFactory;
@RunWith(Parameterized.class)
public class RequestReplyNonJMSTest extends OpenWireTestBase {
String protocolConsumer;
ConnectionFactory consumerCF;
private static final SimpleString queueName = SimpleString.toSimpleString("RequestReplyQueueTest");
private static final SimpleString topicName = SimpleString.toSimpleString("RequestReplyTopicTest");
private static final SimpleString replyQueue = SimpleString.toSimpleString("ReplyOnRequestReplyQueueTest");
public RequestReplyNonJMSTest(String protocolConsumer) {
this.protocolConsumer = protocolConsumer;
}
@Parameterized.Parameters(name = "openWireOnSender={0}")
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[][] {
{"OPENWIRE"},
{"CORE"},
{"AMQP"}
});
}
@Before
public void setupCF() {
consumerCF = createConnectionFactory(protocolConsumer, urlString);
}
@Before
public void setupQueue() throws Exception {
Wait.assertTrue(server::isStarted);
Wait.assertTrue(server::isActive);
this.server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false, -1, false, true);
this.server.createQueue(replyQueue, RoutingType.ANYCAST, replyQueue, null, true, false, -1, false, true);
AddressInfo info = new AddressInfo(topicName, RoutingType.MULTICAST);
((PostOfficeImpl)this.server.getPostOffice()).getAddressManager().addAddressInfo(info);
}
@Test
public void testReplyToSourceAMQP() throws Throwable {
AmqpClient directClient = new AmqpClient(new URI("tcp://localhost:61616"), null, null);
AmqpConnection connection = null;
AmqpSession session = null;
AmqpSender sender = null;
Connection consumerConn = null;
try {
connection = directClient.connect(true);
session = connection.createSession();
sender = session.createSender(queueName.toString());
AmqpMessage message = new AmqpMessage();
message.setReplyToAddress(replyQueue.toString());
message.setMessageId("msg-1");
message.setText("Test-Message");
sender.send(message);
message = new AmqpMessage();
message.setReplyToAddress(replyQueue.toString());
message.setMessageAnnotation("x-opt-jms-reply-to", new Byte((byte)10)); // that's invalid on the conversion, lets hope it doesn't fail
message.setMessageId("msg-2");
sender.send(message);
consumerConn = consumerCF.createConnection();
Session consumerSess = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = consumerSess.createQueue(queueName.toString());
Queue replyQueue = consumerSess.createQueue(RequestReplyNonJMSTest.replyQueue.toString());
MessageConsumer consumer = consumerSess.createConsumer(queue);
consumerConn.start();
javax.jms.Message receivedMessage = consumer.receive(5000);
Assert.assertNotNull(receivedMessage);
Assert.assertEquals(replyQueue, receivedMessage.getJMSReplyTo());
receivedMessage = consumer.receive(5000);
Assert.assertNotNull(receivedMessage);
Assert.assertEquals(replyQueue, receivedMessage.getJMSReplyTo());
Assert.assertNull(consumer.receiveNoWait());
} catch (Throwable e) {
e.printStackTrace();
throw e;
} finally {
try {
connection.close();
} catch (Throwable e) {
e.printStackTrace();
}
try {
consumerConn.close();
} catch (Throwable dontcare) {
dontcare.printStackTrace();
}
}
}
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.util;
import javax.jms.ConnectionFactory;
import org.apache.qpid.jms.JmsConnectionFactory;
public class CFUtil {
public static ConnectionFactory createConnectionFactory(String protocol, String uri) {
if (protocol.toUpperCase().equals("OPENWIRE")) {
return new org.apache.activemq.ActiveMQConnectionFactory(uri);
} else if (protocol.toUpperCase().equals("AMQP")) {
if (uri.startsWith("tcp://")) {
// replacing tcp:// by amqp://
uri = "amqp" + uri.substring(3);
}
return new JmsConnectionFactory(uri);
} else if (protocol.toUpperCase().equals("CORE") || protocol.toUpperCase().equals("ARTEMIS")) {
return new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory(uri);
} else {
throw new IllegalStateException("Unkown:" + protocol);
}
}
}