ARTEMIS-2072 eliminate unnecessary binding queries
This commit is contained in:
parent
d0272e65de
commit
d91da412c3
|
@ -38,7 +38,6 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
|
|||
import org.apache.activemq.artemis.core.security.CheckType;
|
||||
import org.apache.activemq.artemis.core.security.SecurityAuth;
|
||||
import org.apache.activemq.artemis.core.server.AddressQueryResult;
|
||||
import org.apache.activemq.artemis.core.server.BindingQueryResult;
|
||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||
import org.apache.activemq.artemis.core.server.QueueQueryResult;
|
||||
import org.apache.activemq.artemis.core.server.ServerConsumer;
|
||||
|
@ -109,8 +108,6 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
|
||||
private final AddressQueryCache<AddressQueryResult> addressQueryCache = new AddressQueryCache<>();
|
||||
|
||||
private final AddressQueryCache<BindingQueryResult> bindingQueryCache = new AddressQueryCache<>();
|
||||
|
||||
public AMQPSessionCallback(AMQPConnectionCallback protonSPI,
|
||||
ProtonProtocolManager manager,
|
||||
AMQPConnectionContext connection,
|
||||
|
@ -308,35 +305,34 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
|
||||
|
||||
|
||||
public boolean bindingQuery(SimpleString address, RoutingType routingType) throws Exception {
|
||||
BindingQueryResult bindingQueryResult = bindingQueryCache.getResult(address);
|
||||
public boolean checkAddressAndAutocreateIfPossible(SimpleString address, RoutingType routingType) throws Exception {
|
||||
AddressInfo addressInfo = manager.getServer().getAddressInfo(address);
|
||||
|
||||
if (bindingQueryResult != null) {
|
||||
return bindingQueryResult.isExists();
|
||||
// if the address exists go ahead and return
|
||||
if (addressInfo != null) {
|
||||
return true;
|
||||
}
|
||||
|
||||
bindingQueryResult = serverSession.executeBindingQuery(address);
|
||||
if (routingType == RoutingType.MULTICAST && !bindingQueryResult.isExists() && bindingQueryResult.isAutoCreateAddresses()) {
|
||||
try {
|
||||
serverSession.createAddress(address, routingType, true);
|
||||
} catch (ActiveMQAddressExistsException e) {
|
||||
// The address may have been created by another thread in the mean time. Catch and do nothing.
|
||||
// if the address and/or queue don't exist then create them if possible
|
||||
if (routingType == RoutingType.MULTICAST && addressInfo == null) {
|
||||
if (manager.getServer().getAddressSettingsRepository().getMatch(address.toString()).isAutoCreateAddresses()) {
|
||||
try {
|
||||
serverSession.createAddress(address, routingType, true);
|
||||
} catch (ActiveMQAddressExistsException e) {
|
||||
// The address may have been created by another thread in the mean time. Catch and do nothing.
|
||||
}
|
||||
}
|
||||
bindingQueryResult = serverSession.executeBindingQuery(address);
|
||||
} else if (routingType == RoutingType.ANYCAST && bindingQueryResult.isAutoCreateQueues()) {
|
||||
QueueQueryResult queueBinding = serverSession.executeQueueQuery(address);
|
||||
if (!queueBinding.isExists()) {
|
||||
} else if (routingType == RoutingType.ANYCAST && manager.getServer().locateQueue(address) == null) {
|
||||
if (manager.getServer().getAddressSettingsRepository().getMatch(address.toString()).isAutoCreateQueues()) {
|
||||
try {
|
||||
serverSession.createQueue(address, address, routingType, null, false, true, true);
|
||||
} catch (ActiveMQQueueExistsException e) {
|
||||
// The queue may have been created by another thread in the mean time. Catch and do nothing.
|
||||
}
|
||||
}
|
||||
bindingQueryResult = serverSession.executeBindingQuery(address);
|
||||
}
|
||||
|
||||
bindingQueryCache.setResult(address, bindingQueryResult);
|
||||
return bindingQueryResult.isExists();
|
||||
return manager.getServer().getAddressInfo(address) != null;
|
||||
}
|
||||
|
||||
|
||||
|
@ -475,7 +471,7 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
|
||||
//here check queue-autocreation
|
||||
RoutingType routingType = context.getRoutingType(receiver, address);
|
||||
if (!bindingQuery(address, routingType)) {
|
||||
if (!checkAddressAndAutocreateIfPossible(address, routingType)) {
|
||||
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
|
||||
}
|
||||
|
||||
|
|
|
@ -127,7 +127,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
|
|||
if (address != null && !address.isEmpty()) {
|
||||
defRoutingType = getRoutingType(target.getCapabilities(), address);
|
||||
try {
|
||||
if (!sessionSPI.bindingQuery(address, defRoutingType)) {
|
||||
if (!sessionSPI.checkAddressAndAutocreateIfPossible(address, defRoutingType)) {
|
||||
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
|
||||
}
|
||||
} catch (ActiveMQAMQPNotFoundException e) {
|
||||
|
|
Loading…
Reference in New Issue