ARTEMIS-2969 / ARTEMIS-2937 Making sure consumers are cleaned after failures and reconnects

This commit is contained in:
Clebert Suconic 2020-11-02 17:13:46 -05:00
parent 7a435a944c
commit 648340a864
7 changed files with 171 additions and 38 deletions

View File

@ -138,9 +138,12 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
@Override
public void stop() {
if (!started) return;
started = false;
if (connection != null) {
connection.close();
if (protonRemotingConnection != null) {
protonRemotingConnection.fail(new ActiveMQException("Stopping Broker Connection"));
protonRemotingConnection = null;
connection = null;
}
ScheduledFuture scheduledFuture = reconnectFuture;
reconnectFuture = null;
@ -151,6 +154,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
@Override
public void start() throws Exception {
if (started) return;
started = true;
server.getConfiguration().registerBrokerPlugin(this);
try {
@ -158,7 +162,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
if (brokerConnectConfiguration != null && brokerConnectConfiguration.getConnectionElements() != null) {
for (AMQPBrokerConnectionElement connectionElement : brokerConnectConfiguration.getConnectionElements()) {
if (connectionElement.getType() == AMQPBrokerConnectionAddressType.MIRROR) {
installMirrorController((AMQPMirrorBrokerConnectionElement) connectionElement, server);
installMirrorController(this, (AMQPMirrorBrokerConnectionElement) connectionElement, server);
}
}
}
@ -321,7 +325,28 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
* It is returning the snfQueue to the replica, and I needed isolation from the actual instance.
* During development I had a mistake where I used a property from the Object,
* so, I needed this isolation for my organization and making sure nothing would be shared. */
private static QueueBinding installMirrorController(AMQPMirrorBrokerConnectionElement replicaConfig, ActiveMQServer server) throws Exception {
private static Queue installMirrorController(AMQPBrokerConnection brokerConnection, AMQPMirrorBrokerConnectionElement replicaConfig, ActiveMQServer server) throws Exception {
MirrorController currentMirrorController = server.getMirrorController();
// This following block is to avoid a duplicate on mirror controller
if (currentMirrorController != null && currentMirrorController instanceof AMQPMirrorControllerSource) {
Queue queue = checkCurrentMirror(brokerConnection, (AMQPMirrorControllerSource)currentMirrorController);
// on this case we already had a mirror installed before, we won't duplicate it
if (queue != null) {
return queue;
}
} else if (currentMirrorController != null && currentMirrorController instanceof AMQPMirrorControllerAggregation) {
AMQPMirrorControllerAggregation aggregation = (AMQPMirrorControllerAggregation) currentMirrorController;
for (AMQPMirrorControllerSource source : aggregation.getPartitions()) {
Queue queue = checkCurrentMirror(brokerConnection, source);
// on this case we already had a mirror installed before, we won't duplicate it
if (queue != null) {
return queue;
}
}
}
AddressInfo addressInfo = server.getAddressInfo(replicaConfig.getSourceMirrorAddress());
if (addressInfo == null) {
@ -354,11 +379,10 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
throw new IllegalAccessException("Cannot start replica");
}
AMQPMirrorControllerSource newPartition = new AMQPMirrorControllerSource(snfQueue, server, replicaConfig.isMessageAcknowledgements(), replicaConfig.isQueueCreation(), replicaConfig.isQueueRemoval());
AMQPMirrorControllerSource newPartition = new AMQPMirrorControllerSource(snfQueue, server, replicaConfig.isMessageAcknowledgements(), replicaConfig.isQueueCreation(), replicaConfig.isQueueRemoval(), brokerConnection);
server.scanAddresses(newPartition);
MirrorController currentMirrorController = server.getMirrorController();
if (currentMirrorController == null) {
server.installMirrorController(newPartition);
@ -374,7 +398,17 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
((AMQPMirrorControllerAggregation) currentMirrorController).addPartition(newPartition);
}
return snfReplicaQueueBinding;
return snfQueue;
}
private static Queue checkCurrentMirror(AMQPBrokerConnection brokerConnection,
AMQPMirrorControllerSource currentMirrorController) {
AMQPMirrorControllerSource source = currentMirrorController;
if (source.getBrokerConnection() == brokerConnection) {
return source.getSnfQueue();
}
return null;
}
private void connectReceiver(ActiveMQProtonRemotingConnection protonRemotingConnection,
@ -534,7 +568,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
try {
if (protonRemotingConnection != null) {
protonRemotingConnection.destroy();
protonRemotingConnection.fail(new ActiveMQException("Connection being recreated"));
connection = null;
protonRemotingConnection = null;
}

View File

@ -59,6 +59,10 @@ public class AMQPMirrorControllerAggregation implements MirrorController, Active
return false;
}
public List<AMQPMirrorControllerSource> getPartitions() {
return partitions;
}
@Override
public void addAddress(AddressInfo addressInfo) throws Exception {
for (MirrorController partition : partitions) {

View File

@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessageBrokerAccessor;
import org.apache.activemq.artemis.protocol.amqp.connect.AMQPBrokerConnection;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
@ -69,6 +70,7 @@ public class AMQPMirrorControllerSource implements MirrorController, ActiveMQCom
final boolean acks;
final boolean addQueues;
final boolean deleteQueues;
private final AMQPBrokerConnection brokerConnection;
boolean started;
@ -85,12 +87,21 @@ public class AMQPMirrorControllerSource implements MirrorController, ActiveMQCom
return started;
}
public AMQPMirrorControllerSource(Queue snfQueue, ActiveMQServer server, boolean acks, boolean addQueues, boolean deleteQueues) {
public AMQPMirrorControllerSource(Queue snfQueue, ActiveMQServer server, boolean acks, boolean addQueues, boolean deleteQueues, AMQPBrokerConnection brokerConnection) {
this.snfQueue = snfQueue;
this.server = server;
this.acks = acks;
this.addQueues = addQueues;
this.deleteQueues = deleteQueues;
this.brokerConnection = brokerConnection;
}
public Queue getSnfQueue() {
return snfQueue;
}
public AMQPBrokerConnection getBrokerConnection() {
return brokerConnection;
}
@Override

View File

@ -22,6 +22,7 @@ import java.util.Map;
import java.util.function.ToLongFunction;
import java.util.stream.Stream;
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
@ -255,6 +256,9 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
}
try {
server.removeAddressInfo(addressInfo.getName(), null, true);
} catch (ActiveMQAddressDoesNotExistException expected) {
// it was removed from somewhere else, which is fine
logger.debug(expected.getMessage(), expected);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}

View File

@ -67,15 +67,25 @@ public class AMQPBridgeTest extends AmqpClientTestSupport {
@Test
public void testSimpleTransferPush() throws Exception {
internalTransferPush("TEST", false);
internalTransferPush("TEST", false, false);
}
@Test
public void testSimpleTransferPushRestartBC() throws Exception {
internalTransferPush("TEST", false, true);
}
@Test
public void testSimpleTransferPushDeferredCreation() throws Exception {
internalTransferPush("TEST", true);
internalTransferPush("TEST", true, false);
}
public void internalTransferPush(String queueName, boolean deferCreation) throws Exception {
@Test
public void testSimpleTransferPushDeferredCreationRestartBC() throws Exception {
internalTransferPush("TEST", true, true);
}
public void internalTransferPush(String queueName, boolean deferCreation, boolean restartBC) throws Exception {
server.setIdentity("targetServer");
server.start();
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST));
@ -100,6 +110,12 @@ public class AMQPBridgeTest extends AmqpClientTestSupport {
server_2.createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST));
}
if (restartBC) {
server_2.stopBrokerConnection("test");
Thread.sleep(1000);
server_2.startBrokerConnection("test");
}
ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT_2);
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

View File

@ -43,6 +43,7 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.apache.activemq.transport.amqp.client.AmqpClient;
@ -254,42 +255,47 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
@Test
public void testReplicaLargeMessages() throws Exception {
replicaTest(true, true, false, false, false, false);
replicaTest(true, true, false, false, false, false, false);
}
@Test
public void testReplicaLargeMessagesPagingEverywhere() throws Exception {
replicaTest(true, true, true, true, false, false);
replicaTest(true, true, true, true, false, false, false);
}
@Test
public void testReplica() throws Exception {
replicaTest(false, true, false, false, false, false);
replicaTest(false, true, false, false, false, false, false);
}
@Test
public void testReplicaRestartBrokerConnection() throws Exception {
replicaTest(false, true, false, false, false, false, true);
}
@Test
public void testReplicaRestart() throws Exception {
replicaTest(false, true, false, false, false, true);
replicaTest(false, true, false, false, false, true, false);
}
@Test
public void testReplicaDeferredStart() throws Exception {
replicaTest(false, true, false, false, true, false);
replicaTest(false, true, false, false, true, false, false);
}
@Test
public void testReplicaCopyOnly() throws Exception {
replicaTest(false, false, false, false, false, false);
replicaTest(false, false, false, false, false, false, false);
}
@Test
public void testReplicaPagedTarget() throws Exception {
replicaTest(false, true, true, false, false, false);
replicaTest(false, true, true, false, false, false, false);
}
@Test
public void testReplicaPagingEverywhere() throws Exception {
replicaTest(false, true, true, true, false, false);
replicaTest(false, true, true, true, false, false, false);
}
private String getText(boolean large, int i) {
@ -447,7 +453,10 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
boolean pagingTarget,
boolean pagingSource,
boolean deferredStart,
boolean restartAndDisconnect) throws Exception {
boolean restartAndDisconnect,
boolean restartBrokerConnection) throws Exception {
String brokerConnectionName = "brokerConnectionName:" + UUIDGenerator.getInstance().generateStringUUID();
server.setIdentity("targetServer");
if (deferredStart) {
server.stop();
@ -458,7 +467,7 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
server_2.setIdentity("server_2");
server_2.getConfiguration().setName("thisone");
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(brokerConnectionName, "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
AMQPMirrorBrokerConnectionElement replica = new AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(acks);
amqpConnection.addElement(replica);
server_2.getConfiguration().addAMQPConnection(amqpConnection);
@ -525,6 +534,13 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
Wait.assertEquals(NUMBER_OF_MESSAGES, queueOnServer1::getMessageCount);
Wait.assertEquals(NUMBER_OF_MESSAGES, queueOnServer2::getMessageCount);
if (restartBrokerConnection) {
// stop and start the broker connection, making sure we wouldn't duplicate the mirror
server_2.stopBrokerConnection(brokerConnectionName);
Thread.sleep(1000);
server_2.startBrokerConnection(brokerConnectionName);
}
if (pagingTarget) {
assertTrue(queueOnServer1.getPagingStore().isPaging());
}
@ -572,24 +588,29 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
}
}
@Test
public void testDualStandardRestartBrokerConnection() throws Exception {
dualReplica(false, false, false, true);
}
@Test
public void testDualStandard() throws Exception {
dualReplica(false, false, false);
dualReplica(false, false, false, false);
}
@Test
public void testDualRegularPagedTargets() throws Exception {
dualReplica(false, false, true);
dualReplica(false, false, true, false);
}
@Test
public void testDualRegularPagedEverything() throws Exception {
dualReplica(false, true, true);
dualReplica(false, true, true, false);
}
@Test
public void testDualRegularLarge() throws Exception {
dualReplica(true, false, false);
dualReplica(true, false, false, false);
}
public Queue locateQueue(ActiveMQServer server, String queueName) throws Exception {
@ -597,7 +618,7 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
return server.locateQueue(queueName);
}
private void dualReplica(boolean largeMessage, boolean pagingSource, boolean pagingTarget) throws Exception {
private void dualReplica(boolean largeMessage, boolean pagingSource, boolean pagingTarget, boolean restartBC) throws Exception {
server.setIdentity("server_1");
server.start();
@ -611,12 +632,15 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
server_2 = createServer(AMQP_PORT_2, false);
AMQPBrokerConnectConfiguration amqpConnection1 = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT);
String brokerConnectionOne = "brokerConnection1:" + UUIDGenerator.getInstance().generateStringUUID();
String brokerConnectionTwo = "brokerConnection2:" + UUIDGenerator.getInstance().generateStringUUID();
AMQPBrokerConnectConfiguration amqpConnection1 = new AMQPBrokerConnectConfiguration(brokerConnectionOne, "tcp://localhost:" + AMQP_PORT);
AMQPMirrorBrokerConnectionElement replica1 = new AMQPMirrorBrokerConnectionElement().setType(AMQPBrokerConnectionAddressType.MIRROR);
amqpConnection1.addElement(replica1);
server_2.getConfiguration().addAMQPConnection(amqpConnection1);
AMQPBrokerConnectConfiguration amqpConnection3 = new AMQPBrokerConnectConfiguration("test2", "tcp://localhost:" + AMQP_PORT_3);
AMQPBrokerConnectConfiguration amqpConnection3 = new AMQPBrokerConnectConfiguration(brokerConnectionTwo, "tcp://localhost:" + AMQP_PORT_3);
AMQPMirrorBrokerConnectionElement replica2 = new AMQPMirrorBrokerConnectionElement().setType(AMQPBrokerConnectionAddressType.MIRROR);
amqpConnection3.addElement(replica2);
server_2.getConfiguration().addAMQPConnection(amqpConnection3);
@ -649,6 +673,20 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
Message message = session.createTextMessage(getText(largeMessage, i));
message.setIntProperty("i", i);
producer.send(message);
if (i == NUMBER_OF_MESSAGES / 2) {
if (restartBC) {
// half we restart it
Wait.assertEquals(NUMBER_OF_MESSAGES / 2 + 1, queue_server_2::getMessageCount);
Wait.assertEquals(NUMBER_OF_MESSAGES / 2 + 1, queue_server_3::getMessageCount);
Wait.assertEquals(NUMBER_OF_MESSAGES / 2 + 1, queue_server_1::getMessageCount);
server_2.stopBrokerConnection(brokerConnectionOne);
server_2.stopBrokerConnection(brokerConnectionTwo);
Thread.sleep(1000);
server_2.startBrokerConnection(brokerConnectionOne);
server_2.startBrokerConnection(brokerConnectionTwo);
}
}
}
Wait.assertEquals(NUMBER_OF_MESSAGES, queue_server_2::getMessageCount);

View File

@ -37,6 +37,7 @@ import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.ExecuteUtil;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.jboss.logging.Logger;
import org.junit.After;
import org.junit.Assert;
@ -97,38 +98,52 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport {
@Test(timeout = 60_000)
public void testWithMatchingDifferentNamesOnQueueKill() throws Exception {
internalMultipleQueues(true, true, true, false);
internalMultipleQueues(true, true, true, false, false);
}
@Test
/** On this test the max reconnect attemps is reached. after a reconnect I will force a stop on the broker connection and retry it.
* The reconnection should succeed. */
@Test(timeout = 60_000)
public void testWithMatchingDifferentNamesOnQueueKillMaxAttempts() throws Exception {
internalMultipleQueues(true, true, true, false, true);
}
@Test(timeout = 60_000)
public void testWithMatchingDifferentNamesOnQueuePauseMaxAttempts() throws Exception {
internalMultipleQueues(true, true, false, true, false);
}
@Test(timeout = 60_000)
public void testWithMatchingDifferentNamesOnQueuePause() throws Exception {
internalMultipleQueues(true, true, false, true);
internalMultipleQueues(true, true, false, true, false);
}
@Test(timeout = 60_000)
public void testWithMatchingDifferentNamesOnQueue() throws Exception {
internalMultipleQueues(true, true, false, false);
internalMultipleQueues(true, true, false, false, false);
}
@Test(timeout = 60_000)
public void testWithMatching() throws Exception {
internalMultipleQueues(true, false, false, false);
internalMultipleQueues(true, false, false, false, false);
}
@Test(timeout = 60_000)
public void testwithQueueName() throws Exception {
internalMultipleQueues(false, false, false, false);
internalMultipleQueues(false, false, false, false, false);
}
@Test(timeout = 60_000)
public void testwithQueueNameDistinctName() throws Exception {
internalMultipleQueues(false, true, false, false);
internalMultipleQueues(false, true, false, false, false);
}
private void internalMultipleQueues(boolean useMatching, boolean distinctNaming, boolean kill, boolean pause) throws Exception {
private void internalMultipleQueues(boolean useMatching, boolean distinctNaming, boolean kill, boolean pause, boolean maxReconnectAttemps) throws Exception {
final int numberOfMessages = 100;
final int numberOfQueues = 10;
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:24622?amqpIdleTimeout=1000").setRetryInterval(10).setReconnectAttempts(-1);
String brokerConnectionName = "brokerConnection." + UUIDGenerator.getInstance().generateStringUUID();
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(brokerConnectionName, "tcp://localhost:24622?amqpIdleTimeout=1000").setRetryInterval(10).setReconnectAttempts(maxReconnectAttemps ? 10 : -1);
if (useMatching) {
amqpConnection.addElement(new AMQPBrokerConnectionElement().setMatchAddress("queue.#").setType(AMQPBrokerConnectionAddressType.PEER));
} else {
@ -166,12 +181,23 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport {
if (kill) {
qpidProcess.kill();
if (maxReconnectAttemps) {
Thread.sleep(1000); // wait some time so the connection is sure to have stopped retrying
}
startQpidRouter();
} else if (pause) {
pauseThenKill(3_000);
startQpidRouter();
}
if (maxReconnectAttemps) {
ConnectionFactory factoryConsumer = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:24622");
Connection connection = createConnectionDumbRetry(factoryConsumer);
connection.close();
server.stopBrokerConnection(brokerConnectionName);
server.startBrokerConnection(brokerConnectionName);
}
for (int dest = 0; dest < numberOfQueues; dest++) {
ConnectionFactory factoryConsumer = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:24622");