ARTEMIS-4998 Fix Federation link close wrongly closing the connection

The federation sender links can react incorrectly when the source broker
closes a receiver because the demand is gone and they can result in the
remote broker closing its side of the connection causing the whole federation
to need to be rebuilt. Handle the closure events correctly to prevent an
unexpected close and rebuild.
This commit is contained in:
Timothy Bish 2024-08-19 11:18:31 -04:00 committed by Robbie Gemmell
parent c395db526c
commit c932b75de6
6 changed files with 344 additions and 17 deletions

View File

@ -17,7 +17,6 @@
package org.apache.activemq.artemis.protocol.amqp.connect.federation;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederation.FEDERATION_INSTANCE_RECORD;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE_DELAY;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE_MSG_COUNT;
@ -79,10 +78,7 @@ public final class AMQPFederationAddressSenderController extends AMQPFederationB
final Source source = (Source) sender.getRemoteSource();
final String selector;
final SimpleString queueName = SimpleString.of(sender.getName());
final Connection protonConnection = sender.getSession().getConnection();
final org.apache.qpid.proton.engine.Record attachments = protonConnection.attachments();
AMQPFederation federation = attachments.get(FEDERATION_INSTANCE_RECORD, AMQPFederation.class);
final Connection protonConnection = session.getSession().getConnection();
if (federation == null) {
throw new ActiveMQAMQPIllegalStateException("Cannot create a federation link from non-federation connection");
@ -196,6 +192,8 @@ public final class AMQPFederationAddressSenderController extends AMQPFederationB
// to the remote to prompt it to create a new receiver.
resourceDeletedAction = (e) -> federation.registerMissingAddress(address.toString());
registerRemoteLinkClosedInterceptor(sender);
return (Consumer) sessionSPI.createSender(senderContext, queueName, null, false);
}

View File

@ -17,6 +17,10 @@
package org.apache.activemq.artemis.protocol.amqp.connect.federation;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederation.FEDERATION_INSTANCE_RECORD;
import java.util.Objects;
import java.util.UUID;
import java.util.function.Consumer;
import org.apache.activemq.artemis.api.core.Message;
@ -34,6 +38,8 @@ import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContex
import org.apache.activemq.artemis.protocol.amqp.proton.SenderController;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Sender;
/**
* A base class abstract {@link SenderController} implementation for use by federation address and
@ -43,6 +49,8 @@ public abstract class AMQPFederationBaseSenderController implements SenderContro
protected final AMQPSessionContext session;
protected final AMQPSessionCallback sessionSPI;
protected final AMQPFederation federation;
protected final String controllerId = UUID.randomUUID().toString();
protected AMQPMessageWriter standardMessageWriter;
protected AMQPLargeMessageWriter largeMessageWriter;
@ -54,6 +62,10 @@ public abstract class AMQPFederationBaseSenderController implements SenderContro
protected Consumer<ErrorCondition> resourceDeletedAction;
public AMQPFederationBaseSenderController(AMQPSessionContext session) throws ActiveMQAMQPException {
final Connection protonConnection = session.getSession().getConnection();
final org.apache.qpid.proton.engine.Record attachments = protonConnection.attachments();
this.federation = attachments.get(FEDERATION_INSTANCE_RECORD, AMQPFederation.class);
this.session = session;
this.sessionSPI = session.getSessionSPI();
}
@ -68,7 +80,9 @@ public abstract class AMQPFederationBaseSenderController implements SenderContro
@Override
public void close() throws Exception {
// Currently there isn't anything needed on close of this controller.
if (federation != null) {
federation.removeLinkClosedInterceptor(controllerId);
}
}
@Override
@ -78,6 +92,10 @@ public abstract class AMQPFederationBaseSenderController implements SenderContro
resourceDeletedAction.accept(error);
}
}
if (federation != null) {
federation.removeLinkClosedInterceptor(controllerId);
}
}
@Override
@ -108,4 +126,18 @@ public abstract class AMQPFederationBaseSenderController implements SenderContro
return selected;
}
protected final void registerRemoteLinkClosedInterceptor(Sender protonSender) {
Objects.requireNonNull(federation, "Subclass should have validated federation state before adding an interceptor");
federation.addLinkClosedInterceptor(controllerId, (link) -> {
// Normal close from remote due to demand being removed is handled here but remote close with an error is left
// to the parent federation instance to decide on how it should be handled.
if (link == protonSender && (link.getRemoteCondition() == null || link.getRemoteCondition().getCondition() == null)) {
return true;
}
return false;
});
}
}

View File

@ -20,7 +20,6 @@ package org.apache.activemq.artemis.protocol.amqp.connect.federation;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.QUEUE_CAPABILITY;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.TOPIC_CAPABILITY;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.verifyOfferedCapabilities;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederation.FEDERATION_INSTANCE_RECORD;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_QUEUE_RECEIVER;
import java.util.Map;
@ -47,7 +46,6 @@ import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Sender;
/**
@ -69,10 +67,6 @@ public final class AMQPFederationQueueSenderController extends AMQPFederationBas
final Sender sender = senderContext.getSender();
final Source source = (Source) sender.getRemoteSource();
final String selector;
final Connection protonConnection = sender.getSession().getConnection();
final org.apache.qpid.proton.engine.Record attachments = protonConnection.attachments();
AMQPFederation federation = attachments.get(FEDERATION_INSTANCE_RECORD, AMQPFederation.class);
if (federation == null) {
throw new ActiveMQAMQPIllegalStateException("Cannot create a federation link from non-federation connection");
@ -147,6 +141,8 @@ public final class AMQPFederationQueueSenderController extends AMQPFederationBas
// to the remote to prompt it to create a new receiver.
resourceDeletedAction = (e) -> federation.registerMissingQueue(targetQueue.toString());
registerRemoteLinkClosedInterceptor(sender);
return (Consumer) sessionSPI.createSender(senderContext, targetQueue, selector, false);
}

View File

@ -55,7 +55,6 @@ import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
@ -279,10 +278,6 @@ public class AMQPFederationSource extends AMQPFederation {
brokerConnection.runtimeError(cause);
}
protected boolean interceptLinkClosedEvent(Link link) {
return false;
}
private void asyncCreateTargetEventsSender(AMQPFederationCommandDispatcher commandLink) {
// If no remote policies configured then we don't need an events sender link
// currently, if some other use is added for this link this code must be

View File

@ -3727,6 +3727,161 @@ public class AMQPFederationAddressPolicyTest extends AmqpClientTestSupport {
}
}
@Test
@Timeout(20)
public void testRemoteFederationReceiverCloseWhenDemandRemovedDoesNotTerminateRemoteConnection() throws Exception {
server.start();
final Map<String, Object> remoteSourceProperties = new HashMap<>();
remoteSourceProperties.put(ADDRESS_AUTO_DELETE, true);
remoteSourceProperties.put(ADDRESS_AUTO_DELETE_DELAY, 1_000L);
remoteSourceProperties.put(ADDRESS_AUTO_DELETE_MSG_COUNT, 1L);
try (ProtonTestClient peer = new ProtonTestClient()) {
scriptFederationConnectToRemote(peer, "test");
peer.connect("localhost", AMQP_PORT);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectAttach().ofSender().withName("federation-address-receiver")
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
.withTarget().also()
.withSource().withAddress("test");
// Connect to remote as if some demand had matched our federation policy
peer.remoteAttach().ofReceiver()
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
.withName("federation-address-receiver")
.withSenderSettleModeUnsettled()
.withReceivervSettlesFirst()
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), remoteSourceProperties)
.withSource().withDurabilityOfNone()
.withExpiryPolicyOnLinkDetach()
.withAddress("test")
.withCapabilities("topic")
.and()
.withTarget().and()
.now();
peer.remoteFlow().withLinkCredit(10).now();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().accept();
// Federate a message to check link is attached properly
final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
try (Connection connection = factory.createConnection()) {
final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
final MessageProducer producer = session.createProducer(session.createTopic("test"));
producer.send(session.createMessage());
}
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach();
peer.remoteDetach().now(); // simulate demand removed so consumer is closed.
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectAttach().ofSender().withName("federation-address-receiver")
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
.withTarget().also()
.withSource().withAddress("test");
// Connect to remote as if new demand had matched our federation policy
peer.remoteAttach().ofReceiver()
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
.withName("federation-address-receiver")
.withSenderSettleModeUnsettled()
.withReceivervSettlesFirst()
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), remoteSourceProperties)
.withSource().withDurabilityOfNone()
.withExpiryPolicyOnLinkDetach()
.withAddress("test")
.withCapabilities("topic")
.and()
.withTarget().and()
.now();
peer.remoteFlow().withLinkCredit(10).now();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectClose();
peer.remoteClose().now();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.close();
server.stop();
}
}
@Test
@Timeout(20)
public void testRemoteFederationReceiverCloseWithErrorTerminateRemoteConnection() throws Exception {
server.start();
final Map<String, Object> remoteSourceProperties = new HashMap<>();
remoteSourceProperties.put(ADDRESS_AUTO_DELETE, true);
remoteSourceProperties.put(ADDRESS_AUTO_DELETE_DELAY, 1_000L);
remoteSourceProperties.put(ADDRESS_AUTO_DELETE_MSG_COUNT, 1L);
try (ProtonTestClient peer = new ProtonTestClient()) {
scriptFederationConnectToRemote(peer, "test");
peer.connect("localhost", AMQP_PORT);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectAttach().ofSender().withName("federation-address-receiver")
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
.withTarget().also()
.withSource().withAddress("test");
// Connect to remote as if some demand had matched our federation policy
peer.remoteAttach().ofReceiver()
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
.withName("federation-address-receiver")
.withSenderSettleModeUnsettled()
.withReceivervSettlesFirst()
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), remoteSourceProperties)
.withSource().withDurabilityOfNone()
.withExpiryPolicyOnLinkDetach()
.withAddress("test")
.withCapabilities("topic")
.and()
.withTarget().and()
.now();
peer.remoteFlow().withLinkCredit(10).now();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().accept();
// Federate a message to check link is attached properly
final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
try (Connection connection = factory.createConnection()) {
final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
final MessageProducer producer = session.createProducer(session.createTopic("test"));
producer.send(session.createMessage());
}
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
// Under normal circumstances the federation source will never close one of its
// receivers with an error, so if that happens the remote will shutdown the connection
// and let the local rebuild.
peer.expectDetach();
peer.expectClose().withError(AmqpError.INTERNAL_ERROR.toString()).respond();
peer.remoteDetach().withErrorCondition(AmqpError.RESOURCE_DELETED.toString(), "error").now();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.close();
server.stop();
}
}
private static void sendAddressAddedEvent(ProtonTestPeer peer, String address, int handle, int deliveryId) {
final Map<String, Object> eventMap = new LinkedHashMap<>();
eventMap.put(REQUESTED_ADDRESS_NAME, address);

View File

@ -3841,6 +3841,157 @@ public class AMQPFederationQueuePolicyTest extends AmqpClientTestSupport {
}
}
@Test
@Timeout(20)
public void testRemoteFederationReceiverCloseWhenDemandRemovedDoesNotTerminateRemoteConnection() throws Exception {
server.start();
server.createQueue(QueueConfiguration.of("test").setRoutingType(RoutingType.ANYCAST)
.setAddress("test")
.setAutoCreated(false));
try (ProtonTestClient peer = new ProtonTestClient()) {
scriptFederationConnectToRemote(peer, "test");
peer.connect("localhost", AMQP_PORT);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectAttach().ofSender().withName("federation-queue-receiver")
.withOfferedCapabilities(FEDERATION_QUEUE_RECEIVER.toString())
.withTarget().also()
.withSource().withAddress("test::test")
.withCapabilities("queue");
// Connect to remote as if some demand had matched our federation policy
peer.remoteAttach().ofReceiver()
.withDesiredCapabilities(FEDERATION_QUEUE_RECEIVER.toString())
.withName("federation-queue-receiver")
.withProperty(FEDERATION_RECEIVER_PRIORITY.toString(), DEFAULT_QUEUE_RECEIVER_PRIORITY_ADJUSTMENT)
.withSenderSettleModeUnsettled()
.withReceivervSettlesFirst()
.withSource().withDurabilityOfNone()
.withExpiryPolicyOnLinkDetach()
.withAddress("test::test")
.withCapabilities("queue")
.and()
.withTarget().and()
.now();
peer.remoteFlow().withLinkCredit(10).now();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().accept();
// Federate a message to check link is attached properly
final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
try (Connection connection = factory.createConnection()) {
final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
final MessageProducer producer = session.createProducer(session.createQueue("test"));
producer.send(session.createMessage());
}
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach();
peer.remoteDetach().now(); // simulate demand removed so consumer is closed.
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectAttach().ofSender().withName("federation-queue-receiver")
.withOfferedCapabilities(FEDERATION_QUEUE_RECEIVER.toString())
.withTarget().also()
.withSource().withAddress("test::test")
.withCapabilities("queue");
// Connect to remote as if new demand had matched our federation policy
peer.remoteAttach().ofReceiver()
.withDesiredCapabilities(FEDERATION_QUEUE_RECEIVER.toString())
.withName("federation-queue-receiver")
.withProperty(FEDERATION_RECEIVER_PRIORITY.toString(), DEFAULT_QUEUE_RECEIVER_PRIORITY_ADJUSTMENT)
.withSenderSettleModeUnsettled()
.withReceivervSettlesFirst()
.withSource().withDurabilityOfNone()
.withExpiryPolicyOnLinkDetach()
.withAddress("test::test")
.withCapabilities("queue")
.and()
.withTarget().and()
.now();
peer.remoteFlow().withLinkCredit(10).now();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectClose();
peer.remoteClose().now();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.close();
server.stop();
}
}
@Test
@Timeout(20)
public void testRemoteFederationReceiverCloseWithErrorTerminateRemoteConnection() throws Exception {
server.start();
server.createQueue(QueueConfiguration.of("test").setRoutingType(RoutingType.ANYCAST)
.setAddress("test")
.setAutoCreated(false));
try (ProtonTestClient peer = new ProtonTestClient()) {
scriptFederationConnectToRemote(peer, "test");
peer.connect("localhost", AMQP_PORT);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectAttach().ofSender().withName("federation-queue-receiver")
.withOfferedCapabilities(FEDERATION_QUEUE_RECEIVER.toString())
.withTarget().also()
.withSource().withAddress("test::test")
.withCapabilities("queue");
// Connect to remote as if some demand had matched our federation policy
peer.remoteAttach().ofReceiver()
.withDesiredCapabilities(FEDERATION_QUEUE_RECEIVER.toString())
.withName("federation-queue-receiver")
.withProperty(FEDERATION_RECEIVER_PRIORITY.toString(), DEFAULT_QUEUE_RECEIVER_PRIORITY_ADJUSTMENT)
.withSenderSettleModeUnsettled()
.withReceivervSettlesFirst()
.withSource().withDurabilityOfNone()
.withExpiryPolicyOnLinkDetach()
.withAddress("test::test")
.withCapabilities("queue")
.and()
.withTarget().and()
.now();
peer.remoteFlow().withLinkCredit(10).now();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectTransfer().accept();
// Federate a message to check link is attached properly
final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
try (Connection connection = factory.createConnection()) {
final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
final MessageProducer producer = session.createProducer(session.createQueue("test"));
producer.send(session.createMessage());
}
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
// Under normal circumstances the federation source will never close one of its
// receivers with an error, so if that happens the remote will shutdown the connection
// and let the local rebuild.
peer.expectDetach();
peer.expectClose().withError(AmqpError.INTERNAL_ERROR.toString()).respond();
peer.remoteDetach().withErrorCondition(AmqpError.RESOURCE_DELETED.toString(), "error").now();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.close();
server.stop();
}
}
private static void sendQueueAddedEvent(ProtonTestPeer peer, String address, String queue, int handle, int deliveryId) {
final Map<String, Object> eventMap = new LinkedHashMap<>();
eventMap.put(REQUESTED_ADDRESS_NAME, address);