From 17c0a331aceda16460cc4d87f376c8730e479fe5 Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Tue, 16 Jan 2018 21:27:22 +0100 Subject: [PATCH] ARTEMIS-1616 OpenWire improvements Added existing queues cache to avoid multiple expensive AMQSession::checkAutoCreateQueue calls --- .../protocol/openwire/amq/AMQSession.java | 37 +++++++++++++++++-- 1 file changed, 34 insertions(+), 3 deletions(-) diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index 3ff3ae1b37..ad15fe72f0 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -89,6 +89,8 @@ public class AMQSession implements SessionCallback { private final CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools(); + private String[] existingQueuesCache; + public AMQSession(ConnectionInfo connInfo, SessionInfo sessInfo, ActiveMQServer server, @@ -105,6 +107,7 @@ public class AMQSession implements SessionCallback { this.converter = new OpenWireMessageConverter(marshaller.copy()); this.enableAutoReadAndTtl = this::enableAutoReadAndTtl; + this.existingQueuesCache = null; } public boolean isClosed() { @@ -193,6 +196,33 @@ public class AMQSession implements SessionCallback { return consumersList; } + private boolean checkCachedExistingQueues(final SimpleString address, + final String physicalName, + final boolean isTemporary) throws Exception { + String[] existingQueuesCache = this.existingQueuesCache; + //lazy allocation of the cache + if (existingQueuesCache == null) { + //16 means 64 bytes with 32 bit references or 128 bytes with 64 bit references -> 1 or 2 cache lines with common archs + existingQueuesCache = new String[16]; + assert (Integer.bitCount(existingQueuesCache.length) == 1) : "existingQueuesCache.length must be power of 2"; + this.existingQueuesCache = existingQueuesCache; + } + final int hashCode = physicalName.hashCode(); + //this.existingQueuesCache.length must be power of 2 + final int mask = existingQueuesCache.length - 1; + final int index = hashCode & mask; + final String existingQueue = existingQueuesCache[index]; + if (existingQueue != null && existingQueue.equals(physicalName)) { + //if the information is stale (ie no longer valid) it will fail later + return true; + } + final boolean hasQueue = checkAutoCreateQueue(address, isTemporary); + if (hasQueue) { + existingQueuesCache[index] = physicalName; + } + return hasQueue; + } + private boolean checkAutoCreateQueue(SimpleString queueName, boolean isTemporary) throws Exception { boolean hasQueue = true; if (!connection.containsKnownDestination(queueName)) { @@ -350,7 +380,7 @@ public class AMQSession implements SessionCallback { originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString()); } - boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired(); + final boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired(); final AtomicInteger count = new AtomicInteger(actualDestinations.length); @@ -361,13 +391,14 @@ public class AMQSession implements SessionCallback { for (int i = 0, actualDestinationsCount = actualDestinations.length; i < actualDestinationsCount; i++) { final ActiveMQDestination dest = actualDestinations[i]; - final SimpleString address = SimpleString.toSimpleString(dest.getPhysicalName(), coreMessageObjectPools.getAddressStringSimpleStringPool()); + final String physicalName = dest.getPhysicalName(); + final SimpleString address = SimpleString.toSimpleString(physicalName, coreMessageObjectPools.getAddressStringSimpleStringPool()); //the last coreMsg could be directly the original one -> it avoid 1 copy if actualDestinations > 1 and ANY copy if actualDestinations == 1 final org.apache.activemq.artemis.api.core.Message coreMsg = (i == actualDestinationsCount - 1) ? originalCoreMsg : originalCoreMsg.copy(); coreMsg.setAddress(address); if (dest.isQueue()) { - checkAutoCreateQueue(address, dest.isTemporary()); + checkCachedExistingQueues(address, physicalName, dest.isTemporary()); coreMsg.setRoutingType(RoutingType.ANYCAST); } else { coreMsg.setRoutingType(RoutingType.MULTICAST);