diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressSenderController.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressSenderController.java index 4de20baa51..4309a35f36 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressSenderController.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressSenderController.java @@ -26,6 +26,7 @@ import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.QUEUE import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.TOPIC_CAPABILITY; import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.verifyOfferedCapabilities; +import java.lang.invoke.MethodHandles; import java.util.Collections; import java.util.Map; import java.util.Set; @@ -54,9 +55,12 @@ import org.apache.qpid.proton.amqp.DescribedType; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.amqp.transport.AmqpError; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Sender; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * {@link SenderController} used when an AMQP federation Address receiver is created @@ -67,6 +71,10 @@ import org.apache.qpid.proton.engine.Sender; */ public final class AMQPFederationAddressSenderController extends AMQPFederationBaseSenderController { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private ProtonServerSenderContext senderContext; + public AMQPFederationAddressSenderController(AMQPSessionContext session) throws ActiveMQAMQPException { super(session); } @@ -88,6 +96,9 @@ public final class AMQPFederationAddressSenderController extends AMQPFederationB throw new ActiveMQAMQPNotImplementedException("Null source lookup not supported on federation links."); } + // Store for use during link close + this.senderContext = senderContext; + // Match the settlement mode of the remote instead of relying on the default of MIXED. sender.setSenderSettleMode(sender.getRemoteSenderSettleMode()); // We don't currently support SECOND so enforce that the answer is always FIRST @@ -197,6 +208,38 @@ public final class AMQPFederationAddressSenderController extends AMQPFederationB return (Consumer) sessionSPI.createSender(senderContext, queueName, null, false); } + @Override + protected void handleLinkRemotelyClosed() { + // Remote closed indicating there was no demand, so we can cleanup the federation binding + deleteAddressFederationBindingIfPresent(); + } + + @Override + protected void handleLinkLocallyClosed(ErrorCondition error) { + // Local side forcibly removed the federation consumer so we should ensure the binding is removed. + deleteAddressFederationBindingIfPresent(); + } + + private void deleteAddressFederationBindingIfPresent() { + if (senderContext == null) { + return; + } + + try { + final Sender sender = senderContext.getSender(); + final Source source = (Source) sender.getRemoteSource(); + final SimpleString queueName = SimpleString.of(sender.getName()); + final RoutingType routingType = getRoutingType(source); + + final QueueQueryResult queueQuery = sessionSPI.queueQuery(queueName, routingType, false); + if (queueQuery.isExists()) { + sessionSPI.deleteQueue(queueName); + } + } catch (Exception e) { + logger.debug("Federation address sender link closed cleanup caught error: ", e); + } + } + @SuppressWarnings("unchecked") private String getJMSSelectorFromFilters(Source source) throws ActiveMQAMQPException { final Map.Entry jmsSelector = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.JMS_SELECTOR_FILTER_IDS); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationBaseSenderController.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationBaseSenderController.java index b857c03d58..91302ebb2e 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationBaseSenderController.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationBaseSenderController.java @@ -79,14 +79,20 @@ public abstract class AMQPFederationBaseSenderController implements SenderContro } @Override - public void close() throws Exception { + public final void close() throws Exception { if (federation != null) { federation.removeLinkClosedInterceptor(controllerId); } + + handleLinkRemotelyClosed(); + } + + protected void handleLinkRemotelyClosed() { + // Default does nothing. } @Override - public void close(ErrorCondition error) { + public final void close(ErrorCondition error) { if (error != null && AmqpError.RESOURCE_DELETED.equals(error.getCondition())) { if (resourceDeletedAction != null) { resourceDeletedAction.accept(error); @@ -96,6 +102,12 @@ public abstract class AMQPFederationBaseSenderController implements SenderContro if (federation != null) { federation.removeLinkClosedInterceptor(controllerId); } + + handleLinkLocallyClosed(error); + } + + protected void handleLinkLocallyClosed(ErrorCondition error) { + // Default does nothing. } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java index d34d429604..17f54b5ac2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java @@ -96,6 +96,7 @@ import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFedera import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType; import org.apache.activemq.artemis.core.server.Divert; +import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.transformer.Transformer; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; @@ -3882,6 +3883,308 @@ public class AMQPFederationAddressPolicyTest extends AmqpClientTestSupport { } } + @Test + @Timeout(20) + public void testRemoteReceiverClosedWhenDemandRemovedCleansUpAddressBinding() throws Exception { + server.start(); + server.addAddressInfo(new AddressInfo(SimpleString.of("test"), RoutingType.MULTICAST)); + + final Map remoteSourceProperties = new HashMap<>(); + remoteSourceProperties.put(ADDRESS_AUTO_DELETE, false); + remoteSourceProperties.put(ADDRESS_AUTO_DELETE_DELAY, 1_000L); + remoteSourceProperties.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L); + + try (ProtonTestClient peer = new ProtonTestClient()) { + scriptFederationConnectToRemote(peer, "test"); + peer.connect("localhost", AMQP_PORT); + + // Precondition is that there were no bindings before the federation receiver attaches. + Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + peer.expectAttach().ofSender().withName("federation-address-receiver") + .withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()) + .withTarget().also() + .withSource().withAddress("test"); + + // Connect to remote as if some demand had matched our federation policy + peer.remoteAttach().ofReceiver() + .withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()) + .withName("federation-address-receiver") + .withSenderSettleModeUnsettled() + .withReceivervSettlesFirst() + .withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), remoteSourceProperties) + .withSource().withDurabilityOfNone() + .withExpiryPolicyOnLinkDetach() + .withAddress("test") + .withCapabilities("topic") + .and() + .withTarget().and() + .now(); + peer.remoteFlow().withLinkCredit(10).now(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectTransfer().accept(); + + // Federation consumer should be bound to the server's address + Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1); + + // Federate a message to check link is attached properly + final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); + + try (Connection connection = factory.createConnection()) { + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + final MessageProducer producer = session.createProducer(session.createTopic("test")); + + producer.send(session.createMessage()); + } + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectDetach(); + peer.remoteDetach().now(); // simulate demand removed so consumer is closed. + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + // Federation consumer should no longer be bound to the server's address + Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0); + + peer.expectClose(); + peer.remoteClose().now(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + + server.stop(); + } + } + + @Test + @Timeout(20) + public void testRemoteConnectionSuddenDropLeaveAddressBindingIntact() throws Exception { + server.start(); + server.addAddressInfo(new AddressInfo(SimpleString.of("test"), RoutingType.MULTICAST)); + + final Map remoteSourceProperties = new HashMap<>(); + remoteSourceProperties.put(ADDRESS_AUTO_DELETE, false); + + try (ProtonTestClient peer = new ProtonTestClient()) { + scriptFederationConnectToRemote(peer, "test"); + peer.connect("localhost", AMQP_PORT); + + // Precondition is that there were no bindings before the federation receiver attaches. + Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + peer.expectAttach().ofSender().withName("federation-address-receiver") + .withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()) + .withTarget().also() + .withSource().withAddress("test"); + + // Connect to remote as if some demand had matched our federation policy + peer.remoteAttach().ofReceiver() + .withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()) + .withName("federation-address-receiver") + .withSenderSettleModeUnsettled() + .withReceivervSettlesFirst() + .withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), remoteSourceProperties) + .withSource().withDurabilityOfNone() + .withExpiryPolicyOnLinkDetach() + .withAddress("test") + .withCapabilities("topic") + .and() + .withTarget().and() + .now(); + peer.remoteFlow().withLinkCredit(10).now(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectTransfer().withMessage().withHeader().also() + .withMessageAnnotations().also() + .withProperties().also() + .withValue("one").and() + .accept(); + + // Federation consumer should be bound to the server's address + Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1); + + // Federate a message to check link is attached properly + final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); + + try (Connection connection = factory.createConnection()) { + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + final MessageProducer producer = session.createProducer(session.createTopic("test")); + + producer.send(session.createTextMessage("one")); + } + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + + // Unexpected connection drop should leave durable federation address subscription in place. + Wait.assertTrue(() -> server.getConnectionCount() == 0); + Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1); + } + + // Send a message to check that the federation binding holds onto sends while the remote is offline + // due to connectivity issues. + final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); + + try (Connection connection = factory.createConnection()) { + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + final MessageProducer producer = session.createProducer(session.createTopic("test")); + + producer.send(session.createTextMessage("two")); + } + + // Reconnect again as if the remote has recovered from the unexpected connection drop + try (ProtonTestClient peer = new ProtonTestClient()) { + scriptFederationConnectToRemote(peer, "test"); + peer.connect("localhost", AMQP_PORT); + + // Precondition is that there was still a binding from the previous federation whose connection dropped + Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + peer.expectAttach().ofSender().withName("federation-address-receiver") + .withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()) + .withTarget().also() + .withSource().withAddress("test"); + peer.expectTransfer().withMessage().withHeader().also() + .withMessageAnnotations().also() + .withProperties().also() + .withValue("two").and() + .accept(); + + // Connect again to remote as if local demand still matches our federation policy + peer.remoteAttach().ofReceiver() + .withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()) + .withName("federation-address-receiver") + .withSenderSettleModeUnsettled() + .withReceivervSettlesFirst() + .withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), remoteSourceProperties) + .withSource().withDurabilityOfNone() + .withExpiryPolicyOnLinkDetach() + .withAddress("test") + .withCapabilities("topic") + .and() + .withTarget().and() + .now(); + peer.remoteFlow().withLinkCredit(10).now(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectDetach(); + peer.remoteDetach().now(); // simulate demand removed so consumer is closed. + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + // Federation consumer should no longer be bound to the server's address + Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0); + + peer.expectClose(); + peer.remoteClose().now(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + + server.stop(); + } + } + + @Test + @Timeout(20) + public void testFederationAddressBindingCleanedUpAfterConnectionDroppedIfConfiguredTo() throws Exception { + doTestFederationAddressBindingAppliesAutoDeletePolicyToCreatedQueue(true); + } + + @Test + @Timeout(20) + public void testFederationAddressBindingNotCleanedUpAfterConnectionDroppedIfConfiguredNotTo() throws Exception { + doTestFederationAddressBindingAppliesAutoDeletePolicyToCreatedQueue(false); + } + + private void doTestFederationAddressBindingAppliesAutoDeletePolicyToCreatedQueue(boolean autoDelete) throws Exception { + server.getConfiguration().setAddressQueueScanPeriod(100); + server.start(); + server.addAddressInfo(new AddressInfo(SimpleString.of("test"), RoutingType.MULTICAST)); + + final Map remoteSourceProperties = new HashMap<>(); + if (autoDelete) { + remoteSourceProperties.put(ADDRESS_AUTO_DELETE, true); + remoteSourceProperties.put(ADDRESS_AUTO_DELETE_DELAY, 200L); + remoteSourceProperties.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L); + } else { + remoteSourceProperties.put(ADDRESS_AUTO_DELETE, false); + remoteSourceProperties.put(ADDRESS_AUTO_DELETE_DELAY, -1L); + remoteSourceProperties.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L); + } + + try (ProtonTestClient peer = new ProtonTestClient()) { + scriptFederationConnectToRemote(peer, "test"); + peer.connect("localhost", AMQP_PORT); + + // Precondition is that there were no bindings before the federation receiver attaches. + Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + peer.expectAttach().ofSender().withName("federation-address-receiver") + .withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()) + .withTarget().also() + .withSource().withAddress("test"); + + // Connect to remote as if some demand had matched our federation policy + peer.remoteAttach().ofReceiver() + .withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()) + .withName("federation-address-receiver") + .withSenderSettleModeUnsettled() + .withReceivervSettlesFirst() + .withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), remoteSourceProperties) + .withSource().withDurabilityOfNone() + .withExpiryPolicyOnLinkDetach() + .withAddress("test") + .withCapabilities("topic") + .and() + .withTarget().and() + .now(); + peer.remoteFlow().withLinkCredit(10).now(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + // Federation consumer should be bound to the server's address + Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1, 5_000, 500); + + final SimpleString binding = server.bindingQuery(SimpleString.of("test")).getQueueNames().get(0); + assertNotNull(binding); + assertTrue(binding.startsWith(SimpleString.of("federation"))); + + final QueueQueryResult federationBinding = server.queueQuery(binding); + if (autoDelete) { + assertTrue(federationBinding.isAutoDelete()); + assertEquals(200, federationBinding.getAutoDeleteDelay()); + assertEquals(-1, federationBinding.getAutoDeleteMessageCount()); + } else { + assertFalse(federationBinding.isAutoDelete()); + assertEquals(-1, federationBinding.getAutoDeleteDelay()); + assertEquals(-1, federationBinding.getAutoDeleteMessageCount()); + } + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + + if (autoDelete) { + // Queue binding should eventually be auto deleted based on configuration + Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0, 5_000, 100); + } else { + // Should still be there as it wasn't marked as auto delete as previously validated. + Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1, 1_000, 100); + } + + server.stop(); + } + } + private static void sendAddressAddedEvent(ProtonTestPeer peer, String address, int handle, int deliveryId) { final Map eventMap = new LinkedHashMap<>(); eventMap.put(REQUESTED_ADDRESS_NAME, address);