ARTEMIS-2023 extend 1x naming to other ops

This commit is contained in:
Justin Bertram 2018-08-13 21:40:52 -05:00 committed by Martyn Taylor
parent dd15aa87da
commit a0b4c4dd19
10 changed files with 123 additions and 27 deletions

View File

@ -53,6 +53,10 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se
return input.replace("\\", "\\\\").replace(".", "\\.");
}
protected void setName(String name) {
this.name = name;
}
/**
* Static helper method for working with destinations.
*/
@ -84,21 +88,43 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se
}
public static Destination fromPrefixedName(final String name) {
if (name.startsWith(ActiveMQDestination.QUEUE_QUALIFIED_PREFIX)) {
String address = name.substring(ActiveMQDestination.QUEUE_QUALIFIED_PREFIX.length());
return createQueue(address);
} else if (name.startsWith(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX)) {
String address = name.substring(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX.length());
return createTopic(address);
} else if (name.startsWith(ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX)) {
String address = name.substring(ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX.length());
return new ActiveMQTemporaryQueue(address, null);
} else if (name.startsWith(ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX)) {
String address = name.substring(ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX.length());
return new ActiveMQTemporaryTopic(address, null);
return fromPrefixedName(name, name);
}
public static Destination fromPrefixedName(final String addr, final String name) {
ActiveMQDestination destination;
if (addr.startsWith(ActiveMQDestination.QUEUE_QUALIFIED_PREFIX)) {
String address = addr.substring(ActiveMQDestination.QUEUE_QUALIFIED_PREFIX.length());
destination = createQueue(address);
} else if (addr.startsWith(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX)) {
String address = addr.substring(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX.length());
destination = createTopic(address);
} else if (addr.startsWith(ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX)) {
String address = addr.substring(ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX.length());
destination = new ActiveMQTemporaryQueue(address, null);
} else if (addr.startsWith(ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX)) {
String address = addr.substring(ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX.length());
destination = new ActiveMQTemporaryTopic(address, null);
} else {
return new ActiveMQDestination(name, TYPE.DESTINATION, null);
destination = new ActiveMQDestination(addr, TYPE.DESTINATION, null);
}
String unprefixedName = name;
if (name.startsWith(ActiveMQDestination.QUEUE_QUALIFIED_PREFIX)) {
unprefixedName = name.substring(ActiveMQDestination.QUEUE_QUALIFIED_PREFIX.length());
} else if (name.startsWith(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX)) {
unprefixedName = name.substring(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX.length());
} else if (name.startsWith(ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX)) {
unprefixedName = name.substring(ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX.length());
} else if (name.startsWith(ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX)) {
unprefixedName = name.substring(ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX.length());
}
destination.setName(unprefixedName);
return destination;
}
public static SimpleString createQueueNameForSubscription(final boolean isDurable,
@ -274,10 +300,6 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se
@Deprecated
private String address;
/**
* The "JMS" name of the destination. Needed for serialization backwards compatibility.
*/
@Deprecated
private String name;
private final boolean temporary;
@ -313,7 +335,6 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se
this.queue = TYPE.isQueue(type);
}
@Deprecated
protected ActiveMQDestination(final String address,
final String name,
final TYPE type,
@ -337,6 +358,16 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se
setSimpleAddress(SimpleString.toSimpleString(address));
}
@Override
public String toString() {
return "ActiveMQDestination [address=" + simpleAddress.toString() +
", name=" +
name +
", type =" +
thetype +
"]";
}
public void setSimpleAddress(SimpleString address) {
if (address == null) {
throw new IllegalArgumentException("address cannot be null");

View File

@ -46,6 +46,7 @@ import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.UUID;
@ -65,6 +66,11 @@ public class ActiveMQMessage implements javax.jms.Message {
// Constants -----------------------------------------------------
public static final byte TYPE = org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE;
public static final SimpleString OLD_QUEUE_QUALIFIED_PREFIX = SimpleString.toSimpleString(ActiveMQDestination.QUEUE_QUALIFIED_PREFIX + PacketImpl.OLD_QUEUE_PREFIX);
public static final SimpleString OLD_TEMP_QUEUE_QUALIFED_PREFIX = SimpleString.toSimpleString(ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX + PacketImpl.OLD_TEMP_QUEUE_PREFIX);
public static final SimpleString OLD_TOPIC_QUALIFIED_PREFIX = SimpleString.toSimpleString(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX + PacketImpl.OLD_TOPIC_PREFIX);
public static final SimpleString OLD_TEMP_TOPIC_QUALIFED_PREFIX = SimpleString.toSimpleString(ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX + PacketImpl.OLD_TEMP_TOPIC_PREFIX);
public static Map<String, Object> coreMaptoJMSMap(final Map<String, Object> coreMessage) {
Map<String, Object> jmsMessage = new HashMap<>();
@ -203,6 +209,8 @@ public class ActiveMQMessage implements javax.jms.Message {
private boolean clientAck;
private boolean enable1xPrefixes;
private long jmsDeliveryTime;
// Constructors --------------------------------------------------
@ -358,11 +366,23 @@ public class ActiveMQMessage implements javax.jms.Message {
@Override
public Destination getJMSReplyTo() throws JMSException {
if (replyTo == null) {
SimpleString address = MessageUtil.getJMSReplyTo(message);
if (address != null) {
String name = address.toString();
SimpleString repl = MessageUtil.getJMSReplyTo(message);
if (repl != null) {
replyTo = ActiveMQDestination.fromPrefixedName(repl.toString());
// swap the old prefixes for the new ones so the proper destination type gets created
if (enable1xPrefixes) {
if (address.startsWith(OLD_QUEUE_QUALIFIED_PREFIX)) {
name = address.subSeq(OLD_QUEUE_QUALIFIED_PREFIX.length(), address.length()).toString();
} else if (address.startsWith(OLD_TEMP_QUEUE_QUALIFED_PREFIX)) {
name = address.subSeq(OLD_TEMP_QUEUE_QUALIFED_PREFIX.length(), address.length()).toString();
} else if (address.startsWith(OLD_TOPIC_QUALIFIED_PREFIX)) {
name = address.subSeq(OLD_TOPIC_QUALIFIED_PREFIX.length(), address.length()).toString();
} else if (address.startsWith(OLD_TEMP_TOPIC_QUALIFED_PREFIX)) {
name = address.subSeq(OLD_TEMP_TOPIC_QUALIFED_PREFIX.length(), address.length()).toString();
}
}
replyTo = ActiveMQDestination.fromPrefixedName(address.toString(), name);
}
}
return replyTo;
@ -401,6 +421,20 @@ public class ActiveMQMessage implements javax.jms.Message {
public Destination getJMSDestination() throws JMSException {
if (dest == null) {
SimpleString address = message.getAddressSimpleString();
SimpleString name = address;
if (address != null & enable1xPrefixes) {
if (address.startsWith(PacketImpl.OLD_QUEUE_PREFIX)) {
name = address.subSeq(PacketImpl.OLD_QUEUE_PREFIX.length(), address.length());
} else if (address.startsWith(PacketImpl.OLD_TEMP_QUEUE_PREFIX)) {
name = address.subSeq(PacketImpl.OLD_TEMP_QUEUE_PREFIX.length(), address.length());
} else if (address.startsWith(PacketImpl.OLD_TOPIC_PREFIX)) {
name = address.subSeq(PacketImpl.OLD_TOPIC_PREFIX.length(), address.length());
} else if (address.startsWith(PacketImpl.OLD_TEMP_TOPIC_PREFIX)) {
name = address.subSeq(PacketImpl.OLD_TEMP_TOPIC_PREFIX.length(), address.length());
}
}
if (address == null) {
dest = null;
} else if (RoutingType.ANYCAST.equals(message.getRoutingType())) {
@ -408,7 +442,11 @@ public class ActiveMQMessage implements javax.jms.Message {
} else if (RoutingType.MULTICAST.equals(message.getRoutingType())) {
dest = ActiveMQDestination.createTopic(address);
} else {
dest = ActiveMQDestination.fromPrefixedName(address.toString());
dest = (ActiveMQDestination) ActiveMQDestination.fromPrefixedName(address.toString());
}
if (name != null) {
((ActiveMQDestination) dest).setName(name.toString());
}
}
@ -865,6 +903,10 @@ public class ActiveMQMessage implements javax.jms.Message {
}
}
public void setEnable1xPrefixes(boolean enable1xPrefixes) {
this.enable1xPrefixes = enable1xPrefixes;
}
@Override
public String toString() {
StringBuffer sb = new StringBuffer("ActiveMQMessage[");

View File

@ -220,6 +220,10 @@ public final class ActiveMQMessageConsumer implements QueueReceiver, TopicSubscr
coreMessage.getType() == ActiveMQObjectMessage.TYPE;
jmsMsg = ActiveMQMessage.createMessage(coreMessage, needSession ? coreSession : null, options);
if (session.isEnable1xPrefixes()) {
jmsMsg.setEnable1xPrefixes(true);
}
try {
jmsMsg.doBeforeReceive();
} catch (IndexOutOfBoundsException ioob) {

View File

@ -72,7 +72,7 @@ public class ActiveMQQueue extends ActiveMQDestination implements Queue {
@Override
public String getQueueName() {
return getAddress();
return getName();
}
@Override

View File

@ -49,18 +49,22 @@ public final class ActiveMQQueueBrowser implements QueueBrowser {
private SimpleString filterString;
private final boolean enable1xPrefixes;
// Constructors ---------------------------------------------------------------------------------
protected ActiveMQQueueBrowser(final ConnectionFactoryOptions options,
final ActiveMQQueue queue,
final String messageSelector,
final ClientSession session) throws JMSException {
final ClientSession session,
final boolean enable1xPrefixes) throws JMSException {
this.options = options;
this.session = session;
this.queue = queue;
if (messageSelector != null) {
filterString = new SimpleString(SelectorTranslator.convertToActiveMQFilterString(messageSelector));
}
this.enable1xPrefixes = enable1xPrefixes;
}
// QueueBrowser implementation -------------------------------------------------------------------
@ -138,6 +142,11 @@ public final class ActiveMQQueueBrowser implements QueueBrowser {
ClientMessage next = current;
current = null;
msg = ActiveMQMessage.createMessage(next, session, options);
if (enable1xPrefixes) {
msg.setEnable1xPrefixes(true);
}
try {
msg.doBeforeReceive();
} catch (Exception e) {

View File

@ -879,7 +879,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
throw JMSExceptionHelper.convertFromActiveMQException(e);
}
return new ActiveMQQueueBrowser(options, (ActiveMQQueue) activeMQDestination, filterString, session);
return new ActiveMQQueueBrowser(options, (ActiveMQQueue) activeMQDestination, filterString, session, enable1xPrefixes);
}
@ -1126,6 +1126,10 @@ public class ActiveMQSession implements QueueSession, TopicSession {
consumers.remove(consumer);
}
public boolean isEnable1xPrefixes() {
return enable1xPrefixes;
}
// Package protected ---------------------------------------------
void deleteQueue(final SimpleString queueName) throws JMSException {

View File

@ -82,6 +82,10 @@ public class JMSMessageListenerWrapper implements MessageHandler {
msg.setClientAcknowledge();
}
if (session.isEnable1xPrefixes()) {
msg.setEnable1xPrefixes(true);
}
try {
msg.doBeforeReceive();
} catch (Exception e) {

View File

@ -1793,6 +1793,7 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
}
cf.setEnableSharedClientID(true);
cf.setEnable1xPrefixes(raProperties.isEnable1xPrefixes() == null ? false : raProperties.isEnable1xPrefixes());
setParams(cf, overrideProperties);
return cf;
}
@ -1859,6 +1860,7 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
cf.setReconnectAttempts(0);
cf.setInitialConnectAttempts(0);
cf.setEnable1xPrefixes(raProperties.isEnable1xPrefixes() == null ? false : raProperties.isEnable1xPrefixes());
cf.setEnableSharedClientID(true);
return cf;
}

View File

@ -455,6 +455,7 @@ public class ActiveMQActivation {
// to make sure we won't close anyone's connection factory when we stop the MDB
factory = ActiveMQJMSClient.createConnectionFactory(((ActiveMQConnectionFactory) fac).toURI().toString(), "internalConnection");
factory.setEnableSharedClientID(true);
factory.setEnable1xPrefixes(((ActiveMQResourceAdapter) fac).isEnable1xPrefixes());
} else {
factory = ra.newConnectionFactory(spec);
}

View File

@ -676,7 +676,6 @@ public class SimpleJNDIClientTest extends ActiveMQTestBase {
Context ctx = new InitialContext(props);
ConnectionFactory connectionFactory = (ConnectionFactory) ctx.lookup("ConnectionFactory");
((ActiveMQConnectionFactory)connectionFactory).setEnable1xPrefixes(true);
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession();