ARTEMIS-3815 proper retry through IOCompletion when message not found on target queue on Mirror

co-authored Clebert Suconic
This commit is contained in:
iliya 2022-06-30 14:40:46 -04:00 committed by clebertsuconic
parent 2f361b1d40
commit d90179b99c
4 changed files with 344 additions and 22 deletions

View File

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.io;
import org.jboss.logging.Logger;
public class RunnableCallback implements IOCallback {
private static final Logger logger = Logger.getLogger(RunnableCallback.class);
Runnable okCallback;
Runnable errorCallback;
public RunnableCallback(Runnable ok, Runnable error) {
if (ok == null) {
throw new NullPointerException("ok = null");
}
if (ok == null) {
throw new NullPointerException("error = null");
}
okCallback = ok;
errorCallback = error;
}
public RunnableCallback(Runnable ok) {
if (ok == null) {
throw new NullPointerException("ok = null");
}
okCallback = ok;
errorCallback = ok;
}
@Override
public void done() {
try {
okCallback.run();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
@Override
public void onError(int errorCode, String errorMessage) {
try {
errorCallback.run();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
}

View File

@ -177,6 +177,10 @@ public class AMQPSessionCallback implements SessionCallback {
}
}
public OperationContext getSessionContext() {
return serverSession.getSessionContext();
}
@Override
public void browserFinished(ServerConsumer consumer) {

View File

@ -26,7 +26,9 @@ import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.RunnableCallback;
import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@ -138,7 +140,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
}
// in a regular case we should not have more than amqpCredits on the pool, that's the max we would need
private final MpscPool<ACKMessageOperation> ackMessageMpscPool = new MpscPool<>(amqpCredits, ACKMessageOperation::reset, () -> new ACKMessageOperation());
private final MpscPool<ACKMessageOperation> ackMessageMpscPool = new MpscPool<>(amqpCredits, ACKMessageOperation::reset, ACKMessageOperation::new);
final RoutingContextImpl routingContext = new RoutingContextImpl(null);
@ -151,6 +153,8 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
private final ReferenceNodeStore referenceNodeStore;
OperationContext mirrorContext;
public AMQPMirrorControllerTarget(AMQPSessionCallback sessionSPI,
AMQPConnectionContext connection,
AMQPSessionContext protonSession,
@ -161,6 +165,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
this.basicController.setLink(receiver);
this.server = server;
this.referenceNodeStore = sessionSPI.getProtocolManager().getReferenceIDSupplier();
mirrorContext = protonSession.getSessionSPI().getSessionContext();
}
@Override
@ -224,7 +229,6 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
}
deleteQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName));
} else if (eventType.equals(POST_ACK)) {
String address = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(message, ADDRESS);
String nodeID = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(message, BROKER_ID);
AckReason ackReason = AMQPMessageBrokerAccessor.getMessageAnnotationAckReason(message);
@ -236,9 +240,9 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
AmqpValue value = (AmqpValue) message.getBody();
Long messageID = (Long) value.getValue();
if (logger.isDebugEnabled()) {
logger.debug(server + " Post ack address=" + address + " queueName = " + queueName + " messageID=" + messageID + ", nodeID=" + nodeID);
logger.debug(server + " Post ack queueName = " + queueName + " messageID=" + messageID + ", nodeID=" + nodeID);
}
if (postAcknowledge(address, queueName, nodeID, messageID, messageAckOperation, ackReason)) {
if (postAcknowledge(queueName, nodeID, messageID, messageAckOperation, ackReason)) {
messageAckOperation = null;
}
}
@ -336,7 +340,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
}
}
public boolean postAcknowledge(String address, String queue, String nodeID, long messageID, ACKMessageOperation ackMessage, AckReason reason) throws Exception {
public boolean postAcknowledge(String queue, String nodeID, long messageID, ACKMessageOperation ackMessage, AckReason reason) throws Exception {
final Queue targetQueue = server.locateQueue(queue);
if (targetQueue == null) {
@ -355,32 +359,50 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
logger.trace("Server " + server.getIdentity() + " with queue = " + queue + " being acked for " + messageID + " coming from " + messageID + " targetQueue = " + targetQueue);
}
performAck(nodeID, messageID, targetQueue, ackMessage, reason, true);
performAck(nodeID, messageID, targetQueue, ackMessage, reason, (short)0);
return true;
}
public void performAckOnPage(String nodeID, long messageID, Queue targetQueue, IOCallback ackMessageOperation) {
PageAck pageAck = new PageAck(targetQueue, nodeID, messageID, ackMessageOperation);
targetQueue.getPageSubscription().scanAck(pageAck, pageAck, pageAck, pageAck);
}
private void performAck(String nodeID, long messageID, Queue targetQueue, ACKMessageOperation ackMessageOperation, AckReason reason, boolean retry) {
private void performAck(String nodeID, long messageID, Queue targetQueue, ACKMessageOperation ackMessageOperation, AckReason reason, final short retry) {
if (logger.isTraceEnabled()) {
logger.trace("performAck (nodeID=" + nodeID + ", messageID=" + messageID + ")" + ", targetQueue=" + targetQueue.getName());
}
MessageReference reference = targetQueue.removeWithSuppliedID(nodeID, messageID, referenceNodeStore);
if (reference == null && retry) {
if (reference == null) {
if (logger.isDebugEnabled()) {
logger.debug("Retrying Reference not found on messageID=" + messageID + " nodeID=" + nodeID);
logger.debug("Retrying Reference not found on messageID=" + messageID + " nodeID=" + nodeID + ", currentRetry=" + retry);
}
switch (retry) {
case 0:
// first retry, after IO Operations
sessionSPI.getSessionContext().executeOnCompletion(new RunnableCallback(() -> performAck(nodeID, messageID, targetQueue, ackMessageOperation, reason, (short) 1)));
return;
case 1:
// second retry after the queue is flushed the temporary adds
targetQueue.flushOnIntermediate(() -> {
recoverContext();
performAck(nodeID, messageID, targetQueue, ackMessageOperation, reason, (short)2);
});
return;
case 2:
// third retry, on paging
if (reason != AckReason.EXPIRED) {
// if expired, we don't need to check on paging
// as the message will expire again when depaged (if on paging)
performAckOnPage(nodeID, messageID, targetQueue, ackMessageOperation);
return;
} else {
ackMessageOperation.run();
}
}
targetQueue.flushOnIntermediate(() -> {
recoverContext();
performAck(nodeID, messageID, targetQueue, ackMessageOperation, reason, false);
});
return;
}
if (reference != null) {
if (logger.isTraceEnabled()) {
logger.trace("Post ack Server " + server + " worked well for messageID=" + messageID + " nodeID=" + nodeID);
@ -398,14 +420,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
} else {
if (reason != AckReason.EXPIRED) {
// if expired, we don't need to check on paging
// as the message will expire again when depaged (if on paging)
performAckOnPage(nodeID, messageID, targetQueue, ackMessageOperation);
}
}
}
/**

View File

@ -0,0 +1,239 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp.connect;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPStandardMessage;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class AMQPMirrorFastACKTest extends AmqpClientTestSupport {
private static final String SLOW_SERVER_NAME = "slow";
private static final int SLOW_SERVER_PORT = AMQP_PORT + 1;
private static final int ENCODE_DELAY = 10;
private ActiveMQServer slowServer;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
slowServer = createSlowServer();
}
@Override
@After
public void tearDown() throws Exception {
try {
if (slowServer != null) {
slowServer.stop();
}
} finally {
super.tearDown();
}
}
@Test
public void testMirrorTargetFastACK() throws Exception {
final int NUMBER_OF_MESSAGES = 10;
CountDownLatch done = new CountDownLatch(NUMBER_OF_MESSAGES);
AMQPMirrorBrokerConnectionElement replication = configureMirrorTowardsSlow(server);
slowServer.start();
server.start();
waitForServerToStart(slowServer);
waitForServerToStart(server);
server.addAddressInfo(new AddressInfo(getQueueName()).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
server.createQueue(new QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false));
ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName()));
MessageProducer producer = session.createProducer(session.createQueue(getQueueName()));
connection.start();
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
message.acknowledge();
done.countDown();
} catch (Exception ignore) {
// Ignore
}
}
});
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
producer.send(session.createTextMessage("i=" + i));
}
Assert.assertTrue(done.await(5000, TimeUnit.MILLISECONDS));
}
Queue snf = server.locateQueue(replication.getMirrorSNF());
Queue queue = slowServer.locateQueue(getQueueName());
Wait.waitFor(() -> snf.getMessageCount() == 0 && snf.getMessagesAdded() > NUMBER_OF_MESSAGES);
Wait.assertTrue("Expected mirrored target queue " + getQueueName() + " to be empty", () -> queue.getMessageCount() == 0 && queue.getMessagesAdded() == NUMBER_OF_MESSAGES);
}
@Override
protected ActiveMQServer createServer() throws Exception {
return createServer(AMQP_PORT, false);
}
private AMQPMirrorBrokerConnectionElement configureMirrorTowardsSlow(ActiveMQServer source) {
AMQPBrokerConnectConfiguration connection = new AMQPBrokerConnectConfiguration("mirror", "tcp://localhost:" + SLOW_SERVER_PORT).setReconnectAttempts(-1).setRetryInterval(100);
AMQPMirrorBrokerConnectionElement replication = new AMQPMirrorBrokerConnectionElement().setDurable(true);
connection.addElement(replication);
source.getConfiguration().addAMQPConnection(connection);
return replication;
}
private ActiveMQServer createSlowServer() throws Exception {
ActiveMQSecurityManager securityManager = new ActiveMQJAASSecurityManager(InVMLoginModule.class.getName(), new SecurityConfiguration());
ActiveMQServer server = new ActiveMQServerImpl(createBasicConfig(SLOW_SERVER_PORT), mBeanServer, securityManager) {
@Override
protected StorageManager createStorageManager() {
return AMQPMirrorFastACKTest.this.createStorageManager(getConfiguration(), getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, ioCriticalErrorListener);
}
};
server.getConfiguration().setName(SLOW_SERVER_NAME);
server.getConfiguration().getAcceptorConfigurations().clear();
server.getConfiguration().getAcceptorConfigurations().add(addAcceptorConfiguration(slowServer, SLOW_SERVER_PORT));
server.getConfiguration().setJMXManagementEnabled(true);
server.getConfiguration().setMessageExpiryScanPeriod(100);
configureAddressPolicy(server);
configureBrokerSecurity(server);
return server;
}
private StorageManager createStorageManager(Configuration configuration,
CriticalAnalyzer criticalAnalyzer,
ExecutorFactory executorFactory,
ScheduledExecutorService scheduledPool,
ExecutorFactory ioExecutorFactory,
IOCriticalErrorListener ioCriticalErrorListener) {
return new JournalStorageManager(configuration, criticalAnalyzer, executorFactory, scheduledPool, ioExecutorFactory, ioCriticalErrorListener) {
@Override
protected Journal createMessageJournal(Configuration config,
IOCriticalErrorListener criticalErrorListener,
int fileSize) {
return new JournalImpl(ioExecutorFactory, fileSize, config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), journalFF, "activemq-data", "amq", journalFF.getMaxIO(), 0, criticalErrorListener, config.getJournalMaxAtticFiles()) {
@Override
public void appendAddRecordTransactional(long txID,
long id,
byte recordType,
Persister persister,
Object record) throws Exception {
super.appendAddRecordTransactional(txID, id, recordType, record instanceof AMQPStandardMessage ? new SlowMessagePersister<>(persister) : persister, record);
}
};
}
};
}
static class SlowMessagePersister<T> implements Persister<T> {
private final Persister<T> delegate;
SlowMessagePersister(Persister<T> delegate) {
this.delegate = delegate;
}
@Override
public byte getID() {
return delegate.getID();
}
@Override
public int getEncodeSize(T record) {
return delegate.getEncodeSize(record);
}
@Override
public void encode(ActiveMQBuffer buffer, T record) {
try {
// This will slow down IO completion for transactional message write
Thread.sleep(ENCODE_DELAY);
} catch (Exception ignore) {
// ignore
}
delegate.encode(buffer, record);
}
@Override
public T decode(ActiveMQBuffer buffer, T record, CoreMessageObjectPools pool) {
return delegate.decode(buffer, record, pool);
}
}
}