ARTEMIS-1616 OpenWire improvements

Added existing queues cache to avoid multiple expensive AMQSession::checkAutoCreateQueue calls
This commit is contained in:
Francesco Nigro 2018-01-16 21:27:22 +01:00 committed by Clebert Suconic
parent 051a3cae49
commit 17c0a331ac
1 changed files with 34 additions and 3 deletions

View File

@ -89,6 +89,8 @@ public class AMQSession implements SessionCallback {
private final CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
private String[] existingQueuesCache;
public AMQSession(ConnectionInfo connInfo,
SessionInfo sessInfo,
ActiveMQServer server,
@ -105,6 +107,7 @@ public class AMQSession implements SessionCallback {
this.converter = new OpenWireMessageConverter(marshaller.copy());
this.enableAutoReadAndTtl = this::enableAutoReadAndTtl;
this.existingQueuesCache = null;
}
public boolean isClosed() {
@ -193,6 +196,33 @@ public class AMQSession implements SessionCallback {
return consumersList;
}
private boolean checkCachedExistingQueues(final SimpleString address,
final String physicalName,
final boolean isTemporary) throws Exception {
String[] existingQueuesCache = this.existingQueuesCache;
//lazy allocation of the cache
if (existingQueuesCache == null) {
//16 means 64 bytes with 32 bit references or 128 bytes with 64 bit references -> 1 or 2 cache lines with common archs
existingQueuesCache = new String[16];
assert (Integer.bitCount(existingQueuesCache.length) == 1) : "existingQueuesCache.length must be power of 2";
this.existingQueuesCache = existingQueuesCache;
}
final int hashCode = physicalName.hashCode();
//this.existingQueuesCache.length must be power of 2
final int mask = existingQueuesCache.length - 1;
final int index = hashCode & mask;
final String existingQueue = existingQueuesCache[index];
if (existingQueue != null && existingQueue.equals(physicalName)) {
//if the information is stale (ie no longer valid) it will fail later
return true;
}
final boolean hasQueue = checkAutoCreateQueue(address, isTemporary);
if (hasQueue) {
existingQueuesCache[index] = physicalName;
}
return hasQueue;
}
private boolean checkAutoCreateQueue(SimpleString queueName, boolean isTemporary) throws Exception {
boolean hasQueue = true;
if (!connection.containsKnownDestination(queueName)) {
@ -350,7 +380,7 @@ public class AMQSession implements SessionCallback {
originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
}
boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired();
final boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired();
final AtomicInteger count = new AtomicInteger(actualDestinations.length);
@ -361,13 +391,14 @@ public class AMQSession implements SessionCallback {
for (int i = 0, actualDestinationsCount = actualDestinations.length; i < actualDestinationsCount; i++) {
final ActiveMQDestination dest = actualDestinations[i];
final SimpleString address = SimpleString.toSimpleString(dest.getPhysicalName(), coreMessageObjectPools.getAddressStringSimpleStringPool());
final String physicalName = dest.getPhysicalName();
final SimpleString address = SimpleString.toSimpleString(physicalName, coreMessageObjectPools.getAddressStringSimpleStringPool());
//the last coreMsg could be directly the original one -> it avoid 1 copy if actualDestinations > 1 and ANY copy if actualDestinations == 1
final org.apache.activemq.artemis.api.core.Message coreMsg = (i == actualDestinationsCount - 1) ? originalCoreMsg : originalCoreMsg.copy();
coreMsg.setAddress(address);
if (dest.isQueue()) {
checkAutoCreateQueue(address, dest.isTemporary());
checkCachedExistingQueues(address, physicalName, dest.isTemporary());
coreMsg.setRoutingType(RoutingType.ANYCAST);
} else {
coreMsg.setRoutingType(RoutingType.MULTICAST);