From 892c1225b09cad90b1cbbb24654f6fcaa10512ae Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Thu, 30 May 2024 18:04:25 -0400 Subject: [PATCH] ARTEMIS-4788 Fix a rare race on broker shutdown in AMQP federation Race on consumer create and broker shutdown could lead to a deadlocak trying to access configuration from the policy manager while the federation instance is trying to shutdown the policy manager. --- .../AMQPFederationAddressPolicyManager.java | 16 ++- .../AMQPFederationConsumerConfiguration.java | 25 +++-- .../AMQPFederationQueuePolicyManager.java | 15 ++- .../FederationAddressPolicyManager.java | 11 ++ .../FederationQueuePolicyManager.java | 11 ++ .../connect/AMQPFederationConnectTest.java | 103 ++++++++++++++++++ 6 files changed, 163 insertions(+), 18 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java index ca5ce721a9..277a267838 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java @@ -36,6 +36,7 @@ import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerIn import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationAddressPolicyManager; import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationConsumerInternal; import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationGenericConsumerInfo; +import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext; import org.apache.activemq.artemis.utils.CompositeAddress; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,16 +49,23 @@ public class AMQPFederationAddressPolicyManager extends FederationAddressPolicyM private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); protected final AMQPFederation federation; - protected final AMQPFederationConsumerConfiguration configuration; - protected final String remoteQueueFilter; + protected volatile AMQPFederationConsumerConfiguration configuration; + protected volatile AMQPSessionContext session; + public AMQPFederationAddressPolicyManager(AMQPFederation federation, FederationReceiveFromAddressPolicy addressPolicy) throws ActiveMQException { super(federation, addressPolicy); this.federation = federation; this.remoteQueueFilter = generateAddressFilter(policy.getMaxHops()); - this.configuration = new AMQPFederationConsumerConfiguration(federation, policy.getProperties()); + } + + @Override + protected void handlePolicyManagerStarted(FederationReceiveFromAddressPolicy policy) { + // Capture state for the current connection on each start of the policy manager. + configuration = new AMQPFederationConsumerConfiguration(federation.getConfiguration(), policy.getProperties()); + session = federation.getSessionContext(); } @Override @@ -88,7 +96,7 @@ public class AMQPFederationAddressPolicyManager extends FederationAddressPolicyM // Don't initiate anything yet as the caller might need to register error handlers etc // before the attach is sent otherwise they could miss the failure case. - return new AMQPFederationAddressConsumer(federation, configuration, federation.getSessionContext(), consumerInfo, policy); + return new AMQPFederationAddressConsumer(federation, configuration, session, consumerInfo, policy); } @Override diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConsumerConfiguration.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConsumerConfiguration.java index cced975129..0192558c29 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConsumerConfiguration.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConsumerConfiguration.java @@ -28,6 +28,7 @@ import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPF import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; import org.apache.qpid.proton.engine.Receiver; @@ -41,11 +42,13 @@ import org.apache.qpid.proton.engine.Receiver; public final class AMQPFederationConsumerConfiguration { private final Map properties; - private final AMQPFederation federation; + private final AMQPFederationConfiguration configuration; @SuppressWarnings("unchecked") - public AMQPFederationConsumerConfiguration(AMQPFederation federation, Map properties) { - this.federation = federation; + public AMQPFederationConsumerConfiguration(AMQPFederationConfiguration configuration, Map properties) { + Objects.requireNonNull(configuration, "Federation configuration cannot be null"); + + this.configuration = configuration; if (properties == null || properties.isEmpty()) { this.properties = Collections.EMPTY_MAP; @@ -61,7 +64,7 @@ public final class AMQPFederationConsumerConfiguration { } else if (property instanceof String) { return Integer.parseInt((String) property); } else { - return federation.getConfiguration().getReceiverCredits(); + return configuration.getReceiverCredits(); } } @@ -72,7 +75,7 @@ public final class AMQPFederationConsumerConfiguration { } else if (property instanceof String) { return Integer.parseInt((String) property); } else { - return federation.getConfiguration().getReceiverCreditsLow(); + return configuration.getReceiverCreditsLow(); } } @@ -86,7 +89,7 @@ public final class AMQPFederationConsumerConfiguration { } else if (property instanceof String) { return Integer.parseInt((String) property); } else { - return federation.getConfiguration().getPullReceiverBatchSize(); + return configuration.getPullReceiverBatchSize(); } } @@ -97,7 +100,7 @@ public final class AMQPFederationConsumerConfiguration { } else if (property instanceof String) { return Integer.parseInt((String) property); } else { - return federation.getConfiguration().getLargeMessageThreshold(); + return configuration.getLargeMessageThreshold(); } } @@ -108,7 +111,7 @@ public final class AMQPFederationConsumerConfiguration { } else if (property instanceof String) { return Integer.parseInt((String) property); } else { - return federation.getConfiguration().getLinkAttachTimeout(); + return configuration.getLinkAttachTimeout(); } } @@ -119,7 +122,7 @@ public final class AMQPFederationConsumerConfiguration { } else if (property instanceof String) { return Boolean.parseBoolean((String) property); } else { - return federation.getConfiguration().isCoreMessageTunnelingEnabled(); + return configuration.isCoreMessageTunnelingEnabled(); } } @@ -130,7 +133,7 @@ public final class AMQPFederationConsumerConfiguration { } else if (property instanceof String) { return Boolean.parseBoolean((String) property); } else { - return federation.getConfiguration().isIgnoreSubscriptionFilters(); + return configuration.isIgnoreSubscriptionFilters(); } } @@ -141,7 +144,7 @@ public final class AMQPFederationConsumerConfiguration { } else if (property instanceof String) { return Boolean.parseBoolean((String) property); } else { - return federation.getConfiguration().isIgnoreSubscriptionPriorities(); + return configuration.isIgnoreSubscriptionPriorities(); } } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueuePolicyManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueuePolicyManager.java index 343a305b7a..d795141106 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueuePolicyManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueuePolicyManager.java @@ -34,6 +34,7 @@ import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerIn import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationConsumerInternal; import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationGenericConsumerInfo; import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationQueuePolicyManager; +import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext; import org.apache.activemq.artemis.utils.CompositeAddress; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,13 +47,21 @@ public class AMQPFederationQueuePolicyManager extends FederationQueuePolicyManag private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); protected final AMQPFederation federation; - protected final AMQPFederationConsumerConfiguration configuration; + + protected volatile AMQPFederationConsumerConfiguration configuration; + protected volatile AMQPSessionContext session; public AMQPFederationQueuePolicyManager(AMQPFederation federation, FederationReceiveFromQueuePolicy queuePolicy) throws ActiveMQException { super(federation, queuePolicy); this.federation = federation; - this.configuration = new AMQPFederationConsumerConfiguration(federation, policy.getProperties()); + } + + @Override + protected void handlePolicyManagerStarted(FederationReceiveFromQueuePolicy policy) { + // Capture state for the current connection on each start of the policy manager. + configuration = new AMQPFederationConsumerConfiguration(federation.getConfiguration(), policy.getProperties()); + session = federation.getSessionContext(); } @Override @@ -87,7 +96,7 @@ public class AMQPFederationQueuePolicyManager extends FederationQueuePolicyManag // Don't initiate anything yet as the caller might need to register error handlers etc // before the attach is sent otherwise they could miss the failure case. - return new AMQPFederationQueueConsumer(federation, configuration, federation.getSessionContext(), consumerInfo, policy); + return new AMQPFederationQueueConsumer(federation, configuration, session, consumerInfo, policy); } @Override diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationAddressPolicyManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationAddressPolicyManager.java index 8c3685c01b..59caead680 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationAddressPolicyManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationAddressPolicyManager.java @@ -87,6 +87,7 @@ public abstract class FederationAddressPolicyManager implements ActiveMQServerBi public synchronized void start() { if (!started) { started = true; + handlePolicyManagerStarted(policy); server.registerBrokerPlugin(this); scanAllBindings(); // Create remote consumers for existing addresses with demand. } @@ -477,6 +478,16 @@ public abstract class FederationAddressPolicyManager implements ActiveMQServerBi return policy.test(address, type); } + /** + * Called on start of the manager before any other actions are taken to allow the subclass time + * to configure itself and prepare any needed state prior to starting management of federated + * resources. + * + * @param policy + * The policy configuration for this policy manager. + */ + protected abstract void handlePolicyManagerStarted(FederationReceiveFromAddressPolicy policy); + /** * Create a new {@link FederationConsumerInfo} based on the given {@link AddressInfo} * and the configured {@link FederationReceiveFromAddressPolicy}. A subclass must override this diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationQueuePolicyManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationQueuePolicyManager.java index 23330c4479..4b148f60f5 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationQueuePolicyManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationQueuePolicyManager.java @@ -80,6 +80,7 @@ public abstract class FederationQueuePolicyManager implements ActiveMQServerCons public synchronized void start() { if (!started) { started = true; + handlePolicyManagerStarted(policy); server.registerBrokerPlugin(this); scanAllQueueBindings(); // Create consumers for existing queue with demand. } @@ -303,6 +304,16 @@ public abstract class FederationQueuePolicyManager implements ActiveMQServerCons return policy.testQueue(queueName); } + /** + * Called on start of the manager before any other actions are taken to allow the subclass time + * to configure itself and prepare any needed state prior to starting management of federated + * resources. + * + * @param policy + * The policy configuration for this policy manager. + */ + protected abstract void handlePolicyManagerStarted(FederationReceiveFromQueuePolicy policy); + /** * Create a new {@link FederationConsumerInfo} based on the given {@link ServerConsumer} * and the configured {@link FederationReceiveFromQueuePolicy}. A subclass must override this diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java index 329093995a..9b0a2715ae 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java @@ -57,15 +57,25 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Random; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Session; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederatedBrokerConnectionElement; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationAddressPolicyElement; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationQueuePolicyElement; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.protocol.amqp.connect.federation.ActiveMQServerAMQPFederationPlugin; +import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo; import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; +import org.apache.activemq.artemis.tests.util.CFUtil; import org.apache.activemq.artemis.tests.util.Wait; import org.apache.qpid.proton.amqp.transport.AmqpError; import org.apache.qpid.protonj2.test.driver.ProtonTestClient; @@ -1017,6 +1027,99 @@ public class AMQPFederationConnectTest extends AmqpClientTestSupport { } } + @Test(timeout = 30_000) + public void testFederationDemandAddedAndImmediateBrokerShutdownOverlaps() throws Exception { + // Testing for a race on broker shutdown if demand was added at the same time and the + // broker is creating an outbound consumer to match that demand. + for (int i = 0; i < 2; ++i) { + doTestFederationDemandAddedAndImmediateBrokerShutdown(); + } + } + + private void doTestFederationDemandAddedAndImmediateBrokerShutdown() throws Exception { + if (server == null || !server.isStarted()) { + server = createServer(); + } + + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofSender() + .withDesiredCapability(FEDERATION_CONTROL_LINK.toString()) + .respond() + .withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString()); + peer.expectAttach().ofReceiver() + .withSenderSettleModeSettled() + .withDesiredCapability(FEDERATION_EVENT_LINK.toString()) + .respondInKind(); + peer.expectFlow().withLinkCredit(10); + peer.start(); + + final URI remoteURI = peer.getServerURI(); + logger.info("Connect test started, peer listening on: {}", remoteURI); + + final AMQPFederationQueuePolicyElement receiveFromQueue = new AMQPFederationQueuePolicyElement(); + receiveFromQueue.setName("queue-policy"); + receiveFromQueue.addToIncludes("test", "test"); + + final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement(); + element.setName(getTestName()); + element.addLocalQueuePolicy(receiveFromQueue); + + final AMQPBrokerConnectConfiguration amqpConnection = + new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); + amqpConnection.setReconnectAttempts(0);// No reconnects + amqpConnection.addElement(element); + + server.getConfiguration().addAMQPConnection(amqpConnection); + server.start(); + server.registerBrokerPlugin(new ActiveMQServerAMQPFederationPlugin() { + + @Override + public void beforeCreateFederationConsumer(final FederationConsumerInfo consumerInfo) throws ActiveMQException { + logger.debug("Delaying attach of outgoing federation receiver"); + ForkJoinPool.commonPool().execute(() -> { + try { + server.stop(); + } catch (Exception e) { + } + }); + // Allow a bit of time for the server stop to get started before allowing + // the remote federation consumer to begin being built. + try { + Thread.sleep(new Random(System.currentTimeMillis()).nextInt(8)); + } catch (InterruptedException e) { + } + } + }); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectAttach().optional(); + peer.expectFlow().optional(); + peer.expectDetach().optional(); + peer.expectClose().optional(); + peer.expectConnectionToDrop(); + + final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); + + try (Connection connection = factory.createConnection()) { + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + + connection.start(); + + try { + session.createConsumer(session.createQueue("test")); + } catch (JMSException ex) { + // Ignored as we are asynchronously shutting down the server, this could happen. + } + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + } + } + } + // Use these methods to script the initial handshake that a broker that is establishing // a federation connection with a remote broker instance would perform.