ARTEMIS-4788 Fix a rare race on broker shutdown in AMQP federation

Race on consumer create and broker shutdown could lead to a deadlocak trying
to access configuration from the policy manager while the federation instance
is trying to shutdown the policy manager.
This commit is contained in:
Timothy Bish 2024-05-30 18:04:25 -04:00 committed by clebertsuconic
parent f8dce75ac8
commit 892c1225b0
6 changed files with 163 additions and 18 deletions

View File

@ -36,6 +36,7 @@ import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerIn
import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationAddressPolicyManager; import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationAddressPolicyManager;
import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationConsumerInternal; import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationConsumerInternal;
import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationGenericConsumerInfo; import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationGenericConsumerInfo;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.utils.CompositeAddress; import org.apache.activemq.artemis.utils.CompositeAddress;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -48,16 +49,23 @@ public class AMQPFederationAddressPolicyManager extends FederationAddressPolicyM
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected final AMQPFederation federation; protected final AMQPFederation federation;
protected final AMQPFederationConsumerConfiguration configuration;
protected final String remoteQueueFilter; protected final String remoteQueueFilter;
protected volatile AMQPFederationConsumerConfiguration configuration;
protected volatile AMQPSessionContext session;
public AMQPFederationAddressPolicyManager(AMQPFederation federation, FederationReceiveFromAddressPolicy addressPolicy) throws ActiveMQException { public AMQPFederationAddressPolicyManager(AMQPFederation federation, FederationReceiveFromAddressPolicy addressPolicy) throws ActiveMQException {
super(federation, addressPolicy); super(federation, addressPolicy);
this.federation = federation; this.federation = federation;
this.remoteQueueFilter = generateAddressFilter(policy.getMaxHops()); this.remoteQueueFilter = generateAddressFilter(policy.getMaxHops());
this.configuration = new AMQPFederationConsumerConfiguration(federation, policy.getProperties()); }
@Override
protected void handlePolicyManagerStarted(FederationReceiveFromAddressPolicy policy) {
// Capture state for the current connection on each start of the policy manager.
configuration = new AMQPFederationConsumerConfiguration(federation.getConfiguration(), policy.getProperties());
session = federation.getSessionContext();
} }
@Override @Override
@ -88,7 +96,7 @@ public class AMQPFederationAddressPolicyManager extends FederationAddressPolicyM
// Don't initiate anything yet as the caller might need to register error handlers etc // Don't initiate anything yet as the caller might need to register error handlers etc
// before the attach is sent otherwise they could miss the failure case. // before the attach is sent otherwise they could miss the failure case.
return new AMQPFederationAddressConsumer(federation, configuration, federation.getSessionContext(), consumerInfo, policy); return new AMQPFederationAddressConsumer(federation, configuration, session, consumerInfo, policy);
} }
@Override @Override

View File

@ -28,6 +28,7 @@ import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPF
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.engine.Receiver;
@ -41,11 +42,13 @@ import org.apache.qpid.proton.engine.Receiver;
public final class AMQPFederationConsumerConfiguration { public final class AMQPFederationConsumerConfiguration {
private final Map<String, Object> properties; private final Map<String, Object> properties;
private final AMQPFederation federation; private final AMQPFederationConfiguration configuration;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public AMQPFederationConsumerConfiguration(AMQPFederation federation, Map<String, ?> properties) { public AMQPFederationConsumerConfiguration(AMQPFederationConfiguration configuration, Map<String, ?> properties) {
this.federation = federation; Objects.requireNonNull(configuration, "Federation configuration cannot be null");
this.configuration = configuration;
if (properties == null || properties.isEmpty()) { if (properties == null || properties.isEmpty()) {
this.properties = Collections.EMPTY_MAP; this.properties = Collections.EMPTY_MAP;
@ -61,7 +64,7 @@ public final class AMQPFederationConsumerConfiguration {
} else if (property instanceof String) { } else if (property instanceof String) {
return Integer.parseInt((String) property); return Integer.parseInt((String) property);
} else { } else {
return federation.getConfiguration().getReceiverCredits(); return configuration.getReceiverCredits();
} }
} }
@ -72,7 +75,7 @@ public final class AMQPFederationConsumerConfiguration {
} else if (property instanceof String) { } else if (property instanceof String) {
return Integer.parseInt((String) property); return Integer.parseInt((String) property);
} else { } else {
return federation.getConfiguration().getReceiverCreditsLow(); return configuration.getReceiverCreditsLow();
} }
} }
@ -86,7 +89,7 @@ public final class AMQPFederationConsumerConfiguration {
} else if (property instanceof String) { } else if (property instanceof String) {
return Integer.parseInt((String) property); return Integer.parseInt((String) property);
} else { } else {
return federation.getConfiguration().getPullReceiverBatchSize(); return configuration.getPullReceiverBatchSize();
} }
} }
@ -97,7 +100,7 @@ public final class AMQPFederationConsumerConfiguration {
} else if (property instanceof String) { } else if (property instanceof String) {
return Integer.parseInt((String) property); return Integer.parseInt((String) property);
} else { } else {
return federation.getConfiguration().getLargeMessageThreshold(); return configuration.getLargeMessageThreshold();
} }
} }
@ -108,7 +111,7 @@ public final class AMQPFederationConsumerConfiguration {
} else if (property instanceof String) { } else if (property instanceof String) {
return Integer.parseInt((String) property); return Integer.parseInt((String) property);
} else { } else {
return federation.getConfiguration().getLinkAttachTimeout(); return configuration.getLinkAttachTimeout();
} }
} }
@ -119,7 +122,7 @@ public final class AMQPFederationConsumerConfiguration {
} else if (property instanceof String) { } else if (property instanceof String) {
return Boolean.parseBoolean((String) property); return Boolean.parseBoolean((String) property);
} else { } else {
return federation.getConfiguration().isCoreMessageTunnelingEnabled(); return configuration.isCoreMessageTunnelingEnabled();
} }
} }
@ -130,7 +133,7 @@ public final class AMQPFederationConsumerConfiguration {
} else if (property instanceof String) { } else if (property instanceof String) {
return Boolean.parseBoolean((String) property); return Boolean.parseBoolean((String) property);
} else { } else {
return federation.getConfiguration().isIgnoreSubscriptionFilters(); return configuration.isIgnoreSubscriptionFilters();
} }
} }
@ -141,7 +144,7 @@ public final class AMQPFederationConsumerConfiguration {
} else if (property instanceof String) { } else if (property instanceof String) {
return Boolean.parseBoolean((String) property); return Boolean.parseBoolean((String) property);
} else { } else {
return federation.getConfiguration().isIgnoreSubscriptionPriorities(); return configuration.isIgnoreSubscriptionPriorities();
} }
} }
} }

View File

@ -34,6 +34,7 @@ import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerIn
import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationConsumerInternal; import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationConsumerInternal;
import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationGenericConsumerInfo; import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationGenericConsumerInfo;
import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationQueuePolicyManager; import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationQueuePolicyManager;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.utils.CompositeAddress; import org.apache.activemq.artemis.utils.CompositeAddress;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -46,13 +47,21 @@ public class AMQPFederationQueuePolicyManager extends FederationQueuePolicyManag
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected final AMQPFederation federation; protected final AMQPFederation federation;
protected final AMQPFederationConsumerConfiguration configuration;
protected volatile AMQPFederationConsumerConfiguration configuration;
protected volatile AMQPSessionContext session;
public AMQPFederationQueuePolicyManager(AMQPFederation federation, FederationReceiveFromQueuePolicy queuePolicy) throws ActiveMQException { public AMQPFederationQueuePolicyManager(AMQPFederation federation, FederationReceiveFromQueuePolicy queuePolicy) throws ActiveMQException {
super(federation, queuePolicy); super(federation, queuePolicy);
this.federation = federation; this.federation = federation;
this.configuration = new AMQPFederationConsumerConfiguration(federation, policy.getProperties()); }
@Override
protected void handlePolicyManagerStarted(FederationReceiveFromQueuePolicy policy) {
// Capture state for the current connection on each start of the policy manager.
configuration = new AMQPFederationConsumerConfiguration(federation.getConfiguration(), policy.getProperties());
session = federation.getSessionContext();
} }
@Override @Override
@ -87,7 +96,7 @@ public class AMQPFederationQueuePolicyManager extends FederationQueuePolicyManag
// Don't initiate anything yet as the caller might need to register error handlers etc // Don't initiate anything yet as the caller might need to register error handlers etc
// before the attach is sent otherwise they could miss the failure case. // before the attach is sent otherwise they could miss the failure case.
return new AMQPFederationQueueConsumer(federation, configuration, federation.getSessionContext(), consumerInfo, policy); return new AMQPFederationQueueConsumer(federation, configuration, session, consumerInfo, policy);
} }
@Override @Override

View File

@ -87,6 +87,7 @@ public abstract class FederationAddressPolicyManager implements ActiveMQServerBi
public synchronized void start() { public synchronized void start() {
if (!started) { if (!started) {
started = true; started = true;
handlePolicyManagerStarted(policy);
server.registerBrokerPlugin(this); server.registerBrokerPlugin(this);
scanAllBindings(); // Create remote consumers for existing addresses with demand. scanAllBindings(); // Create remote consumers for existing addresses with demand.
} }
@ -477,6 +478,16 @@ public abstract class FederationAddressPolicyManager implements ActiveMQServerBi
return policy.test(address, type); return policy.test(address, type);
} }
/**
* Called on start of the manager before any other actions are taken to allow the subclass time
* to configure itself and prepare any needed state prior to starting management of federated
* resources.
*
* @param policy
* The policy configuration for this policy manager.
*/
protected abstract void handlePolicyManagerStarted(FederationReceiveFromAddressPolicy policy);
/** /**
* Create a new {@link FederationConsumerInfo} based on the given {@link AddressInfo} * Create a new {@link FederationConsumerInfo} based on the given {@link AddressInfo}
* and the configured {@link FederationReceiveFromAddressPolicy}. A subclass must override this * and the configured {@link FederationReceiveFromAddressPolicy}. A subclass must override this

View File

@ -80,6 +80,7 @@ public abstract class FederationQueuePolicyManager implements ActiveMQServerCons
public synchronized void start() { public synchronized void start() {
if (!started) { if (!started) {
started = true; started = true;
handlePolicyManagerStarted(policy);
server.registerBrokerPlugin(this); server.registerBrokerPlugin(this);
scanAllQueueBindings(); // Create consumers for existing queue with demand. scanAllQueueBindings(); // Create consumers for existing queue with demand.
} }
@ -303,6 +304,16 @@ public abstract class FederationQueuePolicyManager implements ActiveMQServerCons
return policy.testQueue(queueName); return policy.testQueue(queueName);
} }
/**
* Called on start of the manager before any other actions are taken to allow the subclass time
* to configure itself and prepare any needed state prior to starting management of federated
* resources.
*
* @param policy
* The policy configuration for this policy manager.
*/
protected abstract void handlePolicyManagerStarted(FederationReceiveFromQueuePolicy policy);
/** /**
* Create a new {@link FederationConsumerInfo} based on the given {@link ServerConsumer} * Create a new {@link FederationConsumerInfo} based on the given {@link ServerConsumer}
* and the configured {@link FederationReceiveFromQueuePolicy}. A subclass must override this * and the configured {@link FederationReceiveFromQueuePolicy}. A subclass must override this

View File

@ -57,15 +57,25 @@ import java.util.HashMap;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Session;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederatedBrokerConnectionElement; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederatedBrokerConnectionElement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationAddressPolicyElement; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationAddressPolicyElement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationQueuePolicyElement; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationQueuePolicyElement;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.ActiveMQServerAMQPFederationPlugin;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.qpid.proton.amqp.transport.AmqpError; import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.protonj2.test.driver.ProtonTestClient; import org.apache.qpid.protonj2.test.driver.ProtonTestClient;
@ -1017,6 +1027,99 @@ public class AMQPFederationConnectTest extends AmqpClientTestSupport {
} }
} }
@Test(timeout = 30_000)
public void testFederationDemandAddedAndImmediateBrokerShutdownOverlaps() throws Exception {
// Testing for a race on broker shutdown if demand was added at the same time and the
// broker is creating an outbound consumer to match that demand.
for (int i = 0; i < 2; ++i) {
doTestFederationDemandAddedAndImmediateBrokerShutdown();
}
}
private void doTestFederationDemandAddedAndImmediateBrokerShutdown() throws Exception {
if (server == null || !server.isStarted()) {
server = createServer();
}
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender()
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
.respond()
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
peer.expectAttach().ofReceiver()
.withSenderSettleModeSettled()
.withDesiredCapability(FEDERATION_EVENT_LINK.toString())
.respondInKind();
peer.expectFlow().withLinkCredit(10);
peer.start();
final URI remoteURI = peer.getServerURI();
logger.info("Connect test started, peer listening on: {}", remoteURI);
final AMQPFederationQueuePolicyElement receiveFromQueue = new AMQPFederationQueuePolicyElement();
receiveFromQueue.setName("queue-policy");
receiveFromQueue.addToIncludes("test", "test");
final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement();
element.setName(getTestName());
element.addLocalQueuePolicy(receiveFromQueue);
final AMQPBrokerConnectConfiguration amqpConnection =
new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
amqpConnection.setReconnectAttempts(0);// No reconnects
amqpConnection.addElement(element);
server.getConfiguration().addAMQPConnection(amqpConnection);
server.start();
server.registerBrokerPlugin(new ActiveMQServerAMQPFederationPlugin() {
@Override
public void beforeCreateFederationConsumer(final FederationConsumerInfo consumerInfo) throws ActiveMQException {
logger.debug("Delaying attach of outgoing federation receiver");
ForkJoinPool.commonPool().execute(() -> {
try {
server.stop();
} catch (Exception e) {
}
});
// Allow a bit of time for the server stop to get started before allowing
// the remote federation consumer to begin being built.
try {
Thread.sleep(new Random(System.currentTimeMillis()).nextInt(8));
} catch (InterruptedException e) {
}
}
});
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectAttach().optional();
peer.expectFlow().optional();
peer.expectDetach().optional();
peer.expectClose().optional();
peer.expectConnectionToDrop();
final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
try (Connection connection = factory.createConnection()) {
final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
connection.start();
try {
session.createConsumer(session.createQueue("test"));
} catch (JMSException ex) {
// Ignored as we are asynchronously shutting down the server, this could happen.
}
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.close();
}
}
}
// Use these methods to script the initial handshake that a broker that is establishing // Use these methods to script the initial handshake that a broker that is establishing
// a federation connection with a remote broker instance would perform. // a federation connection with a remote broker instance would perform.