This commit is contained in:
Clebert Suconic 2017-05-24 18:45:16 -04:00
commit 5d829e5135
13 changed files with 159 additions and 13 deletions

View File

@ -126,6 +126,8 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
private final int transactionBatchSize;
private final boolean cacheDestinations;
private ClientSession initialSession;
private final Exception creationStack;
@ -143,6 +145,7 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
final String clientID,
final int dupsOKBatchSize,
final int transactionBatchSize,
final boolean cacheDestinations,
final ClientSessionFactory sessionFactory) {
this.options = options;
@ -164,6 +167,8 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
this.transactionBatchSize = transactionBatchSize;
this.cacheDestinations = cacheDestinations;
creationStack = new Exception();
}
@ -654,9 +659,9 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
ClientSession session,
int type) {
if (isXA) {
return new ActiveMQXASession(options, this, transacted, true, acknowledgeMode, session, type);
return new ActiveMQXASession(options, this, transacted, true, acknowledgeMode, cacheDestinations, session, type);
} else {
return new ActiveMQSession(options, this, transacted, false, acknowledgeMode, session, type);
return new ActiveMQSession(options, this, transacted, false, acknowledgeMode, cacheDestinations, session, type);
}
}

View File

@ -84,6 +84,8 @@ public class ActiveMQConnectionFactory implements ConnectionFactoryOptions, Exte
private String deserializationWhiteList;
private boolean cacheDestinations;
private boolean finalizeChecks;
@Override
@ -428,6 +430,15 @@ public class ActiveMQConnectionFactory implements ConnectionFactoryOptions, Exte
this.transactionBatchSize = transactionBatchSize;
}
public synchronized boolean isCacheDestinations() {
return this.cacheDestinations;
}
public synchronized void setCacheDestinations(final boolean cacheDestinations) {
checkWrite();
this.cacheDestinations = cacheDestinations;
}
public synchronized long getClientFailureCheckPeriod() {
return serverLocator.getClientFailureCheckPeriod();
}
@ -766,19 +777,19 @@ public class ActiveMQConnectionFactory implements ConnectionFactoryOptions, Exte
if (isXA) {
if (type == ActiveMQConnection.TYPE_GENERIC_CONNECTION) {
connection = new ActiveMQXAConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, factory);
connection = new ActiveMQXAConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, factory);
} else if (type == ActiveMQConnection.TYPE_QUEUE_CONNECTION) {
connection = new ActiveMQXAConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, factory);
connection = new ActiveMQXAConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, factory);
} else if (type == ActiveMQConnection.TYPE_TOPIC_CONNECTION) {
connection = new ActiveMQXAConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, factory);
connection = new ActiveMQXAConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, factory);
}
} else {
if (type == ActiveMQConnection.TYPE_GENERIC_CONNECTION) {
connection = new ActiveMQConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, factory);
connection = new ActiveMQConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, factory);
} else if (type == ActiveMQConnection.TYPE_QUEUE_CONNECTION) {
connection = new ActiveMQConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, factory);
connection = new ActiveMQConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, factory);
} else if (type == ActiveMQConnection.TYPE_TOPIC_CONNECTION) {
connection = new ActiveMQConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, factory);
connection = new ActiveMQConnection(this, username, password, type, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, factory);
}
}

View File

@ -45,8 +45,10 @@ import javax.jms.TransactionInProgressException;
import javax.transaction.xa.XAResource;
import java.io.Serializable;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
@ -95,6 +97,12 @@ public class ActiveMQSession implements QueueSession, TopicSession {
private final Set<ActiveMQMessageConsumer> consumers = new HashSet<>();
private final boolean cacheDestination;
private final Map<String, Topic> topicCache = new ConcurrentHashMap<>();
private final Map<String, Queue> queueCache = new ConcurrentHashMap<>();
// Constructors --------------------------------------------------
protected ActiveMQSession(final ConnectionFactoryOptions options,
@ -102,6 +110,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
final boolean transacted,
final boolean xa,
final int ackMode,
final boolean cacheDestination,
final ClientSession session,
final int sessionType) {
this.options = options;
@ -117,6 +126,8 @@ public class ActiveMQSession implements QueueSession, TopicSession {
this.transacted = transacted;
this.xa = xa;
this.cacheDestination = cacheDestination;
}
// Session implementation ----------------------------------------
@ -255,6 +266,8 @@ public class ActiveMQSession implements QueueSession, TopicSession {
throw JMSExceptionHelper.convertFromActiveMQException(e);
}
}
topicCache.clear();
queueCache.clear();
}
@Override
@ -367,7 +380,17 @@ public class ActiveMQSession implements QueueSession, TopicSession {
}
try {
return internalCreateQueue(queueName, false);
Queue queue = null;
if (cacheDestination) {
queue = queueCache.get(queueName);
}
if (queue == null) {
queue = internalCreateQueue(queueName, false);
}
if (cacheDestination) {
queueCache.put(queueName, queue);
}
return queue;
} catch (ActiveMQException e) {
throw JMSExceptionHelper.convertFromActiveMQException(e);
}
@ -396,9 +419,18 @@ public class ActiveMQSession implements QueueSession, TopicSession {
if (sessionType == ActiveMQSession.TYPE_QUEUE_SESSION) {
throw new IllegalStateException("Cannot create a topic on a QueueSession");
}
try {
return internalCreateTopic(topicName, false);
Topic topic = null;
if (cacheDestination) {
topic = topicCache.get(topicName);
}
if (topic == null) {
topic = internalCreateTopic(topicName, false);
}
if (cacheDestination) {
topicCache.put(topicName, topic);
}
return topic;
} catch (ActiveMQException e) {
throw JMSExceptionHelper.convertFromActiveMQException(e);
}

View File

@ -41,8 +41,9 @@ public final class ActiveMQXAConnection extends ActiveMQConnection implements XA
final String clientID,
final int dupsOKBatchSize,
final int transactionBatchSize,
final boolean cacheDestinations,
final ClientSessionFactory sessionFactory) {
super(options, username, password, connectionType, clientID, dupsOKBatchSize, transactionBatchSize, sessionFactory);
super(options, username, password, connectionType, clientID, dupsOKBatchSize, transactionBatchSize, cacheDestinations, sessionFactory);
}
@Override

View File

@ -36,8 +36,9 @@ public class ActiveMQXASession extends ActiveMQSession implements XAQueueSession
boolean transacted,
boolean xa,
int ackMode,
boolean cacheDestinations,
ClientSession session,
int sessionType) {
super(options, connection, transacted, xa, ackMode, session, sessionType);
super(options, connection, transacted, xa, ackMode, cacheDestinations, session, sessionType);
}
}

View File

@ -400,6 +400,18 @@ public class ConnectionFactoryURITest {
checkEquals(bean, connectionFactoryWithHA, factory);
}
@Test
public void testCacheDestinations() throws Exception {
ActiveMQConnectionFactory factory = parser.newObject(new URI("tcp://localhost:3030"), null);
Assert.assertFalse(factory.isCacheDestinations());
factory = parser.newObject(new URI("tcp://localhost:3030?cacheDestinations=true"), null);
Assert.assertTrue(factory.isCacheDestinations());
}
private void populate(StringBuilder sb,
BeanUtilsBean bean,
ActiveMQConnectionFactory factory) throws IllegalAccessException, InvocationTargetException {

View File

@ -580,6 +580,14 @@ public final class ActiveMQRAManagedConnectionFactory implements ManagedConnecti
mcfProperties.setUseGlobalPools(useGlobalPools);
}
public Boolean isCacheDestinations() {
return mcfProperties.isCacheDestinations();
}
public void setCacheDestinations(final Boolean cacheDestinations) {
mcfProperties.setCacheDestinations(cacheDestinations);
}
public Integer getScheduledThreadPoolMaxSize() {
return mcfProperties.getScheduledThreadPoolMaxSize();
}

View File

@ -634,6 +634,32 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
return raProperties.isFailoverOnInitialConnection();
}
/**
* Set cacheDestinations
*
* @param cacheDestinations The value
*/
public void setCacheDestinations(final Boolean cacheDestinations) {
if (ActiveMQResourceAdapter.trace) {
ActiveMQRALogger.LOGGER.trace("setCacheDestinations(" + cacheDestinations + ")");
}
raProperties.setCacheDestinations(cacheDestinations);
}
/**
* Get isCacheDestinations
*
* @return The value
*/
public Boolean isCacheDestinations() {
if (ActiveMQResourceAdapter.trace) {
ActiveMQRALogger.LOGGER.trace("isCacheDestinations()");
}
return raProperties.isCacheDestinations();
}
/**
* Set compressLargeMessage
*
@ -1911,6 +1937,11 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
cf.setFailoverOnInitialConnection(val);
}
val = overrideProperties.isCacheDestinations() != null ? overrideProperties.isCacheDestinations() : raProperties.isCacheDestinations();
if (val != null) {
cf.setCacheDestinations(val);
}
Integer val2 = overrideProperties.getConsumerMaxRate() != null ? overrideProperties.getConsumerMaxRate() : raProperties.getConsumerMaxRate();
if (val2 != null) {
cf.setConsumerMaxRate(val2);

View File

@ -112,6 +112,8 @@ public class ConnectionFactoryProperties implements ConnectionFactoryOptions {
private Boolean useGlobalPools;
private Boolean cacheDestinations;
private Integer initialMessagePacketSize;
private Integer scheduledThreadPoolMaxSize;
@ -612,6 +614,21 @@ public class ConnectionFactoryProperties implements ConnectionFactoryOptions {
this.useGlobalPools = useGlobalPools;
}
public Boolean isCacheDestinations() {
if (ConnectionFactoryProperties.trace) {
ActiveMQRALogger.LOGGER.trace("isCacheDestinations()");
}
return cacheDestinations;
}
public void setCacheDestinations(final Boolean cacheDestinations) {
if (ConnectionFactoryProperties.trace) {
ActiveMQRALogger.LOGGER.trace("setCacheDestinations(" + cacheDestinations + ")");
}
hasBeenUpdated = true;
this.cacheDestinations = cacheDestinations;
}
public Integer getScheduledThreadPoolMaxSize() {
if (ConnectionFactoryProperties.trace) {
ActiveMQRALogger.LOGGER.trace("getScheduledThreadPoolMaxSize()");

View File

@ -151,6 +151,11 @@ tuning:
the wire, so if you re-use `SimpleString` instances between calls
then you can avoid some unnecessary copying.
- If using frameworks like Spring, configure destinations permanently broker side
and enable `destinationCache` on the client side.
See the [Setting The Destination Cache](using-jms.md)
for more information on this.
## Tuning Transport Settings
- TCP buffer sizes. If you have a fast network and fast machines you

View File

@ -377,3 +377,14 @@ consumer to send acknowledgements in batches rather than individually
saving valuable bandwidth. This can be configured on the connection
factory via the `transactionBatchSize` element and is set in bytes.
The default is 1024 \* 1024.
### Setting The Destination Cache
Many frameworks such as Spring resolve the destination by name on every operation,
this can cause a performance issue and extra calls to the broker,
in a scenario where destinations (addresses) are permanent broker side,
such as they are managed by a platform or operations team.
using `destinationCache` element, you can toggle on the destination cache
to improve the performance and reduce the calls to the broker.
This should not be used if destinations (addresses) are not permanent broker side,
as in dynamic creation/deletion.

View File

@ -258,6 +258,12 @@
<config-property-name>PasswordCodec</config-property-name>
<config-property-type>java.lang.String</config-property-type>
<config-property-value>org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec;key=clusterpassword;algorithm=something</config-property-value>
</config-property>
<config-property>
<description>Cache destinations per session</description>
<config-property-name>CacheDestinations</config-property-name>
<config-property-type>java.lang.Boolean</config-property-type>
<config-property-value>false</config-property-value>
</config-property>-->
<outbound-resourceadapter>

View File

@ -253,6 +253,12 @@ public class ActiveMQResourceAdapterConfigTest extends ActiveMQTestBase {
" <config-property-type>boolean</config-property-type>\n" +
" <config-property-value></config-property-value>\n" +
" </config-property>\n" +
" <config-property>" +
" <description>Cache destinations per session</description>" +
" <config-property-name>CacheDestinations</config-property-name>" +
" <config-property-type>boolean</config-property-type>" +
" <config-property-value></config-property-value>" +
" </config-property>" +
" <config-property>\n" +
" <description>max number of threads for scheduled thread pool</description>\n" +
" <config-property-name>ScheduledThreadPoolMaxSize</config-property-name>\n" +