ARTEMIS-1416 Implementing cache on queue and address querying
This will cache the last query, optimizing most of the cases This won't optimize the case where you are sending producers with different address, but this is not the one I'm after now.
This commit is contained in:
parent
16d9b191a1
commit
1d1d6c8b46
|
@ -98,6 +98,11 @@ public class AMQPSessionCallback implements SessionCallback {
|
||||||
|
|
||||||
private final AtomicBoolean draining = new AtomicBoolean(false);
|
private final AtomicBoolean draining = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
|
||||||
|
private final AddressQueryCache<AddressQueryResult> addressQueryCache = new AddressQueryCache<>();
|
||||||
|
|
||||||
|
private final AddressQueryCache<BindingQueryResult> bindingQueryCache = new AddressQueryCache<>();
|
||||||
|
|
||||||
public AMQPSessionCallback(AMQPConnectionCallback protonSPI,
|
public AMQPSessionCallback(AMQPConnectionCallback protonSPI,
|
||||||
ProtonProtocolManager manager,
|
ProtonProtocolManager manager,
|
||||||
AMQPConnectionContext connection,
|
AMQPConnectionContext connection,
|
||||||
|
@ -277,9 +282,17 @@ public class AMQPSessionCallback implements SessionCallback {
|
||||||
return queueQueryResult;
|
return queueQueryResult;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
public boolean bindingQuery(String address, RoutingType routingType) throws Exception {
|
public boolean bindingQuery(String address, RoutingType routingType) throws Exception {
|
||||||
|
BindingQueryResult bindingQueryResult = bindingQueryCache.getResult(address);
|
||||||
|
|
||||||
|
if (bindingQueryResult != null) {
|
||||||
|
return bindingQueryResult.isExists();
|
||||||
|
}
|
||||||
|
|
||||||
SimpleString simpleAddress = SimpleString.toSimpleString(address);
|
SimpleString simpleAddress = SimpleString.toSimpleString(address);
|
||||||
BindingQueryResult bindingQueryResult = serverSession.executeBindingQuery(simpleAddress);
|
bindingQueryResult = serverSession.executeBindingQuery(simpleAddress);
|
||||||
if (routingType == RoutingType.MULTICAST && !bindingQueryResult.isExists() && bindingQueryResult.isAutoCreateAddresses()) {
|
if (routingType == RoutingType.MULTICAST && !bindingQueryResult.isExists() && bindingQueryResult.isAutoCreateAddresses()) {
|
||||||
try {
|
try {
|
||||||
serverSession.createAddress(simpleAddress, routingType, true);
|
serverSession.createAddress(simpleAddress, routingType, true);
|
||||||
|
@ -298,13 +311,22 @@ public class AMQPSessionCallback implements SessionCallback {
|
||||||
}
|
}
|
||||||
bindingQueryResult = serverSession.executeBindingQuery(simpleAddress);
|
bindingQueryResult = serverSession.executeBindingQuery(simpleAddress);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bindingQueryCache.setResult(address, bindingQueryResult);
|
||||||
return bindingQueryResult.isExists();
|
return bindingQueryResult.isExists();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public AddressQueryResult addressQuery(String addressName,
|
public AddressQueryResult addressQuery(String addressName,
|
||||||
RoutingType routingType,
|
RoutingType routingType,
|
||||||
boolean autoCreate) throws Exception {
|
boolean autoCreate) throws Exception {
|
||||||
AddressQueryResult addressQueryResult = serverSession.executeAddressQuery(SimpleString.toSimpleString(addressName));
|
|
||||||
|
AddressQueryResult addressQueryResult = addressQueryCache.getResult(addressName);
|
||||||
|
if (addressQueryResult != null) {
|
||||||
|
return addressQueryResult;
|
||||||
|
}
|
||||||
|
|
||||||
|
addressQueryResult = serverSession.executeAddressQuery(SimpleString.toSimpleString(addressName));
|
||||||
|
|
||||||
if (!addressQueryResult.isExists() && addressQueryResult.isAutoCreateAddresses() && autoCreate) {
|
if (!addressQueryResult.isExists() && addressQueryResult.isAutoCreateAddresses() && autoCreate) {
|
||||||
try {
|
try {
|
||||||
|
@ -314,6 +336,8 @@ public class AMQPSessionCallback implements SessionCallback {
|
||||||
}
|
}
|
||||||
addressQueryResult = serverSession.executeAddressQuery(SimpleString.toSimpleString(addressName));
|
addressQueryResult = serverSession.executeAddressQuery(SimpleString.toSimpleString(addressName));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
addressQueryCache.setResult(addressName, addressQueryResult);
|
||||||
return addressQueryResult;
|
return addressQueryResult;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -685,4 +709,27 @@ public class AMQPSessionCallback implements SessionCallback {
|
||||||
public void removeProducer(String name) {
|
public void removeProducer(String name) {
|
||||||
serverSession.removeProducer(name);
|
serverSession.removeProducer(name);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class AddressQueryCache<T> {
|
||||||
|
String address;
|
||||||
|
T result;
|
||||||
|
|
||||||
|
public synchronized T getResult(String parameterAddress) {
|
||||||
|
if (address != null && address.equals(parameterAddress)) {
|
||||||
|
return result;
|
||||||
|
} else {
|
||||||
|
result = null;
|
||||||
|
address = null;
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void setResult(String parameterAddress, T result) {
|
||||||
|
this.address = parameterAddress;
|
||||||
|
this.result = result;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue