ARTEMIS-5007 Ensure mirror connection recovers from manual closures

If a user for some reason force closes the local mirror connection SNF
consumer or the actual connection but hasn't stopped the broker connection
itself the connection should recover and rebuild. The fix ensures that if
local connections are closed first then local session resources get freed.
This commit is contained in:
Timothy Bish 2024-08-26 16:36:29 -04:00 committed by Robbie Gemmell
parent 13482519be
commit 18e6f1a88d
3 changed files with 281 additions and 0 deletions

View File

@ -102,6 +102,7 @@ import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.EndpointState;
@ -933,6 +934,18 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
@Override
public void close() throws Exception {
}
@Override
public void close(ErrorCondition error) {
// If remote closed already than normal broker connection link closed handling will already kick in,
// if not then the connection should be locally force closed as it would otherwise sit in a zombie state
// never consuming from the SNF Queue again.
if (sender.getRemoteState() != EndpointState.CLOSED) {
AMQPBrokerConnection.this.runtimeError(new ActiveMQAMQPInternalErrorException(
"Broker connection mirror consumer locally closed unexpectedly: " + error.getCondition().toString()));
}
}
@Override

View File

@ -760,6 +760,31 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
runLater(() -> connectionCallback.getProtonConnectionDelegate().fail(new ActiveMQAMQPInternalErrorException(errorMessage)));
}
@Override
public void onLocalClose(Connection connection) {
handler.requireHandler();
// If the connection delegate is marked as destroyed the IO connection is closed
// or closing and will never hear back from the remote with a matching Close
// performative from the peer on the other side. In this case we should allow the
// sessions a chance to clean up any local sender or receiver bindings.
if (connectionCallback.getProtonConnectionDelegate().isDestroyed()) {
for (AMQPSessionContext protonSession : sessions.values()) {
try {
protonSession.close();
} catch (Exception e) {
// We are closing so ignore errors from attempts to cleanup
logger.trace("Caught error while handling local connection close: ", e);
}
}
sessions.clear();
// Try and flush any pending work if the IO hasn't yet been closed.
handler.flushBytes();
}
}
@Override
public void onRemoteClose(Connection connection) {
handler.requireHandler();

View File

@ -20,7 +20,10 @@ package org.apache.activemq.artemis.tests.integration.amqp.connect;
import static org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_MESSAGE_FORMAT;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.CONNECTION_FORCED;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.TUNNEL_CORE_MESSAGES;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.fail;
import static org.hamcrest.Matchers.nullValue;
import java.lang.invoke.MethodHandles;
import java.net.URI;
@ -43,12 +46,15 @@ import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
@ -709,4 +715,241 @@ public class AMQPMirrorConnectionTest extends AmqpClientTestSupport {
server.stop();
}
}
@Test
@Timeout(20)
public void testMirrorConnectionRecoversIfLocalSNFConsumerIsForcedClosed() throws Exception {
final Map<String, Object> brokerProperties = new HashMap<>();
brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), "Test-Broker");
final String snfQueueName = "$ACTIVEMQ_ARTEMIS_MIRROR_" + getTestName();
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLPlainConnect("user", "pass", "PLAIN", "ANONYMOUS");
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender()
.withName(Matchers.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR"))
.withDesiredCapabilities("amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString())
.respond()
.withOfferedCapabilities("amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString())
.withPropertiesMap(brokerProperties);
peer.remoteFlow().withLinkCredit(10).queue();
peer.expectTransfer().accept(); // Notification address create
peer.expectTransfer().accept(); // Address create
peer.expectTransfer().accept(); // Queue create
peer.start();
final URI remoteURI = peer.getServerURI();
logger.info("Connect test started, peer listening on: {}", remoteURI);
AMQPMirrorBrokerConnectionElement mirror = new AMQPMirrorBrokerConnectionElement();
mirror.setQueueCreation(true);
mirror.setDurable(true);
mirror.setName(getTestName());
AMQPBrokerConnectConfiguration amqpConnection =
new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
amqpConnection.setReconnectAttempts(5);// Allow some reconnects
amqpConnection.setUser("user");
amqpConnection.setPassword("pass");
amqpConnection.addElement(mirror);
amqpConnection.setRetryInterval(100);
server.getConfiguration().addAMQPConnection(amqpConnection);
server.start();
server.createQueue(QueueConfiguration.of(getTestName()).setDurable(true));
peer.waitForScriptToComplete();
peer.expectDetach().respond();
peer.expectClose().optional();
peer.expectConnectionToDrop();
peer.expectSASLPlainConnect("user", "pass", "PLAIN", "ANONYMOUS");
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender()
.withName(Matchers.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR"))
.withDesiredCapabilities("amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString())
.respond()
.withOfferedCapabilities("amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString())
.withPropertiesMap(brokerProperties);
peer.remoteFlow().withLinkCredit(10).queue();
final org.apache.activemq.artemis.core.server.Queue snfQueue = server.locateQueue(snfQueueName);
assertNotNull(snfQueue);
Wait.assertTrue(() -> snfQueue.getConsumerCount() == 1, 5_000, 100);
// Close the SNF consumer on the local broker which should trigger rebuild or mirror connection
final ServerConsumer snfConsumer = (ServerConsumer) snfQueue.getConsumers().stream().findFirst().get();
assertNotNull(snfConsumer);
assertFalse(snfConsumer.isClosed());
final RemotingConnection connection = server.getRemotingService().getConnection(snfConsumer.getConnectionID());
assertNotNull(connection);
assertFalse(connection.isDestroyed());
try {
server.getActiveMQServerControl().closeConsumerWithID(snfConsumer.getSessionID(), String.valueOf(snfConsumer.getSequentialID()));
} catch (Exception e) {
fail("Should not have thrown an error closing the SNF consumer manually: " + e.getMessage());
}
Wait.assertTrue(() -> snfConsumer.isClosed(), 5_000, 100);
peer.waitForScriptToComplete();
Wait.assertTrue(() -> snfQueue.getConsumerCount() == 1, 5_000, 100);
}
}
@Test
@Timeout(20)
public void testMirrorConnectionRecoversIfSourceConnectionIsManuallyClosed() throws Exception {
final Map<String, Object> brokerProperties = new HashMap<>();
brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), "Test-Broker");
final String snfQueueName = "$ACTIVEMQ_ARTEMIS_MIRROR_" + getTestName();
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLPlainConnect("user", "pass", "PLAIN", "ANONYMOUS");
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender()
.withName(Matchers.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR"))
.withDesiredCapabilities("amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString())
.respond()
.withOfferedCapabilities("amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString())
.withPropertiesMap(brokerProperties);
peer.remoteFlow().withLinkCredit(10).queue();
peer.expectTransfer().accept(); // Notification address create
peer.expectTransfer().accept(); // Address create
peer.expectTransfer().accept(); // Queue create
peer.start();
final URI remoteURI = peer.getServerURI();
logger.info("Connect test started, peer listening on: {}", remoteURI);
AMQPMirrorBrokerConnectionElement mirror = new AMQPMirrorBrokerConnectionElement();
mirror.setQueueCreation(true);
mirror.setDurable(true);
mirror.setName(getTestName());
AMQPBrokerConnectConfiguration amqpConnection =
new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
amqpConnection.setReconnectAttempts(5);// Allow some reconnects
amqpConnection.setUser("user");
amqpConnection.setPassword("pass");
amqpConnection.addElement(mirror);
amqpConnection.setRetryInterval(100);
server.getConfiguration().addAMQPConnection(amqpConnection);
server.start();
server.createQueue(QueueConfiguration.of(getTestName()).setDurable(true));
peer.waitForScriptToComplete();
peer.expectClose().optional();
peer.expectConnectionToDrop();
peer.expectSASLPlainConnect("user", "pass", "PLAIN", "ANONYMOUS");
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender()
.withName(Matchers.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR"))
.withDesiredCapabilities("amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString())
.respond()
.withOfferedCapabilities("amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString())
.withPropertiesMap(brokerProperties);
peer.remoteFlow().withLinkCredit(10).queue();
final org.apache.activemq.artemis.core.server.Queue snfQueue = server.locateQueue(snfQueueName);
assertNotNull(snfQueue);
Wait.assertTrue(() -> snfQueue.getConsumerCount() == 1, 5_000, 100);
// Close consumer connection which should trigger a rebuild of the mirror broker connection since
// the broker connection itself wasn't stopped.
final ServerConsumer snfConsumer = (ServerConsumer) snfQueue.getConsumers().stream().findFirst().get();
final RemotingConnection connection = server.getRemotingService().getConnection(snfConsumer.getConnectionID());
try {
connection.close();
} catch (Exception e) {
fail("Should not have thrown an error closing the SNF connection manually: " + e.getMessage());
}
Wait.assertTrue(() -> connection.isDestroyed(), 5_000, 100);
peer.waitForScriptToComplete();
peer.expectTransfer().accept(); // Address create
peer.expectTransfer().accept(); // Queue create
server.createQueue(QueueConfiguration.of(getTestName() + ":1").setDurable(true));
peer.waitForScriptToComplete();
Wait.assertTrue(() -> snfQueue.getConsumerCount() == 1, 5_000, 100);
}
}
@Test
@Timeout(20)
public void testMirrorConnectionCleansUpWhenBrokerConnectionStopped() throws Exception {
final Map<String, Object> brokerProperties = new HashMap<>();
brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), "Test-Broker");
final String snfQueueName = "$ACTIVEMQ_ARTEMIS_MIRROR_" + getTestName();
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLPlainConnect("user", "pass", "PLAIN", "ANONYMOUS");
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender()
.withName(Matchers.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR"))
.withDesiredCapabilities("amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString())
.respond()
.withOfferedCapabilities("amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString())
.withPropertiesMap(brokerProperties);
peer.remoteFlow().withLinkCredit(10).queue();
peer.expectTransfer().accept(); // Notification address create
peer.expectTransfer().accept(); // Address create
peer.expectTransfer().accept(); // Queue create
peer.start();
final URI remoteURI = peer.getServerURI();
logger.info("Connect test started, peer listening on: {}", remoteURI);
AMQPMirrorBrokerConnectionElement mirror = new AMQPMirrorBrokerConnectionElement();
mirror.setQueueCreation(true);
mirror.setDurable(true);
mirror.setName(getTestName());
AMQPBrokerConnectConfiguration amqpConnection =
new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
amqpConnection.setReconnectAttempts(5);// Allow some reconnects
amqpConnection.setUser("user");
amqpConnection.setPassword("pass");
amqpConnection.addElement(mirror);
amqpConnection.setRetryInterval(100);
server.getConfiguration().addAMQPConnection(amqpConnection);
server.start();
server.createQueue(QueueConfiguration.of(getTestName()).setDurable(true));
peer.waitForScriptToComplete();
peer.expectDetach().optional();
peer.expectClose().withError(nullValue()).optional();
peer.expectConnectionToDrop();
final org.apache.activemq.artemis.core.server.Queue snfQueue = server.locateQueue(snfQueueName);
assertNotNull(snfQueue);
Wait.assertTrue(() -> snfQueue.getConsumerCount() == 1, 5_000, 100);
try {
server.getActiveMQServerControl().stopBrokerConnection(getTestName());
} catch (Exception e) {
fail("Should not have thrown an error stopping the broker connection manually: " + e.getMessage());
}
peer.waitForScriptToComplete();
Wait.assertTrue(() -> snfQueue.getConsumerCount() == 0, 5_000, 100);
}
}
}