From 1d1d6c8b4686f869df0ca5fc09c20128f8481cff Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Tue, 21 Nov 2017 10:12:20 -0500 Subject: [PATCH] ARTEMIS-1416 Implementing cache on queue and address querying This will cache the last query, optimizing most of the cases This won't optimize the case where you are sending producers with different address, but this is not the one I'm after now. --- .../amqp/broker/AMQPSessionCallback.java | 51 ++++++++++++++++++- 1 file changed, 49 insertions(+), 2 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 42e9625337..14e13b11ed 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -98,6 +98,11 @@ public class AMQPSessionCallback implements SessionCallback { private final AtomicBoolean draining = new AtomicBoolean(false); + + private final AddressQueryCache addressQueryCache = new AddressQueryCache<>(); + + private final AddressQueryCache bindingQueryCache = new AddressQueryCache<>(); + public AMQPSessionCallback(AMQPConnectionCallback protonSPI, ProtonProtocolManager manager, AMQPConnectionContext connection, @@ -277,9 +282,17 @@ public class AMQPSessionCallback implements SessionCallback { return queueQueryResult; } + + public boolean bindingQuery(String address, RoutingType routingType) throws Exception { + BindingQueryResult bindingQueryResult = bindingQueryCache.getResult(address); + + if (bindingQueryResult != null) { + return bindingQueryResult.isExists(); + } + SimpleString simpleAddress = SimpleString.toSimpleString(address); - BindingQueryResult bindingQueryResult = serverSession.executeBindingQuery(simpleAddress); + bindingQueryResult = serverSession.executeBindingQuery(simpleAddress); if (routingType == RoutingType.MULTICAST && !bindingQueryResult.isExists() && bindingQueryResult.isAutoCreateAddresses()) { try { serverSession.createAddress(simpleAddress, routingType, true); @@ -298,13 +311,22 @@ public class AMQPSessionCallback implements SessionCallback { } bindingQueryResult = serverSession.executeBindingQuery(simpleAddress); } + + bindingQueryCache.setResult(address, bindingQueryResult); return bindingQueryResult.isExists(); } + public AddressQueryResult addressQuery(String addressName, RoutingType routingType, boolean autoCreate) throws Exception { - AddressQueryResult addressQueryResult = serverSession.executeAddressQuery(SimpleString.toSimpleString(addressName)); + + AddressQueryResult addressQueryResult = addressQueryCache.getResult(addressName); + if (addressQueryResult != null) { + return addressQueryResult; + } + + addressQueryResult = serverSession.executeAddressQuery(SimpleString.toSimpleString(addressName)); if (!addressQueryResult.isExists() && addressQueryResult.isAutoCreateAddresses() && autoCreate) { try { @@ -314,6 +336,8 @@ public class AMQPSessionCallback implements SessionCallback { } addressQueryResult = serverSession.executeAddressQuery(SimpleString.toSimpleString(addressName)); } + + addressQueryCache.setResult(addressName, addressQueryResult); return addressQueryResult; } @@ -685,4 +709,27 @@ public class AMQPSessionCallback implements SessionCallback { public void removeProducer(String name) { serverSession.removeProducer(name); } + + + class AddressQueryCache { + String address; + T result; + + public synchronized T getResult(String parameterAddress) { + if (address != null && address.equals(parameterAddress)) { + return result; + } else { + result = null; + address = null; + return null; + } + } + + public synchronized void setResult(String parameterAddress, T result) { + this.address = parameterAddress; + this.result = result; + } + + } + }