ARTEMIS-2139 Fix setJMSReplyTo for 1.x clients with enable1xPrefixes

This commit is contained in:
Francesco Nigro 2018-11-22 16:12:59 +01:00 committed by Justin Bertram
parent 0acd706987
commit 672f536222
11 changed files with 125 additions and 18 deletions

View File

@ -24,10 +24,11 @@ import java.util.Properties;
import java.util.UUID;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.ParameterisedAddress;
import org.apache.activemq.artemis.api.core.QueueAttributes;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.jndi.JNDIStorable;
import org.apache.activemq.artemis.api.core.ParameterisedAddress;
/**
* ActiveMQ Artemis implementation of a JMS Destination.
@ -127,6 +128,40 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se
return destination;
}
public static Destination fromPrefixed1XName(final String addr, final String name) {
ActiveMQDestination destination;
if (addr.startsWith(PacketImpl.OLD_QUEUE_PREFIX.toString())) {
destination = createQueue(addr);
} else if (addr.startsWith(PacketImpl.OLD_TOPIC_PREFIX.toString())) {
destination = createTopic(addr);
} else if (addr.startsWith(PacketImpl.OLD_TEMP_QUEUE_PREFIX.toString())) {
destination = new ActiveMQTemporaryQueue(addr, null);
} else if (addr.startsWith(PacketImpl.OLD_TEMP_TOPIC_PREFIX.toString())) {
destination = new ActiveMQTemporaryTopic(addr, null);
} else {
destination = new ActiveMQDestination(addr, TYPE.DESTINATION, null);
}
String unprefixedName = name;
if (name.startsWith(PacketImpl.OLD_QUEUE_PREFIX.toString())) {
unprefixedName = name.substring(PacketImpl.OLD_QUEUE_PREFIX.length());
} else if (name.startsWith(PacketImpl.OLD_TOPIC_PREFIX.toString())) {
unprefixedName = name.substring(PacketImpl.OLD_TOPIC_PREFIX.length());
} else if (name.startsWith(PacketImpl.OLD_TEMP_QUEUE_PREFIX.toString())) {
unprefixedName = name.substring(PacketImpl.OLD_TEMP_QUEUE_PREFIX.length());
} else if (name.startsWith(PacketImpl.OLD_TEMP_TOPIC_PREFIX.toString())) {
unprefixedName = name.substring(PacketImpl.OLD_TEMP_TOPIC_PREFIX.length());
}
destination.setName(unprefixedName);
return destination;
}
public static SimpleString createQueueNameForSubscription(final boolean isDurable,
final String clientID,
final String subscriptionName) {

View File

@ -385,6 +385,16 @@ public class ActiveMQMessage implements javax.jms.Message {
throw new InvalidDestinationException("Foreign destination " + dest);
}
String prefix = prefixOf(dest);
ActiveMQDestination jbd = (ActiveMQDestination) dest;
MessageUtil.setJMSReplyTo(message, prefix + jbd.getAddress());
replyTo = jbd;
}
}
protected static String prefixOf(Destination dest) {
String prefix = "";
if (dest instanceof ActiveMQTemporaryQueue) {
prefix = TEMP_QUEUE_QUALIFED_PREFIX;
@ -395,12 +405,7 @@ public class ActiveMQMessage implements javax.jms.Message {
} else if (dest instanceof ActiveMQTopic) {
prefix = TOPIC_QUALIFIED_PREFIX;
}
ActiveMQDestination jbd = (ActiveMQDestination) dest;
MessageUtil.setJMSReplyTo(message, prefix + jbd.getAddress());
replyTo = jbd;
}
return prefix;
}
protected SimpleString checkPrefix(SimpleString address) {

View File

@ -219,9 +219,9 @@ public class ActiveMQSession implements QueueSession, TopicSession {
ActiveMQStreamMessage message;
if (enable1xPrefixes) {
message = new ActiveMQStreamMessage(session);
} else {
message = new ActiveMQStreamCompatibleMessage(session);
} else {
message = new ActiveMQStreamMessage(session);
}
return message;
}

View File

@ -33,6 +33,10 @@ public class ActiveMQBytesCompatibleMessage extends ActiveMQBytesMessage {
return ActiveMQCompatibleMessage.checkPrefix1X(address);
}
@Override
public void setJMSReplyTo(Destination dest) throws JMSException {
replyTo = ActiveMQCompatibleMessage.setCompatibleReplyTo(dest, message);
}
@Override
public Destination getJMSReplyTo() throws JMSException {

View File

@ -18,6 +18,7 @@
package org.apache.activemq.artemis.jms.client.compatible1X;
import javax.jms.Destination;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.JMSRuntimeException;
import javax.jms.Message;
@ -69,11 +70,39 @@ public class ActiveMQCompatibleMessage extends ActiveMQMessage {
return replyTo;
}
public static Destination findCompatibleReplyTo(ClientMessage message) {
@Override
public void setJMSReplyTo(Destination dest) throws JMSException {
replyTo = setCompatibleReplyTo(dest, message);
}
static Destination setCompatibleReplyTo(Destination dest, ClientMessage message) throws InvalidDestinationException {
if (dest == null) {
MessageUtil.setJMSReplyTo(message, (String) null);
return null;
} else {
if (dest instanceof ActiveMQDestination == false) {
throw new InvalidDestinationException("Foreign destination " + dest);
}
ActiveMQDestination jbd = (ActiveMQDestination) dest;
final String address = jbd.getAddress();
if (hasPrefix1X(address)) {
MessageUtil.setJMSReplyTo(message, jbd.getAddress());
} else {
String prefix = prefixOf(dest);
MessageUtil.setJMSReplyTo(message, prefix + jbd.getAddress());
}
return jbd;
}
}
static Destination findCompatibleReplyTo(ClientMessage message) {
SimpleString address = MessageUtil.getJMSReplyTo(message);
if (address != null) {
final SimpleString checkedAddress = checkPrefix1X(address);
if (checkedAddress != null) {
return ActiveMQDestination.fromPrefixed1XName(address.toString(), checkedAddress.toString());
}
String name = address.toString();
// swap the old prefixes for the new ones so the proper destination type gets created
if (address.startsWith(OLD_QUEUE_QUALIFIED_PREFIX)) {
name = address.subSeq(OLD_QUEUE_QUALIFIED_PREFIX.length(), address.length()).toString();
@ -95,6 +124,22 @@ public class ActiveMQCompatibleMessage extends ActiveMQMessage {
return checkPrefix1X(address);
}
private static boolean hasPrefix1X(String address) {
if (address != null) {
if (address.startsWith(PacketImpl.OLD_QUEUE_PREFIX.toString())) {
return true;
} else if (address.startsWith(PacketImpl.OLD_TEMP_QUEUE_PREFIX.toString())) {
return true;
} else if (address.startsWith(PacketImpl.OLD_TOPIC_PREFIX.toString())) {
return true;
} else if (address.startsWith(PacketImpl.OLD_TEMP_TOPIC_PREFIX.toString())) {
return true;
}
}
return false;
}
protected static SimpleString checkPrefix1X(SimpleString address) {
if (address != null) {
if (address.startsWith(PacketImpl.OLD_QUEUE_PREFIX)) {

View File

@ -33,6 +33,11 @@ public class ActiveMQMapCompatibleMessage extends ActiveMQMapMessage {
return ActiveMQCompatibleMessage.checkPrefix1X(address);
}
@Override
public void setJMSReplyTo(Destination dest) throws JMSException {
replyTo = ActiveMQCompatibleMessage.setCompatibleReplyTo(dest, message);
}
@Override
public Destination getJMSReplyTo() throws JMSException {
if (replyTo == null) {

View File

@ -34,6 +34,10 @@ public class ActiveMQObjectCompatibleMessage extends ActiveMQObjectMessage {
return ActiveMQCompatibleMessage.checkPrefix1X(address);
}
@Override
public void setJMSReplyTo(Destination dest) throws JMSException {
replyTo = ActiveMQCompatibleMessage.setCompatibleReplyTo(dest, message);
}
@Override
public Destination getJMSReplyTo() throws JMSException {

View File

@ -33,6 +33,10 @@ public class ActiveMQStreamCompatibleMessage extends ActiveMQStreamMessage {
return ActiveMQCompatibleMessage.checkPrefix1X(address);
}
@Override
public void setJMSReplyTo(Destination dest) throws JMSException {
replyTo = ActiveMQCompatibleMessage.setCompatibleReplyTo(dest, message);
}
@Override
public Destination getJMSReplyTo() throws JMSException {

View File

@ -27,6 +27,10 @@ import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage;
public class ActiveMQTextCompabileMessage extends ActiveMQTextMessage {
@Override
public void setJMSReplyTo(Destination dest) throws JMSException {
replyTo = ActiveMQCompatibleMessage.setCompatibleReplyTo(dest, message);
}
@Override
public Destination getJMSReplyTo() throws JMSException {

View File

@ -76,7 +76,7 @@ void check(List<Message> messages) {
checkMessage(streamMessage);
TextMessage textMessage = iterator.next();
checkMessage(objectMessage);
checkMessage(textMessage);
}

View File

@ -113,8 +113,9 @@ public class ReplyToTest extends ServerBase {
combinations.add(new Object[]{SNAPSHOT, ONE_FIVE, ONE_FIVE});
combinations.add(new Object[]{ONE_FIVE, SNAPSHOT, SNAPSHOT});
// TODO: It's not currently possible to mix reply to between 1.x and SNAPSHOT. Both sides need to be on the same version!
// combinations.addAll(combinatory(SNAPSHOT, new Object[]{SNAPSHOT, ONE_FIVE}, new Object[]{SNAPSHOT, ONE_FIVE}, new Object[]{SNAPSHOT, ONE_FIVE}));
combinations.add(new Object[]{ONE_FIVE, SNAPSHOT, ONE_FIVE});
combinations.add(new Object[]{ONE_FIVE, ONE_FIVE, SNAPSHOT});
return combinations;
}