This commit is contained in:
Clebert Suconic 2018-08-13 12:01:51 -04:00
commit ca9644c36e
16 changed files with 222 additions and 17 deletions

View File

@ -135,6 +135,8 @@ public final class ActiveMQClient {
public static final boolean DEFAULT_USE_TOPOLOGY_FOR_LOADBALANCING = true;
public static final boolean DEFAULT_ENABLE_1X_PREFIXES = false;
public static final String THREAD_POOL_MAX_SIZE_PROPERTY_KEY = "activemq.artemis.client.global.thread.pool.max.size";
public static final String SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY = "activemq.artemis.client.global.scheduled.thread.pool.core.size";

View File

@ -35,7 +35,9 @@ public class PacketImpl implements Packet {
public static final SimpleString OLD_QUEUE_PREFIX = new SimpleString("jms.queue.");
public static final SimpleString OLD_TEMP_QUEUE_PREFIX = new SimpleString("jms.tempqueue.");
public static final SimpleString OLD_TOPIC_PREFIX = new SimpleString("jms.topic.");
public static final SimpleString OLD_TEMP_TOPIC_PREFIX = new SimpleString("jms.temptopic.");
// The minimal size for all the packets, Common data for all the packets (look at
// PacketImpl.encode)

View File

@ -129,6 +129,8 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
private final boolean cacheDestinations;
private final boolean enable1xPrefixes;
private ClientSession initialSession;
private final Exception creationStack;
@ -147,6 +149,7 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
final int dupsOKBatchSize,
final int transactionBatchSize,
final boolean cacheDestinations,
final boolean enable1xPrefixes,
final ClientSessionFactory sessionFactory) {
this.options = options;
@ -170,6 +173,8 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
this.cacheDestinations = cacheDestinations;
this.enable1xPrefixes = enable1xPrefixes;
creationStack = new Exception();
}
@ -661,9 +666,9 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
ClientSession session,
int type) {
if (isXA) {
return new ActiveMQXASession(options, this, transacted, true, acknowledgeMode, cacheDestinations, session, type);
return new ActiveMQXASession(options, this, transacted, true, acknowledgeMode, cacheDestinations, enable1xPrefixes, session, type);
} else {
return new ActiveMQSession(options, this, transacted, false, acknowledgeMode, cacheDestinations, session, type);
return new ActiveMQSession(options, this, transacted, false, acknowledgeMode, cacheDestinations, enable1xPrefixes, session, type);
}
}

View File

@ -92,6 +92,8 @@ public class ActiveMQConnectionFactory extends JNDIStorable implements Connectio
private boolean ignoreJTA;
private boolean enable1xPrefixes = ActiveMQClient.DEFAULT_ENABLE_1X_PREFIXES;
@Override
public void writeExternal(ObjectOutput out) throws IOException {
URI uri = toURI();
@ -477,6 +479,15 @@ public class ActiveMQConnectionFactory extends JNDIStorable implements Connectio
this.cacheDestinations = cacheDestinations;
}
public synchronized boolean isEnable1xPrefixes() {
return this.enable1xPrefixes;
}
public synchronized void setEnable1xPrefixes(final boolean enable1xPrefixes) {
checkWrite();
this.enable1xPrefixes = enable1xPrefixes;
}
public synchronized long getClientFailureCheckPeriod() {
return serverLocator.getClientFailureCheckPeriod();
}
@ -824,19 +835,19 @@ public class ActiveMQConnectionFactory extends JNDIStorable implements Connectio
if (isXA) {
if (type == ActiveMQConnection.TYPE_GENERIC_CONNECTION) {
connection = new ActiveMQXAConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, factory);
connection = new ActiveMQXAConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, enable1xPrefixes, factory);
} else if (type == ActiveMQConnection.TYPE_QUEUE_CONNECTION) {
connection = new ActiveMQXAConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, factory);
connection = new ActiveMQXAConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, enable1xPrefixes, factory);
} else if (type == ActiveMQConnection.TYPE_TOPIC_CONNECTION) {
connection = new ActiveMQXAConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, factory);
connection = new ActiveMQXAConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, enable1xPrefixes, factory);
}
} else {
if (type == ActiveMQConnection.TYPE_GENERIC_CONNECTION) {
connection = new ActiveMQConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, factory);
connection = new ActiveMQConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, enable1xPrefixes, factory);
} else if (type == ActiveMQConnection.TYPE_QUEUE_CONNECTION) {
connection = new ActiveMQConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, factory);
connection = new ActiveMQConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, enable1xPrefixes, factory);
} else if (type == ActiveMQConnection.TYPE_TOPIC_CONNECTION) {
connection = new ActiveMQConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, factory);
connection = new ActiveMQConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, enable1xPrefixes, factory);
}
}

View File

@ -236,6 +236,18 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se
return createTemporaryTopic(address, session);
}
public static ActiveMQTemporaryQueue createTemporaryQueue(final ActiveMQSession session, final String prefix) {
String address = prefix + UUID.randomUUID().toString();
return createTemporaryQueue(address, session);
}
public static ActiveMQTemporaryTopic createTemporaryTopic(final ActiveMQSession session, final String prefix) {
String address = prefix + UUID.randomUUID().toString();
return createTemporaryTopic(address, session);
}
public static ActiveMQTemporaryTopic createTemporaryTopic(String address, final ActiveMQSession session) {
return new ActiveMQTemporaryTopic(address, session);
}

View File

@ -61,6 +61,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSession.AddressQuery;
import org.apache.activemq.artemis.api.core.client.ClientSession.QueueQuery;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.selector.filter.FilterException;
import org.apache.activemq.artemis.selector.impl.SelectorParser;
import org.apache.activemq.artemis.utils.SelectorTranslator;
@ -101,6 +102,8 @@ public class ActiveMQSession implements QueueSession, TopicSession {
private final boolean cacheDestination;
private final boolean enable1xPrefixes;
private final Map<String, Topic> topicCache = new ConcurrentHashMap<>();
private final Map<String, Queue> queueCache = new ConcurrentHashMap<>();
@ -113,6 +116,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
final boolean xa,
final int ackMode,
final boolean cacheDestination,
final boolean enable1xPrefixes,
final ClientSession session,
final int sessionType) {
this.options = options;
@ -130,6 +134,8 @@ public class ActiveMQSession implements QueueSession, TopicSession {
this.xa = xa;
this.cacheDestination = cacheDestination;
this.enable1xPrefixes = enable1xPrefixes;
}
// Session implementation ----------------------------------------
@ -885,7 +891,12 @@ public class ActiveMQSession implements QueueSession, TopicSession {
}
try {
ActiveMQTemporaryQueue queue = ActiveMQDestination.createTemporaryQueue(this);
final ActiveMQTemporaryQueue queue;
if (enable1xPrefixes) {
queue = ActiveMQDestination.createTemporaryQueue(this, PacketImpl.OLD_TEMP_QUEUE_PREFIX.toString());
} else {
queue = ActiveMQDestination.createTemporaryQueue(this);
}
SimpleString simpleAddress = queue.getSimpleAddress();
@ -907,7 +918,12 @@ public class ActiveMQSession implements QueueSession, TopicSession {
}
try {
ActiveMQTemporaryTopic topic = ActiveMQDestination.createTemporaryTopic(this);
final ActiveMQTemporaryTopic topic;
if (enable1xPrefixes) {
topic = ActiveMQDestination.createTemporaryTopic(this, PacketImpl.OLD_TEMP_TOPIC_PREFIX.toString());
} else {
topic = ActiveMQDestination.createTemporaryTopic(this);
}
SimpleString simpleAddress = topic.getSimpleAddress();
@ -1133,12 +1149,17 @@ public class ActiveMQSession implements QueueSession, TopicSession {
}
private ActiveMQQueue lookupQueue(final String queueName, boolean isTemporary) throws ActiveMQException {
String queueNameToUse = queueName;
if (enable1xPrefixes) {
queueNameToUse = (isTemporary ? PacketImpl.OLD_TEMP_QUEUE_PREFIX.toString() : PacketImpl.OLD_QUEUE_PREFIX.toString()) + queueName;
}
ActiveMQQueue queue;
if (isTemporary) {
queue = ActiveMQDestination.createTemporaryQueue(queueName);
queue = ActiveMQDestination.createTemporaryQueue(queueNameToUse);
} else {
queue = ActiveMQDestination.createQueue(queueName);
queue = ActiveMQDestination.createQueue(queueNameToUse);
}
QueueQuery response = session.queueQuery(queue.getSimpleAddress());
@ -1151,13 +1172,17 @@ public class ActiveMQSession implements QueueSession, TopicSession {
}
private ActiveMQTopic lookupTopic(final String topicName, final boolean isTemporary) throws ActiveMQException {
String topicNameToUse = topicName;
if (enable1xPrefixes) {
topicNameToUse = (isTemporary ? PacketImpl.OLD_TEMP_TOPIC_PREFIX.toString() : PacketImpl.OLD_TOPIC_PREFIX.toString()) + topicName;
}
ActiveMQTopic topic;
if (isTemporary) {
topic = ActiveMQDestination.createTemporaryTopic(topicName);
topic = ActiveMQDestination.createTemporaryTopic(topicNameToUse);
} else {
topic = ActiveMQDestination.createTopic(topicName);
topic = ActiveMQDestination.createTopic(topicNameToUse);
}
AddressQuery query = session.addressQuery(topic.getSimpleAddress());

View File

@ -42,8 +42,9 @@ public final class ActiveMQXAConnection extends ActiveMQConnection implements XA
final int dupsOKBatchSize,
final int transactionBatchSize,
final boolean cacheDestinations,
final boolean enable1xNaming,
final ClientSessionFactory sessionFactory) {
super(options, username, password, connectionType, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, sessionFactory);
super(options, username, password, connectionType, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, enable1xNaming, sessionFactory);
}
@Override

View File

@ -37,8 +37,9 @@ public class ActiveMQXASession extends ActiveMQSession implements XAQueueSession
boolean xa,
int ackMode,
boolean cacheDestinations,
boolean enable1xNaming,
ClientSession session,
int sessionType) {
super(options, connection, transacted, xa, ackMode, cacheDestinations, session, sessionType);
super(options, connection, transacted, xa, ackMode, cacheDestinations, enable1xNaming, session, sessionType);
}
}

View File

@ -189,4 +189,8 @@ public interface ConnectionFactoryConfiguration extends EncodingSupport {
int getInitialMessagePacketSize();
ConnectionFactoryConfiguration setInitialMessagePacketSize(int size);
boolean isEnable1xPrefixes();
ConnectionFactoryConfiguration setEnable1xPrefixes(boolean enable1xPrefixes);
}

View File

@ -124,6 +124,8 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
private int initialMessagePacketSize = ActiveMQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
private boolean enable1xPrefixes = ActiveMQClient.DEFAULT_ENABLE_1X_PREFIXES;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@ -532,6 +534,17 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
return this;
}
@Override
public boolean isEnable1xPrefixes() {
return enable1xPrefixes;
}
@Override
public ConnectionFactoryConfiguration setEnable1xPrefixes(final boolean enable1xPrefixes) {
this.enable1xPrefixes = enable1xPrefixes;
return this;
}
// Encoding Support Implementation --------------------------------------------------------------
@Override
@ -623,6 +636,8 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
deserializationBlackList = BufferHelper.readNullableSimpleStringAsString(buffer);
deserializationWhiteList = BufferHelper.readNullableSimpleStringAsString(buffer);
enable1xPrefixes = buffer.readableBytes() > 0 ? buffer.readBoolean() : null;
}
@Override
@ -712,6 +727,8 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
BufferHelper.writeAsNullableSimpleString(buffer, deserializationBlackList);
BufferHelper.writeAsNullableSimpleString(buffer, deserializationWhiteList);
buffer.writeBoolean(enable1xPrefixes);
}
@Override
@ -825,7 +842,10 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
BufferHelper.sizeOfNullableSimpleString(deserializationBlackList) +
BufferHelper.sizeOfNullableSimpleString(deserializationWhiteList);
BufferHelper.sizeOfNullableSimpleString(deserializationWhiteList) +
DataConstants.SIZE_BOOLEAN;
// enable1xPrefixes;
return size;
}

View File

@ -1220,6 +1220,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
cf.setDeserializationBlackList(cfConfig.getDeserializationBlackList());
cf.setDeserializationWhiteList(cfConfig.getDeserializationWhiteList());
cf.setInitialMessagePacketSize(cfConfig.getInitialMessagePacketSize());
cf.setEnable1xPrefixes(cfConfig.isEnable1xPrefixes());
return cf;
}

View File

@ -592,6 +592,14 @@ public final class ActiveMQRAManagedConnectionFactory implements ManagedConnecti
mcfProperties.setCacheDestinations(cacheDestinations);
}
public Boolean isEnable1xPrefixes() {
return mcfProperties.isEnable1xPrefixes();
}
public void setEnable1xPrefixes(final Boolean enable1xPrefixes) {
mcfProperties.setEnable1xPrefixes(enable1xPrefixes);
}
public Integer getScheduledThreadPoolMaxSize() {
return mcfProperties.getScheduledThreadPoolMaxSize();
}

View File

@ -661,6 +661,32 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
return raProperties.isCacheDestinations();
}
/**
* Set enable1xPrefixes
*
* @param enable1xPrefixes The value
*/
public void setEnable1xPrefixes(final Boolean enable1xPrefixes) {
if (logger.isTraceEnabled()) {
ActiveMQRALogger.LOGGER.trace("setEnable1xPrefixes(" + enable1xPrefixes + ")");
}
raProperties.setEnable1xPrefixes(enable1xPrefixes);
}
/**
* Get isCacheDestinations
*
* @return The value
*/
public Boolean isEnable1xPrefixes() {
if (logger.isTraceEnabled()) {
ActiveMQRALogger.LOGGER.trace("isEnable1xPrefixes()");
}
return raProperties.isEnable1xPrefixes();
}
/**
* Set compressLargeMessage
*

View File

@ -114,6 +114,8 @@ public class ConnectionFactoryProperties implements ConnectionFactoryOptions {
private Boolean cacheDestinations;
private Boolean enable1xPrefixes;
private Integer initialMessagePacketSize;
private Integer scheduledThreadPoolMaxSize;
@ -629,6 +631,21 @@ public class ConnectionFactoryProperties implements ConnectionFactoryOptions {
this.cacheDestinations = cacheDestinations;
}
public Boolean isEnable1xPrefixes() {
if (ConnectionFactoryProperties.trace) {
ActiveMQRALogger.LOGGER.trace("isEnable1xPrefixes()");
}
return enable1xPrefixes;
}
public void setEnable1xPrefixes(final Boolean enable1xPrefixes) {
if (ConnectionFactoryProperties.trace) {
ActiveMQRALogger.LOGGER.trace("setEnable1xPrefixes(" + enable1xPrefixes + ")");
}
hasBeenUpdated = true;
this.enable1xPrefixes = enable1xPrefixes;
}
public Integer getScheduledThreadPoolMaxSize() {
if (ConnectionFactoryProperties.trace) {
ActiveMQRALogger.LOGGER.trace("getScheduledThreadPoolMaxSize()");
@ -975,6 +992,13 @@ public class ConnectionFactoryProperties implements ConnectionFactoryOptions {
return false;
} else if (!deserializationWhiteList.equals(other.deserializationWhiteList))
return false;
if (this.enable1xPrefixes == null) {
if (other.enable1xPrefixes != null)
return false;
} else if (!this.enable1xPrefixes.equals(other.enable1xPrefixes))
return false;
return true;
}
@ -1027,6 +1051,7 @@ public class ConnectionFactoryProperties implements ConnectionFactoryOptions {
result = prime * result + ((connectionParameters == null) ? 0 : connectionParameters.hashCode());
result = prime * result + ((deserializationBlackList == null) ? 0 : deserializationBlackList.hashCode());
result = prime * result + ((deserializationWhiteList == null) ? 0 : deserializationWhiteList.hashCode());
result = prime * result + ((enable1xPrefixes == null) ? 0 : enable1xPrefixes.hashCode());
return result;
}
}

View File

@ -49,6 +49,7 @@ import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@ -664,4 +665,59 @@ public class SimpleJNDIClientTest extends ActiveMQTestBase {
testContext(ctx, "myConnectionFactory", JMSFactoryType.CF);
}
@Test
public void test1xNaming() throws NamingException, JMSException {
liveService.getSecurityStore().setSecurityEnabled(false);
Hashtable<String, String> props = new Hashtable<>();
props.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory");
props.put("connectionFactory.ConnectionFactory", "vm://0?enable1xPrefixes=true");
props.put("connectionFactory.ConnectionFactory2", "vm://0");
Context ctx = new InitialContext(props);
ConnectionFactory connectionFactory = (ConnectionFactory) ctx.lookup("ConnectionFactory");
((ActiveMQConnectionFactory)connectionFactory).setEnable1xPrefixes(true);
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession();
assertTrue(session.createQueue("testQueue").getQueueName().startsWith(PacketImpl.OLD_QUEUE_PREFIX.toString()));
assertTrue(session.createTemporaryQueue().getQueueName().startsWith(PacketImpl.OLD_TEMP_QUEUE_PREFIX.toString()));
assertTrue(session.createTopic("testTopic").getTopicName().startsWith(PacketImpl.OLD_TOPIC_PREFIX.toString()));
assertTrue(session.createTemporaryTopic().getTopicName().startsWith(PacketImpl.OLD_TEMP_TOPIC_PREFIX.toString()));
connection.close();
// test setting programmatically
connectionFactory = (ConnectionFactory) ctx.lookup("ConnectionFactory2");
((ActiveMQConnectionFactory)connectionFactory).setEnable1xPrefixes(true);
connection = connectionFactory.createConnection();
session = connection.createSession();
assertTrue(session.createQueue("testQueue").getQueueName().startsWith(PacketImpl.OLD_QUEUE_PREFIX.toString()));
assertTrue(session.createTemporaryQueue().getQueueName().startsWith(PacketImpl.OLD_TEMP_QUEUE_PREFIX.toString()));
assertTrue(session.createTopic("testTopic").getTopicName().startsWith(PacketImpl.OLD_TOPIC_PREFIX.toString()));
assertTrue(session.createTemporaryTopic().getTopicName().startsWith(PacketImpl.OLD_TEMP_TOPIC_PREFIX.toString()));
connection.close();
}
@Test
public void test1xNamingNegative() throws NamingException, JMSException {
liveService.getSecurityStore().setSecurityEnabled(false);
Hashtable<String, String> props = new Hashtable<>();
props.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory");
props.put("connectionFactory.ConnectionFactory", "vm://0");
Context ctx = new InitialContext(props);
ConnectionFactory connectionFactory = (ConnectionFactory) ctx.lookup("ConnectionFactory");
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession();
assertFalse(session.createQueue("testQueue").getQueueName().startsWith(PacketImpl.OLD_QUEUE_PREFIX.toString()));
assertFalse(session.createTemporaryQueue().getQueueName().startsWith(PacketImpl.OLD_TEMP_QUEUE_PREFIX.toString()));
assertFalse(session.createTopic("testTopic").getTopicName().startsWith(PacketImpl.OLD_TOPIC_PREFIX.toString()));
assertFalse(session.createTemporaryTopic().getTopicName().startsWith(PacketImpl.OLD_TEMP_TOPIC_PREFIX.toString()));
connection.close();
}
}

View File

@ -414,6 +414,12 @@ public class ActiveMQResourceAdapterConfigTest extends ActiveMQTestBase {
" <config-property-name>IgnoreJTA</config-property-name>" +
" <config-property-type>boolean</config-property-type>" +
" <config-property-value></config-property-value>" +
" </config-property>" +
" <config-property>" +
" <description>***add***</description>" +
" <config-property-name>Enable1xPrefixes</config-property-name>" +
" <config-property-type>boolean</config-property-type>" +
" <config-property-value></config-property-value>" +
" </config-property>";
private static String rootConfig = "<root>" + config + commentedOutConfigs + "</root>";