ARTEMIS-4045 Fixing in Handler ACKRunner on Mirror

This commit is contained in:
Clebert Suconic 2022-10-26 13:00:57 -04:00 committed by clebertsuconic
parent a44b415395
commit 87ec9b5465
2 changed files with 57 additions and 1 deletions

View File

@ -120,6 +120,11 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
@Override
public void run() {
if (!connection.isHandler()) {
logger.info("Moving execution to proton handler");
connectionRun();
return;
}
logger.trace("Delivery settling for {}, context={}", delivery, delivery.getContext());
delivery.disposition(Accepted.getInstance());
settle(delivery);
@ -383,7 +388,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
performAckOnPage(nodeID, messageID, targetQueue, ackMessageOperation);
return;
} else {
ackMessageOperation.run();
connection.runNow(ackMessageOperation);
}
}
}

View File

@ -39,8 +39,11 @@ import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirror
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorMessageFactory;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.RandomUtil;
@ -117,6 +120,54 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
}
@Test
public void testNotFoundRetries() throws Exception {
server.setIdentity("Server1");
server.start();
server_2 = createServer(AMQP_PORT_2, false);
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT);
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true));
server_2.getConfiguration().addAMQPConnection(amqpConnection);
server_2.start();
server_2.addAddressInfo(new AddressInfo("sometest").setAutoCreated(false));
server_2.createQueue(new QueueConfiguration("sometest").setDurable(true));
Wait.assertTrue(() -> server_2.locateQueue("sometest") != null);
Wait.waitFor(() -> server_2.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_test") != null);
Queue mirrorQueue = server_2.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_test");
Assert.assertNotNull(mirrorQueue);
AssertionLoggerHandler.startCapture();
runAfter(() -> AssertionLoggerHandler.stopCapture());
// Adding some PostAck event that will never be found on the target for an expiry
org.apache.activemq.artemis.api.core.Message message = AMQPMirrorMessageFactory.createMessage(mirrorQueue.getAddress().toString(), SimpleString.toSimpleString("sometest"), SimpleString.toSimpleString("sometest"), AMQPMirrorControllerSource.POST_ACK, "0000", 3333L, AckReason.EXPIRED).setDurable(true);
message.setMessageID(server_2.getStorageManager().generateID());
server_2.getPostOffice().route(message, false);
// Adding some PostAck event that will never be found on the target for a regular ack
message = AMQPMirrorMessageFactory.createMessage(mirrorQueue.getAddress().toString(), SimpleString.toSimpleString("sometest"), SimpleString.toSimpleString("sometest"), AMQPMirrorControllerSource.POST_ACK, "0000", 3334L, AckReason.NORMAL).setDurable(true);
message.setMessageID(server_2.getStorageManager().generateID());
server_2.getPostOffice().route(message, false);
Wait.assertEquals(0L, mirrorQueue::getMessageCount, 2000, 100);
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224041"));
AssertionLoggerHandler.stopCapture();
server_2.stop();
server.stop();
}
@Test
public void testDeleteQueueWithRemoveFalse() throws Exception {
server.setIdentity("Server1");