ARTEMIS-5004 Clean up federation address consumer bindings proactively

When an address consumer explicitly closes or is closed we should remove the
address binding for that consumer right away instead of waiting for possible
configured auto delete as the demand is gone and we don't need to binding to
stick around any longer.
This commit is contained in:
Timothy Bish 2024-08-21 11:45:32 -04:00 committed by Robbie Gemmell
parent 84f4d73f2f
commit b58191bd52
3 changed files with 360 additions and 2 deletions

View File

@ -26,6 +26,7 @@ import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.QUEUE
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.TOPIC_CAPABILITY;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.verifyOfferedCapabilities;
import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
@ -54,9 +55,12 @@ import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Sender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* {@link SenderController} used when an AMQP federation Address receiver is created
@ -67,6 +71,10 @@ import org.apache.qpid.proton.engine.Sender;
*/
public final class AMQPFederationAddressSenderController extends AMQPFederationBaseSenderController {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private ProtonServerSenderContext senderContext;
public AMQPFederationAddressSenderController(AMQPSessionContext session) throws ActiveMQAMQPException {
super(session);
}
@ -88,6 +96,9 @@ public final class AMQPFederationAddressSenderController extends AMQPFederationB
throw new ActiveMQAMQPNotImplementedException("Null source lookup not supported on federation links.");
}
// Store for use during link close
this.senderContext = senderContext;
// Match the settlement mode of the remote instead of relying on the default of MIXED.
sender.setSenderSettleMode(sender.getRemoteSenderSettleMode());
// We don't currently support SECOND so enforce that the answer is always FIRST
@ -197,6 +208,38 @@ public final class AMQPFederationAddressSenderController extends AMQPFederationB
return (Consumer) sessionSPI.createSender(senderContext, queueName, null, false);
}
@Override
protected void handleLinkRemotelyClosed() {
// Remote closed indicating there was no demand, so we can cleanup the federation binding
deleteAddressFederationBindingIfPresent();
}
@Override
protected void handleLinkLocallyClosed(ErrorCondition error) {
// Local side forcibly removed the federation consumer so we should ensure the binding is removed.
deleteAddressFederationBindingIfPresent();
}
private void deleteAddressFederationBindingIfPresent() {
if (senderContext == null) {
return;
}
try {
final Sender sender = senderContext.getSender();
final Source source = (Source) sender.getRemoteSource();
final SimpleString queueName = SimpleString.of(sender.getName());
final RoutingType routingType = getRoutingType(source);
final QueueQueryResult queueQuery = sessionSPI.queueQuery(queueName, routingType, false);
if (queueQuery.isExists()) {
sessionSPI.deleteQueue(queueName);
}
} catch (Exception e) {
logger.debug("Federation address sender link closed cleanup caught error: ", e);
}
}
@SuppressWarnings("unchecked")
private String getJMSSelectorFromFilters(Source source) throws ActiveMQAMQPException {
final Map.Entry<Symbol, DescribedType> jmsSelector = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.JMS_SELECTOR_FILTER_IDS);

View File

@ -79,14 +79,20 @@ public abstract class AMQPFederationBaseSenderController implements SenderContro
}
@Override
public void close() throws Exception {
public final void close() throws Exception {
if (federation != null) {
federation.removeLinkClosedInterceptor(controllerId);
}
handleLinkRemotelyClosed();
}
protected void handleLinkRemotelyClosed() {
// Default does nothing.
}
@Override
public void close(ErrorCondition error) {
public final void close(ErrorCondition error) {
if (error != null && AmqpError.RESOURCE_DELETED.equals(error.getCondition())) {
if (resourceDeletedAction != null) {
resourceDeletedAction.accept(error);
@ -96,6 +102,12 @@ public abstract class AMQPFederationBaseSenderController implements SenderContro
if (federation != null) {
federation.removeLinkClosedInterceptor(controllerId);
}
handleLinkLocallyClosed(error);
}
protected void handleLinkLocallyClosed(ErrorCondition error) {
// Default does nothing.
}
@Override

View File

@ -96,6 +96,7 @@ import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFedera
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@ -3882,6 +3883,308 @@ public class AMQPFederationAddressPolicyTest extends AmqpClientTestSupport {
}
}
@Test
@Timeout(20)
public void testRemoteReceiverClosedWhenDemandRemovedCleansUpAddressBinding() throws Exception {
server.start();
server.addAddressInfo(new AddressInfo(SimpleString.of("test"), RoutingType.MULTICAST));
final Map<String, Object> remoteSourceProperties = new HashMap<>();
remoteSourceProperties.put(ADDRESS_AUTO_DELETE, false);
remoteSourceProperties.put(ADDRESS_AUTO_DELETE_DELAY, 1_000L);
remoteSourceProperties.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L);
try (ProtonTestClient peer = new ProtonTestClient()) {
scriptFederationConnectToRemote(peer, "test");
peer.connect("localhost", AMQP_PORT);
// Precondition is that there were no bindings before the federation receiver attaches.
Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectAttach().ofSender().withName("federation-address-receiver")
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
.withTarget().also()
.withSource().withAddress("test");
// Connect to remote as if some demand had matched our federation policy
peer.remoteAttach().ofReceiver()
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
.withName("federation-address-receiver")
.withSenderSettleModeUnsettled()
.withReceivervSettlesFirst()
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), remoteSourceProperties)
.withSource().withDurabilityOfNone()
.withExpiryPolicyOnLinkDetach()
.withAddress("test")
.withCapabilities("topic")
.and()
.withTarget().and()
.now();
peer.remoteFlow().withLinkCredit(10).now();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().accept();
// Federation consumer should be bound to the server's address
Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1);
// Federate a message to check link is attached properly
final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
try (Connection connection = factory.createConnection()) {
final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
final MessageProducer producer = session.createProducer(session.createTopic("test"));
producer.send(session.createMessage());
}
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach();
peer.remoteDetach().now(); // simulate demand removed so consumer is closed.
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
// Federation consumer should no longer be bound to the server's address
Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0);
peer.expectClose();
peer.remoteClose().now();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.close();
server.stop();
}
}
@Test
@Timeout(20)
public void testRemoteConnectionSuddenDropLeaveAddressBindingIntact() throws Exception {
server.start();
server.addAddressInfo(new AddressInfo(SimpleString.of("test"), RoutingType.MULTICAST));
final Map<String, Object> remoteSourceProperties = new HashMap<>();
remoteSourceProperties.put(ADDRESS_AUTO_DELETE, false);
try (ProtonTestClient peer = new ProtonTestClient()) {
scriptFederationConnectToRemote(peer, "test");
peer.connect("localhost", AMQP_PORT);
// Precondition is that there were no bindings before the federation receiver attaches.
Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectAttach().ofSender().withName("federation-address-receiver")
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
.withTarget().also()
.withSource().withAddress("test");
// Connect to remote as if some demand had matched our federation policy
peer.remoteAttach().ofReceiver()
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
.withName("federation-address-receiver")
.withSenderSettleModeUnsettled()
.withReceivervSettlesFirst()
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), remoteSourceProperties)
.withSource().withDurabilityOfNone()
.withExpiryPolicyOnLinkDetach()
.withAddress("test")
.withCapabilities("topic")
.and()
.withTarget().and()
.now();
peer.remoteFlow().withLinkCredit(10).now();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().withMessage().withHeader().also()
.withMessageAnnotations().also()
.withProperties().also()
.withValue("one").and()
.accept();
// Federation consumer should be bound to the server's address
Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1);
// Federate a message to check link is attached properly
final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
try (Connection connection = factory.createConnection()) {
final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
final MessageProducer producer = session.createProducer(session.createTopic("test"));
producer.send(session.createTextMessage("one"));
}
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.close();
// Unexpected connection drop should leave durable federation address subscription in place.
Wait.assertTrue(() -> server.getConnectionCount() == 0);
Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1);
}
// Send a message to check that the federation binding holds onto sends while the remote is offline
// due to connectivity issues.
final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
try (Connection connection = factory.createConnection()) {
final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
final MessageProducer producer = session.createProducer(session.createTopic("test"));
producer.send(session.createTextMessage("two"));
}
// Reconnect again as if the remote has recovered from the unexpected connection drop
try (ProtonTestClient peer = new ProtonTestClient()) {
scriptFederationConnectToRemote(peer, "test");
peer.connect("localhost", AMQP_PORT);
// Precondition is that there was still a binding from the previous federation whose connection dropped
Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectAttach().ofSender().withName("federation-address-receiver")
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
.withTarget().also()
.withSource().withAddress("test");
peer.expectTransfer().withMessage().withHeader().also()
.withMessageAnnotations().also()
.withProperties().also()
.withValue("two").and()
.accept();
// Connect again to remote as if local demand still matches our federation policy
peer.remoteAttach().ofReceiver()
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
.withName("federation-address-receiver")
.withSenderSettleModeUnsettled()
.withReceivervSettlesFirst()
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), remoteSourceProperties)
.withSource().withDurabilityOfNone()
.withExpiryPolicyOnLinkDetach()
.withAddress("test")
.withCapabilities("topic")
.and()
.withTarget().and()
.now();
peer.remoteFlow().withLinkCredit(10).now();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach();
peer.remoteDetach().now(); // simulate demand removed so consumer is closed.
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
// Federation consumer should no longer be bound to the server's address
Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0);
peer.expectClose();
peer.remoteClose().now();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.close();
server.stop();
}
}
@Test
@Timeout(20)
public void testFederationAddressBindingCleanedUpAfterConnectionDroppedIfConfiguredTo() throws Exception {
doTestFederationAddressBindingAppliesAutoDeletePolicyToCreatedQueue(true);
}
@Test
@Timeout(20)
public void testFederationAddressBindingNotCleanedUpAfterConnectionDroppedIfConfiguredNotTo() throws Exception {
doTestFederationAddressBindingAppliesAutoDeletePolicyToCreatedQueue(false);
}
private void doTestFederationAddressBindingAppliesAutoDeletePolicyToCreatedQueue(boolean autoDelete) throws Exception {
server.getConfiguration().setAddressQueueScanPeriod(100);
server.start();
server.addAddressInfo(new AddressInfo(SimpleString.of("test"), RoutingType.MULTICAST));
final Map<String, Object> remoteSourceProperties = new HashMap<>();
if (autoDelete) {
remoteSourceProperties.put(ADDRESS_AUTO_DELETE, true);
remoteSourceProperties.put(ADDRESS_AUTO_DELETE_DELAY, 200L);
remoteSourceProperties.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L);
} else {
remoteSourceProperties.put(ADDRESS_AUTO_DELETE, false);
remoteSourceProperties.put(ADDRESS_AUTO_DELETE_DELAY, -1L);
remoteSourceProperties.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L);
}
try (ProtonTestClient peer = new ProtonTestClient()) {
scriptFederationConnectToRemote(peer, "test");
peer.connect("localhost", AMQP_PORT);
// Precondition is that there were no bindings before the federation receiver attaches.
Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectAttach().ofSender().withName("federation-address-receiver")
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
.withTarget().also()
.withSource().withAddress("test");
// Connect to remote as if some demand had matched our federation policy
peer.remoteAttach().ofReceiver()
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
.withName("federation-address-receiver")
.withSenderSettleModeUnsettled()
.withReceivervSettlesFirst()
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), remoteSourceProperties)
.withSource().withDurabilityOfNone()
.withExpiryPolicyOnLinkDetach()
.withAddress("test")
.withCapabilities("topic")
.and()
.withTarget().and()
.now();
peer.remoteFlow().withLinkCredit(10).now();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
// Federation consumer should be bound to the server's address
Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1, 5_000, 500);
final SimpleString binding = server.bindingQuery(SimpleString.of("test")).getQueueNames().get(0);
assertNotNull(binding);
assertTrue(binding.startsWith(SimpleString.of("federation")));
final QueueQueryResult federationBinding = server.queueQuery(binding);
if (autoDelete) {
assertTrue(federationBinding.isAutoDelete());
assertEquals(200, federationBinding.getAutoDeleteDelay());
assertEquals(-1, federationBinding.getAutoDeleteMessageCount());
} else {
assertFalse(federationBinding.isAutoDelete());
assertEquals(-1, federationBinding.getAutoDeleteDelay());
assertEquals(-1, federationBinding.getAutoDeleteMessageCount());
}
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.close();
if (autoDelete) {
// Queue binding should eventually be auto deleted based on configuration
Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0, 5_000, 100);
} else {
// Should still be there as it wasn't marked as auto delete as previously validated.
Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1, 1_000, 100);
}
server.stop();
}
}
private static void sendAddressAddedEvent(ProtonTestPeer peer, String address, int handle, int deliveryId) {
final Map<String, Object> eventMap = new LinkedHashMap<>();
eventMap.put(REQUESTED_ADDRESS_NAME, address);