ARTEMIS-2023 Support 1x prefixes for JMS dests created using session

In some cases users who migrate from 1.x to 2.x may still want to keep
the legacy prefixes for their JMS destinations (i.e. "jms.queue.",
"jms.topic.", etc.). This commit adds a boolean on our ConnectionFactory
implementation so that it will use the old prefixes when invoking the
queue/topic creation methods on the Session implementation.
This commit is contained in:
Justin Bertram 2018-08-09 21:32:23 -05:00 committed by Clebert Suconic
parent b7acf3a3f2
commit df583922f5
16 changed files with 222 additions and 17 deletions

View File

@ -135,6 +135,8 @@ public final class ActiveMQClient {
public static final boolean DEFAULT_USE_TOPOLOGY_FOR_LOADBALANCING = true;
public static final boolean DEFAULT_ENABLE_1X_PREFIXES = false;
public static final String THREAD_POOL_MAX_SIZE_PROPERTY_KEY = "activemq.artemis.client.global.thread.pool.max.size";
public static final String SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY = "activemq.artemis.client.global.scheduled.thread.pool.core.size";

View File

@ -35,7 +35,9 @@ public class PacketImpl implements Packet {
public static final SimpleString OLD_QUEUE_PREFIX = new SimpleString("jms.queue.");
public static final SimpleString OLD_TEMP_QUEUE_PREFIX = new SimpleString("jms.tempqueue.");
public static final SimpleString OLD_TOPIC_PREFIX = new SimpleString("jms.topic.");
public static final SimpleString OLD_TEMP_TOPIC_PREFIX = new SimpleString("jms.temptopic.");
// The minimal size for all the packets, Common data for all the packets (look at
// PacketImpl.encode)

View File

@ -129,6 +129,8 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
private final boolean cacheDestinations;
private final boolean enable1xPrefixes;
private ClientSession initialSession;
private final Exception creationStack;
@ -147,6 +149,7 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
final int dupsOKBatchSize,
final int transactionBatchSize,
final boolean cacheDestinations,
final boolean enable1xPrefixes,
final ClientSessionFactory sessionFactory) {
this.options = options;
@ -170,6 +173,8 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
this.cacheDestinations = cacheDestinations;
this.enable1xPrefixes = enable1xPrefixes;
creationStack = new Exception();
}
@ -661,9 +666,9 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
ClientSession session,
int type) {
if (isXA) {
return new ActiveMQXASession(options, this, transacted, true, acknowledgeMode, cacheDestinations, session, type);
return new ActiveMQXASession(options, this, transacted, true, acknowledgeMode, cacheDestinations, enable1xPrefixes, session, type);
} else {
return new ActiveMQSession(options, this, transacted, false, acknowledgeMode, cacheDestinations, session, type);
return new ActiveMQSession(options, this, transacted, false, acknowledgeMode, cacheDestinations, enable1xPrefixes, session, type);
}
}

View File

@ -92,6 +92,8 @@ public class ActiveMQConnectionFactory extends JNDIStorable implements Connectio
private boolean ignoreJTA;
private boolean enable1xPrefixes = ActiveMQClient.DEFAULT_ENABLE_1X_PREFIXES;
@Override
public void writeExternal(ObjectOutput out) throws IOException {
URI uri = toURI();
@ -477,6 +479,15 @@ public class ActiveMQConnectionFactory extends JNDIStorable implements Connectio
this.cacheDestinations = cacheDestinations;
}
public synchronized boolean isEnable1xPrefixes() {
return this.enable1xPrefixes;
}
public synchronized void setEnable1xPrefixes(final boolean enable1xPrefixes) {
checkWrite();
this.enable1xPrefixes = enable1xPrefixes;
}
public synchronized long getClientFailureCheckPeriod() {
return serverLocator.getClientFailureCheckPeriod();
}
@ -824,19 +835,19 @@ public class ActiveMQConnectionFactory extends JNDIStorable implements Connectio
if (isXA) {
if (type == ActiveMQConnection.TYPE_GENERIC_CONNECTION) {
connection = new ActiveMQXAConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, factory);
connection = new ActiveMQXAConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, enable1xPrefixes, factory);
} else if (type == ActiveMQConnection.TYPE_QUEUE_CONNECTION) {
connection = new ActiveMQXAConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, factory);
connection = new ActiveMQXAConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, enable1xPrefixes, factory);
} else if (type == ActiveMQConnection.TYPE_TOPIC_CONNECTION) {
connection = new ActiveMQXAConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, factory);
connection = new ActiveMQXAConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, enable1xPrefixes, factory);
}
} else {
if (type == ActiveMQConnection.TYPE_GENERIC_CONNECTION) {
connection = new ActiveMQConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, factory);
connection = new ActiveMQConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, enable1xPrefixes, factory);
} else if (type == ActiveMQConnection.TYPE_QUEUE_CONNECTION) {
connection = new ActiveMQConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, factory);
connection = new ActiveMQConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, enable1xPrefixes, factory);
} else if (type == ActiveMQConnection.TYPE_TOPIC_CONNECTION) {
connection = new ActiveMQConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, factory);
connection = new ActiveMQConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, enable1xPrefixes, factory);
}
}

View File

@ -236,6 +236,18 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se
return createTemporaryTopic(address, session);
}
public static ActiveMQTemporaryQueue createTemporaryQueue(final ActiveMQSession session, final String prefix) {
String address = prefix + UUID.randomUUID().toString();
return createTemporaryQueue(address, session);
}
public static ActiveMQTemporaryTopic createTemporaryTopic(final ActiveMQSession session, final String prefix) {
String address = prefix + UUID.randomUUID().toString();
return createTemporaryTopic(address, session);
}
public static ActiveMQTemporaryTopic createTemporaryTopic(String address, final ActiveMQSession session) {
return new ActiveMQTemporaryTopic(address, session);
}

View File

@ -61,6 +61,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSession.AddressQuery;
import org.apache.activemq.artemis.api.core.client.ClientSession.QueueQuery;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.selector.filter.FilterException;
import org.apache.activemq.artemis.selector.impl.SelectorParser;
import org.apache.activemq.artemis.utils.SelectorTranslator;
@ -101,6 +102,8 @@ public class ActiveMQSession implements QueueSession, TopicSession {
private final boolean cacheDestination;
private final boolean enable1xPrefixes;
private final Map<String, Topic> topicCache = new ConcurrentHashMap<>();
private final Map<String, Queue> queueCache = new ConcurrentHashMap<>();
@ -113,6 +116,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
final boolean xa,
final int ackMode,
final boolean cacheDestination,
final boolean enable1xPrefixes,
final ClientSession session,
final int sessionType) {
this.options = options;
@ -130,6 +134,8 @@ public class ActiveMQSession implements QueueSession, TopicSession {
this.xa = xa;
this.cacheDestination = cacheDestination;
this.enable1xPrefixes = enable1xPrefixes;
}
// Session implementation ----------------------------------------
@ -885,7 +891,12 @@ public class ActiveMQSession implements QueueSession, TopicSession {
}
try {
ActiveMQTemporaryQueue queue = ActiveMQDestination.createTemporaryQueue(this);
final ActiveMQTemporaryQueue queue;
if (enable1xPrefixes) {
queue = ActiveMQDestination.createTemporaryQueue(this, PacketImpl.OLD_TEMP_QUEUE_PREFIX.toString());
} else {
queue = ActiveMQDestination.createTemporaryQueue(this);
}
SimpleString simpleAddress = queue.getSimpleAddress();
@ -907,7 +918,12 @@ public class ActiveMQSession implements QueueSession, TopicSession {
}
try {
ActiveMQTemporaryTopic topic = ActiveMQDestination.createTemporaryTopic(this);
final ActiveMQTemporaryTopic topic;
if (enable1xPrefixes) {
topic = ActiveMQDestination.createTemporaryTopic(this, PacketImpl.OLD_TEMP_TOPIC_PREFIX.toString());
} else {
topic = ActiveMQDestination.createTemporaryTopic(this);
}
SimpleString simpleAddress = topic.getSimpleAddress();
@ -1133,12 +1149,17 @@ public class ActiveMQSession implements QueueSession, TopicSession {
}
private ActiveMQQueue lookupQueue(final String queueName, boolean isTemporary) throws ActiveMQException {
String queueNameToUse = queueName;
if (enable1xPrefixes) {
queueNameToUse = (isTemporary ? PacketImpl.OLD_TEMP_QUEUE_PREFIX.toString() : PacketImpl.OLD_QUEUE_PREFIX.toString()) + queueName;
}
ActiveMQQueue queue;
if (isTemporary) {
queue = ActiveMQDestination.createTemporaryQueue(queueName);
queue = ActiveMQDestination.createTemporaryQueue(queueNameToUse);
} else {
queue = ActiveMQDestination.createQueue(queueName);
queue = ActiveMQDestination.createQueue(queueNameToUse);
}
QueueQuery response = session.queueQuery(queue.getSimpleAddress());
@ -1151,13 +1172,17 @@ public class ActiveMQSession implements QueueSession, TopicSession {
}
private ActiveMQTopic lookupTopic(final String topicName, final boolean isTemporary) throws ActiveMQException {
String topicNameToUse = topicName;
if (enable1xPrefixes) {
topicNameToUse = (isTemporary ? PacketImpl.OLD_TEMP_TOPIC_PREFIX.toString() : PacketImpl.OLD_TOPIC_PREFIX.toString()) + topicName;
}
ActiveMQTopic topic;
if (isTemporary) {
topic = ActiveMQDestination.createTemporaryTopic(topicName);
topic = ActiveMQDestination.createTemporaryTopic(topicNameToUse);
} else {
topic = ActiveMQDestination.createTopic(topicName);
topic = ActiveMQDestination.createTopic(topicNameToUse);
}
AddressQuery query = session.addressQuery(topic.getSimpleAddress());

View File

@ -42,8 +42,9 @@ public final class ActiveMQXAConnection extends ActiveMQConnection implements XA
final int dupsOKBatchSize,
final int transactionBatchSize,
final boolean cacheDestinations,
final boolean enable1xNaming,
final ClientSessionFactory sessionFactory) {
super(options, username, password, connectionType, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, sessionFactory);
super(options, username, password, connectionType, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, enable1xNaming, sessionFactory);
}
@Override

View File

@ -37,8 +37,9 @@ public class ActiveMQXASession extends ActiveMQSession implements XAQueueSession
boolean xa,
int ackMode,
boolean cacheDestinations,
boolean enable1xNaming,
ClientSession session,
int sessionType) {
super(options, connection, transacted, xa, ackMode, cacheDestinations, session, sessionType);
super(options, connection, transacted, xa, ackMode, cacheDestinations, enable1xNaming, session, sessionType);
}
}

View File

@ -189,4 +189,8 @@ public interface ConnectionFactoryConfiguration extends EncodingSupport {
int getInitialMessagePacketSize();
ConnectionFactoryConfiguration setInitialMessagePacketSize(int size);
boolean isEnable1xPrefixes();
ConnectionFactoryConfiguration setEnable1xPrefixes(boolean enable1xPrefixes);
}

View File

@ -124,6 +124,8 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
private int initialMessagePacketSize = ActiveMQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
private boolean enable1xPrefixes = ActiveMQClient.DEFAULT_ENABLE_1X_PREFIXES;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@ -532,6 +534,17 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
return this;
}
@Override
public boolean isEnable1xPrefixes() {
return enable1xPrefixes;
}
@Override
public ConnectionFactoryConfiguration setEnable1xPrefixes(final boolean enable1xPrefixes) {
this.enable1xPrefixes = enable1xPrefixes;
return this;
}
// Encoding Support Implementation --------------------------------------------------------------
@Override
@ -623,6 +636,8 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
deserializationBlackList = BufferHelper.readNullableSimpleStringAsString(buffer);
deserializationWhiteList = BufferHelper.readNullableSimpleStringAsString(buffer);
enable1xPrefixes = buffer.readableBytes() > 0 ? buffer.readBoolean() : null;
}
@Override
@ -712,6 +727,8 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
BufferHelper.writeAsNullableSimpleString(buffer, deserializationBlackList);
BufferHelper.writeAsNullableSimpleString(buffer, deserializationWhiteList);
buffer.writeBoolean(enable1xPrefixes);
}
@Override
@ -825,7 +842,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
BufferHelper.sizeOfNullableSimpleString(deserializationBlackList) +
BufferHelper.sizeOfNullableSimpleString(deserializationWhiteList);
BufferHelper.sizeOfNullableSimpleString(deserializationWhiteList) +
DataConstants.SIZE_BOOLEAN;
// enable1xPrefixes;
return size;
}

View File

@ -1220,6 +1220,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
cf.setDeserializationBlackList(cfConfig.getDeserializationBlackList());
cf.setDeserializationWhiteList(cfConfig.getDeserializationWhiteList());
cf.setInitialMessagePacketSize(cfConfig.getInitialMessagePacketSize());
cf.setEnable1xPrefixes(cfConfig.isEnable1xPrefixes());
return cf;
}

View File

@ -592,6 +592,14 @@ public final class ActiveMQRAManagedConnectionFactory implements ManagedConnecti
mcfProperties.setCacheDestinations(cacheDestinations);
}
public Boolean isEnable1xPrefixes() {
return mcfProperties.isEnable1xPrefixes();
}
public void setEnable1xPrefixes(final Boolean enable1xPrefixes) {
mcfProperties.setEnable1xPrefixes(enable1xPrefixes);
}
public Integer getScheduledThreadPoolMaxSize() {
return mcfProperties.getScheduledThreadPoolMaxSize();
}

View File

@ -661,6 +661,32 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
return raProperties.isCacheDestinations();
}
/**
* Set enable1xPrefixes
*
* @param enable1xPrefixes The value
*/
public void setEnable1xPrefixes(final Boolean enable1xPrefixes) {
if (logger.isTraceEnabled()) {
ActiveMQRALogger.LOGGER.trace("setEnable1xPrefixes(" + enable1xPrefixes + ")");
}
raProperties.setEnable1xPrefixes(enable1xPrefixes);
}
/**
* Get isCacheDestinations
*
* @return The value
*/
public Boolean isEnable1xPrefixes() {
if (logger.isTraceEnabled()) {
ActiveMQRALogger.LOGGER.trace("isEnable1xPrefixes()");
}
return raProperties.isEnable1xPrefixes();
}
/**
* Set compressLargeMessage
*

View File

@ -114,6 +114,8 @@ public class ConnectionFactoryProperties implements ConnectionFactoryOptions {
private Boolean cacheDestinations;
private Boolean enable1xPrefixes;
private Integer initialMessagePacketSize;
private Integer scheduledThreadPoolMaxSize;
@ -629,6 +631,21 @@ public class ConnectionFactoryProperties implements ConnectionFactoryOptions {
this.cacheDestinations = cacheDestinations;
}
public Boolean isEnable1xPrefixes() {
if (ConnectionFactoryProperties.trace) {
ActiveMQRALogger.LOGGER.trace("isEnable1xPrefixes()");
}
return enable1xPrefixes;
}
public void setEnable1xPrefixes(final Boolean enable1xPrefixes) {
if (ConnectionFactoryProperties.trace) {
ActiveMQRALogger.LOGGER.trace("setEnable1xPrefixes(" + enable1xPrefixes + ")");
}
hasBeenUpdated = true;
this.enable1xPrefixes = enable1xPrefixes;
}
public Integer getScheduledThreadPoolMaxSize() {
if (ConnectionFactoryProperties.trace) {
ActiveMQRALogger.LOGGER.trace("getScheduledThreadPoolMaxSize()");
@ -975,6 +992,13 @@ public class ConnectionFactoryProperties implements ConnectionFactoryOptions {
return false;
} else if (!deserializationWhiteList.equals(other.deserializationWhiteList))
return false;
if (this.enable1xPrefixes == null) {
if (other.enable1xPrefixes != null)
return false;
} else if (!this.enable1xPrefixes.equals(other.enable1xPrefixes))
return false;
return true;
}
@ -1027,6 +1051,7 @@ public class ConnectionFactoryProperties implements ConnectionFactoryOptions {
result = prime * result + ((connectionParameters == null) ? 0 : connectionParameters.hashCode());
result = prime * result + ((deserializationBlackList == null) ? 0 : deserializationBlackList.hashCode());
result = prime * result + ((deserializationWhiteList == null) ? 0 : deserializationWhiteList.hashCode());
result = prime * result + ((enable1xPrefixes == null) ? 0 : enable1xPrefixes.hashCode());
return result;
}
}

View File

@ -49,6 +49,7 @@ import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@ -664,4 +665,59 @@ public class SimpleJNDIClientTest extends ActiveMQTestBase {
testContext(ctx, "myConnectionFactory", JMSFactoryType.CF);
}
@Test
public void test1xNaming() throws NamingException, JMSException {
liveService.getSecurityStore().setSecurityEnabled(false);
Hashtable<String, String> props = new Hashtable<>();
props.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory");
props.put("connectionFactory.ConnectionFactory", "vm://0?enable1xPrefixes=true");
props.put("connectionFactory.ConnectionFactory2", "vm://0");
Context ctx = new InitialContext(props);
ConnectionFactory connectionFactory = (ConnectionFactory) ctx.lookup("ConnectionFactory");
((ActiveMQConnectionFactory)connectionFactory).setEnable1xPrefixes(true);
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession();
assertTrue(session.createQueue("testQueue").getQueueName().startsWith(PacketImpl.OLD_QUEUE_PREFIX.toString()));
assertTrue(session.createTemporaryQueue().getQueueName().startsWith(PacketImpl.OLD_TEMP_QUEUE_PREFIX.toString()));
assertTrue(session.createTopic("testTopic").getTopicName().startsWith(PacketImpl.OLD_TOPIC_PREFIX.toString()));
assertTrue(session.createTemporaryTopic().getTopicName().startsWith(PacketImpl.OLD_TEMP_TOPIC_PREFIX.toString()));
connection.close();
// test setting programmatically
connectionFactory = (ConnectionFactory) ctx.lookup("ConnectionFactory2");
((ActiveMQConnectionFactory)connectionFactory).setEnable1xPrefixes(true);
connection = connectionFactory.createConnection();
session = connection.createSession();
assertTrue(session.createQueue("testQueue").getQueueName().startsWith(PacketImpl.OLD_QUEUE_PREFIX.toString()));
assertTrue(session.createTemporaryQueue().getQueueName().startsWith(PacketImpl.OLD_TEMP_QUEUE_PREFIX.toString()));
assertTrue(session.createTopic("testTopic").getTopicName().startsWith(PacketImpl.OLD_TOPIC_PREFIX.toString()));
assertTrue(session.createTemporaryTopic().getTopicName().startsWith(PacketImpl.OLD_TEMP_TOPIC_PREFIX.toString()));
connection.close();
}
@Test
public void test1xNamingNegative() throws NamingException, JMSException {
liveService.getSecurityStore().setSecurityEnabled(false);
Hashtable<String, String> props = new Hashtable<>();
props.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory");
props.put("connectionFactory.ConnectionFactory", "vm://0");
Context ctx = new InitialContext(props);
ConnectionFactory connectionFactory = (ConnectionFactory) ctx.lookup("ConnectionFactory");
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession();
assertFalse(session.createQueue("testQueue").getQueueName().startsWith(PacketImpl.OLD_QUEUE_PREFIX.toString()));
assertFalse(session.createTemporaryQueue().getQueueName().startsWith(PacketImpl.OLD_TEMP_QUEUE_PREFIX.toString()));
assertFalse(session.createTopic("testTopic").getTopicName().startsWith(PacketImpl.OLD_TOPIC_PREFIX.toString()));
assertFalse(session.createTemporaryTopic().getTopicName().startsWith(PacketImpl.OLD_TEMP_TOPIC_PREFIX.toString()));
connection.close();
}
}

View File

@ -414,6 +414,12 @@ public class ActiveMQResourceAdapterConfigTest extends ActiveMQTestBase {
" <config-property-name>IgnoreJTA</config-property-name>" +
" <config-property-type>boolean</config-property-type>" +
" <config-property-value></config-property-value>" +
" </config-property>" +
" <config-property>" +
" <description>***add***</description>" +
" <config-property-name>Enable1xPrefixes</config-property-name>" +
" <config-property-type>boolean</config-property-type>" +
" <config-property-value></config-property-value>" +
" </config-property>";
private static String rootConfig = "<root>" + config + commentedOutConfigs + "</root>";