From b58191bd5258d7c25e3cce6bf2f6084d801442e8 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Wed, 21 Aug 2024 11:45:32 -0400 Subject: [PATCH] ARTEMIS-5004 Clean up federation address consumer bindings proactively When an address consumer explicitly closes or is closed we should remove the address binding for that consumer right away instead of waiting for possible configured auto delete as the demand is gone and we don't need to binding to stick around any longer. --- ...AMQPFederationAddressSenderController.java | 43 +++ .../AMQPFederationBaseSenderController.java | 16 +- .../AMQPFederationAddressPolicyTest.java | 303 ++++++++++++++++++ 3 files changed, 360 insertions(+), 2 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressSenderController.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressSenderController.java index 4de20baa51..4309a35f36 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressSenderController.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressSenderController.java @@ -26,6 +26,7 @@ import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.QUEUE import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.TOPIC_CAPABILITY; import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.verifyOfferedCapabilities; +import java.lang.invoke.MethodHandles; import java.util.Collections; import java.util.Map; import java.util.Set; @@ -54,9 +55,12 @@ import org.apache.qpid.proton.amqp.DescribedType; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.amqp.transport.AmqpError; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Sender; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * {@link SenderController} used when an AMQP federation Address receiver is created @@ -67,6 +71,10 @@ import org.apache.qpid.proton.engine.Sender; */ public final class AMQPFederationAddressSenderController extends AMQPFederationBaseSenderController { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private ProtonServerSenderContext senderContext; + public AMQPFederationAddressSenderController(AMQPSessionContext session) throws ActiveMQAMQPException { super(session); } @@ -88,6 +96,9 @@ public final class AMQPFederationAddressSenderController extends AMQPFederationB throw new ActiveMQAMQPNotImplementedException("Null source lookup not supported on federation links."); } + // Store for use during link close + this.senderContext = senderContext; + // Match the settlement mode of the remote instead of relying on the default of MIXED. sender.setSenderSettleMode(sender.getRemoteSenderSettleMode()); // We don't currently support SECOND so enforce that the answer is always FIRST @@ -197,6 +208,38 @@ public final class AMQPFederationAddressSenderController extends AMQPFederationB return (Consumer) sessionSPI.createSender(senderContext, queueName, null, false); } + @Override + protected void handleLinkRemotelyClosed() { + // Remote closed indicating there was no demand, so we can cleanup the federation binding + deleteAddressFederationBindingIfPresent(); + } + + @Override + protected void handleLinkLocallyClosed(ErrorCondition error) { + // Local side forcibly removed the federation consumer so we should ensure the binding is removed. + deleteAddressFederationBindingIfPresent(); + } + + private void deleteAddressFederationBindingIfPresent() { + if (senderContext == null) { + return; + } + + try { + final Sender sender = senderContext.getSender(); + final Source source = (Source) sender.getRemoteSource(); + final SimpleString queueName = SimpleString.of(sender.getName()); + final RoutingType routingType = getRoutingType(source); + + final QueueQueryResult queueQuery = sessionSPI.queueQuery(queueName, routingType, false); + if (queueQuery.isExists()) { + sessionSPI.deleteQueue(queueName); + } + } catch (Exception e) { + logger.debug("Federation address sender link closed cleanup caught error: ", e); + } + } + @SuppressWarnings("unchecked") private String getJMSSelectorFromFilters(Source source) throws ActiveMQAMQPException { final Map.Entry jmsSelector = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.JMS_SELECTOR_FILTER_IDS); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationBaseSenderController.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationBaseSenderController.java index b857c03d58..91302ebb2e 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationBaseSenderController.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationBaseSenderController.java @@ -79,14 +79,20 @@ public abstract class AMQPFederationBaseSenderController implements SenderContro } @Override - public void close() throws Exception { + public final void close() throws Exception { if (federation != null) { federation.removeLinkClosedInterceptor(controllerId); } + + handleLinkRemotelyClosed(); + } + + protected void handleLinkRemotelyClosed() { + // Default does nothing. } @Override - public void close(ErrorCondition error) { + public final void close(ErrorCondition error) { if (error != null && AmqpError.RESOURCE_DELETED.equals(error.getCondition())) { if (resourceDeletedAction != null) { resourceDeletedAction.accept(error); @@ -96,6 +102,12 @@ public abstract class AMQPFederationBaseSenderController implements SenderContro if (federation != null) { federation.removeLinkClosedInterceptor(controllerId); } + + handleLinkLocallyClosed(error); + } + + protected void handleLinkLocallyClosed(ErrorCondition error) { + // Default does nothing. } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java index d34d429604..17f54b5ac2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java @@ -96,6 +96,7 @@ import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFedera import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType; import org.apache.activemq.artemis.core.server.Divert; +import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.transformer.Transformer; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; @@ -3882,6 +3883,308 @@ public class AMQPFederationAddressPolicyTest extends AmqpClientTestSupport { } } + @Test + @Timeout(20) + public void testRemoteReceiverClosedWhenDemandRemovedCleansUpAddressBinding() throws Exception { + server.start(); + server.addAddressInfo(new AddressInfo(SimpleString.of("test"), RoutingType.MULTICAST)); + + final Map remoteSourceProperties = new HashMap<>(); + remoteSourceProperties.put(ADDRESS_AUTO_DELETE, false); + remoteSourceProperties.put(ADDRESS_AUTO_DELETE_DELAY, 1_000L); + remoteSourceProperties.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L); + + try (ProtonTestClient peer = new ProtonTestClient()) { + scriptFederationConnectToRemote(peer, "test"); + peer.connect("localhost", AMQP_PORT); + + // Precondition is that there were no bindings before the federation receiver attaches. + Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + peer.expectAttach().ofSender().withName("federation-address-receiver") + .withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()) + .withTarget().also() + .withSource().withAddress("test"); + + // Connect to remote as if some demand had matched our federation policy + peer.remoteAttach().ofReceiver() + .withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()) + .withName("federation-address-receiver") + .withSenderSettleModeUnsettled() + .withReceivervSettlesFirst() + .withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), remoteSourceProperties) + .withSource().withDurabilityOfNone() + .withExpiryPolicyOnLinkDetach() + .withAddress("test") + .withCapabilities("topic") + .and() + .withTarget().and() + .now(); + peer.remoteFlow().withLinkCredit(10).now(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectTransfer().accept(); + + // Federation consumer should be bound to the server's address + Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1); + + // Federate a message to check link is attached properly + final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); + + try (Connection connection = factory.createConnection()) { + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + final MessageProducer producer = session.createProducer(session.createTopic("test")); + + producer.send(session.createMessage()); + } + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectDetach(); + peer.remoteDetach().now(); // simulate demand removed so consumer is closed. + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + // Federation consumer should no longer be bound to the server's address + Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0); + + peer.expectClose(); + peer.remoteClose().now(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + + server.stop(); + } + } + + @Test + @Timeout(20) + public void testRemoteConnectionSuddenDropLeaveAddressBindingIntact() throws Exception { + server.start(); + server.addAddressInfo(new AddressInfo(SimpleString.of("test"), RoutingType.MULTICAST)); + + final Map remoteSourceProperties = new HashMap<>(); + remoteSourceProperties.put(ADDRESS_AUTO_DELETE, false); + + try (ProtonTestClient peer = new ProtonTestClient()) { + scriptFederationConnectToRemote(peer, "test"); + peer.connect("localhost", AMQP_PORT); + + // Precondition is that there were no bindings before the federation receiver attaches. + Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + peer.expectAttach().ofSender().withName("federation-address-receiver") + .withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()) + .withTarget().also() + .withSource().withAddress("test"); + + // Connect to remote as if some demand had matched our federation policy + peer.remoteAttach().ofReceiver() + .withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()) + .withName("federation-address-receiver") + .withSenderSettleModeUnsettled() + .withReceivervSettlesFirst() + .withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), remoteSourceProperties) + .withSource().withDurabilityOfNone() + .withExpiryPolicyOnLinkDetach() + .withAddress("test") + .withCapabilities("topic") + .and() + .withTarget().and() + .now(); + peer.remoteFlow().withLinkCredit(10).now(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectTransfer().withMessage().withHeader().also() + .withMessageAnnotations().also() + .withProperties().also() + .withValue("one").and() + .accept(); + + // Federation consumer should be bound to the server's address + Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1); + + // Federate a message to check link is attached properly + final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); + + try (Connection connection = factory.createConnection()) { + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + final MessageProducer producer = session.createProducer(session.createTopic("test")); + + producer.send(session.createTextMessage("one")); + } + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + + // Unexpected connection drop should leave durable federation address subscription in place. + Wait.assertTrue(() -> server.getConnectionCount() == 0); + Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1); + } + + // Send a message to check that the federation binding holds onto sends while the remote is offline + // due to connectivity issues. + final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); + + try (Connection connection = factory.createConnection()) { + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + final MessageProducer producer = session.createProducer(session.createTopic("test")); + + producer.send(session.createTextMessage("two")); + } + + // Reconnect again as if the remote has recovered from the unexpected connection drop + try (ProtonTestClient peer = new ProtonTestClient()) { + scriptFederationConnectToRemote(peer, "test"); + peer.connect("localhost", AMQP_PORT); + + // Precondition is that there was still a binding from the previous federation whose connection dropped + Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + peer.expectAttach().ofSender().withName("federation-address-receiver") + .withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()) + .withTarget().also() + .withSource().withAddress("test"); + peer.expectTransfer().withMessage().withHeader().also() + .withMessageAnnotations().also() + .withProperties().also() + .withValue("two").and() + .accept(); + + // Connect again to remote as if local demand still matches our federation policy + peer.remoteAttach().ofReceiver() + .withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()) + .withName("federation-address-receiver") + .withSenderSettleModeUnsettled() + .withReceivervSettlesFirst() + .withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), remoteSourceProperties) + .withSource().withDurabilityOfNone() + .withExpiryPolicyOnLinkDetach() + .withAddress("test") + .withCapabilities("topic") + .and() + .withTarget().and() + .now(); + peer.remoteFlow().withLinkCredit(10).now(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectDetach(); + peer.remoteDetach().now(); // simulate demand removed so consumer is closed. + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + // Federation consumer should no longer be bound to the server's address + Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0); + + peer.expectClose(); + peer.remoteClose().now(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + + server.stop(); + } + } + + @Test + @Timeout(20) + public void testFederationAddressBindingCleanedUpAfterConnectionDroppedIfConfiguredTo() throws Exception { + doTestFederationAddressBindingAppliesAutoDeletePolicyToCreatedQueue(true); + } + + @Test + @Timeout(20) + public void testFederationAddressBindingNotCleanedUpAfterConnectionDroppedIfConfiguredNotTo() throws Exception { + doTestFederationAddressBindingAppliesAutoDeletePolicyToCreatedQueue(false); + } + + private void doTestFederationAddressBindingAppliesAutoDeletePolicyToCreatedQueue(boolean autoDelete) throws Exception { + server.getConfiguration().setAddressQueueScanPeriod(100); + server.start(); + server.addAddressInfo(new AddressInfo(SimpleString.of("test"), RoutingType.MULTICAST)); + + final Map remoteSourceProperties = new HashMap<>(); + if (autoDelete) { + remoteSourceProperties.put(ADDRESS_AUTO_DELETE, true); + remoteSourceProperties.put(ADDRESS_AUTO_DELETE_DELAY, 200L); + remoteSourceProperties.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L); + } else { + remoteSourceProperties.put(ADDRESS_AUTO_DELETE, false); + remoteSourceProperties.put(ADDRESS_AUTO_DELETE_DELAY, -1L); + remoteSourceProperties.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L); + } + + try (ProtonTestClient peer = new ProtonTestClient()) { + scriptFederationConnectToRemote(peer, "test"); + peer.connect("localhost", AMQP_PORT); + + // Precondition is that there were no bindings before the federation receiver attaches. + Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + peer.expectAttach().ofSender().withName("federation-address-receiver") + .withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()) + .withTarget().also() + .withSource().withAddress("test"); + + // Connect to remote as if some demand had matched our federation policy + peer.remoteAttach().ofReceiver() + .withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()) + .withName("federation-address-receiver") + .withSenderSettleModeUnsettled() + .withReceivervSettlesFirst() + .withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), remoteSourceProperties) + .withSource().withDurabilityOfNone() + .withExpiryPolicyOnLinkDetach() + .withAddress("test") + .withCapabilities("topic") + .and() + .withTarget().and() + .now(); + peer.remoteFlow().withLinkCredit(10).now(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + // Federation consumer should be bound to the server's address + Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1, 5_000, 500); + + final SimpleString binding = server.bindingQuery(SimpleString.of("test")).getQueueNames().get(0); + assertNotNull(binding); + assertTrue(binding.startsWith(SimpleString.of("federation"))); + + final QueueQueryResult federationBinding = server.queueQuery(binding); + if (autoDelete) { + assertTrue(federationBinding.isAutoDelete()); + assertEquals(200, federationBinding.getAutoDeleteDelay()); + assertEquals(-1, federationBinding.getAutoDeleteMessageCount()); + } else { + assertFalse(federationBinding.isAutoDelete()); + assertEquals(-1, federationBinding.getAutoDeleteDelay()); + assertEquals(-1, federationBinding.getAutoDeleteMessageCount()); + } + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + + if (autoDelete) { + // Queue binding should eventually be auto deleted based on configuration + Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0, 5_000, 100); + } else { + // Should still be there as it wasn't marked as auto delete as previously validated. + Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1, 1_000, 100); + } + + server.stop(); + } + } + private static void sendAddressAddedEvent(ProtonTestPeer peer, String address, int handle, int deliveryId) { final Map eventMap = new LinkedHashMap<>(); eventMap.put(REQUESTED_ADDRESS_NAME, address);