ARTEMIS-4801 Fix issue with caching address query results

When caching address query results the remote session can be blocked forever
from creating links on an address if the "does not exist" value is cached
since it is never updated again and will always report "does not exist" even
if the address is added manually via management later. The cache state can
cause other issues for long running sessions as well and should be removed
to avoid attach failures for cases where the current broker state could allow
the attach to succeed but the cached entry won't allow it.
This commit is contained in:
Timothy Bish 2024-06-06 16:50:26 -04:00 committed by clebertsuconic
parent 7ca30e9a63
commit ac1b483cb4
3 changed files with 164 additions and 13 deletions

View File

@ -108,8 +108,6 @@ public class AMQPSessionCallback implements SessionCallback {
private final CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
private final AddressQueryCache<AddressQueryResult> addressQueryCache = new AddressQueryCache<>();
private ProtonTransactionHandler transactionHandler;
private final RunnableList blockedRunnables = new RunnableList();
@ -409,12 +407,7 @@ public class AMQPSessionCallback implements SessionCallback {
RoutingType routingType,
boolean autoCreate) throws Exception {
AddressQueryResult addressQueryResult = addressQueryCache.getResult(addressName);
if (addressQueryResult != null) {
return addressQueryResult;
}
addressQueryResult = serverSession.executeAddressQuery(addressName);
AddressQueryResult addressQueryResult = serverSession.executeAddressQuery(addressName);
if (!addressQueryResult.isExists() && addressQueryResult.isAutoCreateAddresses() && autoCreate) {
try {
@ -422,10 +415,10 @@ public class AMQPSessionCallback implements SessionCallback {
} catch (ActiveMQQueueExistsException e) {
// The queue may have been created by another thread in the mean time. Catch and do nothing.
}
addressQueryResult = serverSession.executeAddressQuery(addressName);
}
addressQueryCache.setResult(addressName, addressQueryResult);
return addressQueryResult;
}

View File

@ -55,6 +55,8 @@ import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedByte;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.message.Message;
import org.jgroups.util.UUID;
@ -1256,7 +1258,6 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
connection.close();
}
@Test
@Timeout(60)
public void testReceiveRejecting() throws Exception {
@ -1277,8 +1278,6 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
sender.send(message);
}
Queue queueView = getProxyToQueue(address);
for (int i = 0; i < MSG_COUNT; i++) {
@ -1296,11 +1295,95 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
assertNull(receiver.receive(1, TimeUnit.MILLISECONDS));
Wait.assertEquals(0, queueView::getDeliveringCount);
connection.close();
}
@Test
@Timeout(60)
public void testCreateTopicReceiverOnAddressThatDoesNotExistOnPreviousAttempt() throws Exception {
final AmqpClient client = createAmqpClient();
final AmqpConnection connection = addConnection(client.connect());
final AmqpSession session = connection.createSession();
final String address = "test";
final Source source = new Source();
source.setDurable(TerminusDurability.UNSETTLED_STATE);
source.setCapabilities(Symbol.getSymbol("topic"));
source.setAddress(address);
try {
session.createReceiver(source, "test-receiver-subscription");
fail("Should not be able to create the receiver");
} catch (Exception ex) {
// Expected
}
server.addAddressInfo(new AddressInfo(SimpleString.of(address), RoutingType.MULTICAST));
AmqpReceiver receiver = null;
try {
receiver = session.createReceiver(source, "test-receiver-subscription");
receiver.flow(1);
} catch (Exception ex) {
fail("Should be able to create the receiver");
}
final AmqpSender sender = session.createSender(address);
final AmqpMessage message = new AmqpMessage();
message.setText("TestPayload");
sender.send(message);
assertNotNull(receiver.receive(5, TimeUnit.SECONDS));
connection.close();
}
@Test
@Timeout(60)
public void testCreateQueueReceiverOnAddressThenRedoAsTopicReceiverAfterAddressUpdated() throws Exception {
final AmqpClient client = createAmqpClient();
final AmqpConnection connection = addConnection(client.connect());
final AmqpSession session = connection.createSession();
final String address = "test";
server.addAddressInfo(new AddressInfo(SimpleString.of(address), RoutingType.ANYCAST));
final Source source = new Source();
source.setDurable(TerminusDurability.UNSETTLED_STATE);
source.setCapabilities(Symbol.getSymbol("topic"));
source.setAddress(address);
try {
session.createReceiver(source, "test-receiver-subscription");
fail("Should not be able to create the receiver");
} catch (Exception ex) {
// Expected
}
server.removeAddressInfo(SimpleString.of(address), null);
server.addAddressInfo(new AddressInfo(SimpleString.of(address), RoutingType.MULTICAST));
AmqpReceiver receiver = null;
try {
receiver = session.createReceiver(source, "test-receiver-subscription");
receiver.flow(1);
} catch (Exception ex) {
fail("Should be able to create the receiver");
}
final AmqpSender sender = session.createSender(address);
final AmqpMessage message = new AmqpMessage();
message.setText("TestPayload");
sender.send(message);
assertNotNull(receiver.receive(5, TimeUnit.SECONDS));
connection.close();
}
}

View File

@ -3575,6 +3575,81 @@ public class AMQPFederationAddressPolicyTest extends AmqpClientTestSupport {
}
}
@Test
@Timeout(20)
public void testBrokerAllowsAttachToPreviouslyNonExistentAddressAfterItIsAdded() throws Exception {
final AddressSettings addressSettings = new AddressSettings();
addressSettings.setAutoCreateAddresses(false);
server.getConfiguration().getAddressSettings().put("#", addressSettings);
server.start();
final Map<String, Object> remoteSourceProperties = new HashMap<>();
remoteSourceProperties.put(ADDRESS_AUTO_DELETE, false);
try (ProtonTestClient peer = new ProtonTestClient()) {
scriptFederationConnectToRemote(peer, "test");
peer.connect("localhost", AMQP_PORT);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectAttach().ofSender().withName("federation-address-receiver")
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
.withTarget().also()
.withNullSource();
peer.expectDetach().respond();
// Connect to remote as if an queue had demand and matched our federation policy
// and expect a rejected attach as the address does not yet exist and auto create
// has been disabled.
peer.remoteAttach().ofReceiver()
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
.withName("federation-address-receiver")
.withSenderSettleModeUnsettled()
.withReceivervSettlesFirst()
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), remoteSourceProperties)
.withSource().withDurabilityOfNone()
.withExpiryPolicyOnLinkDetach()
.withAddress("test")
.withCapabilities("topic")
.and()
.withTarget().and()
.now();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
server.addAddressInfo(new AddressInfo("test").addRoutingType(RoutingType.MULTICAST));
peer.expectAttach().ofSender().withName("federation-address-receiver")
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
.withTarget().also()
.withSource().withAddress("test");
// Attempt attach again as if new address demand has been added and the policy manager reacts.
peer.remoteAttach().ofReceiver()
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
.withName("federation-address-receiver")
.withSenderSettleModeUnsettled()
.withReceivervSettlesFirst()
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), remoteSourceProperties)
.withSource().withDurabilityOfNone()
.withExpiryPolicyOnLinkDetach()
.withAddress("test")
.withCapabilities("topic")
.and()
.withTarget().and()
.now();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectClose();
peer.remoteClose().now();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.close();
server.stop();
}
}
private static void sendAddressAddedEvent(ProtonTestPeer peer, String address, int handle, int deliveryId) {
final Map<String, Object> eventMap = new LinkedHashMap<>();
eventMap.put(REQUESTED_ADDRESS_NAME, address);