ARTEIS-4651 Speeding up delivery resume when SNF is paged

This commit is contained in:
Clebert Suconic 2024-03-01 18:20:26 -05:00 committed by clebertsuconic
parent f9f5ed373f
commit b5791344b4
6 changed files with 76 additions and 12 deletions

View File

@ -477,6 +477,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
Queue queue = checkCurrentMirror(this, (AMQPMirrorControllerSource) currentMirrorController);
// on this case we already had a mirror installed before, we won't duplicate it
if (queue != null) {
queue.deliverAsync();
return queue;
}
} else if (currentMirrorController != null && currentMirrorController instanceof AMQPMirrorControllerAggregation) {

View File

@ -44,6 +44,7 @@ import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperation;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
@ -111,6 +112,45 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
boolean started;
TransactionOperation deliveryAsyncTX = new TransactionOperation() {
@Override
public void beforePrepare(Transaction tx) throws Exception {
}
@Override
public void afterPrepare(Transaction tx) {
}
@Override
public void beforeCommit(Transaction tx) throws Exception {
}
@Override
public void afterCommit(Transaction tx) {
snfQueue.deliverAsync();
}
@Override
public void beforeRollback(Transaction tx) throws Exception {
}
@Override
public void afterRollback(Transaction tx) {
}
@Override
public List<MessageReference> getRelatedMessageReferences() {
return null;
}
@Override
public List<MessageReference> getListOnConsumer(long consumerID) {
return null;
}
};
@Override
public void start() throws Exception {
}
@ -311,6 +351,14 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
// This will store the message on paging, and the message will be copied into paging.
if (snfQueue.getPagingStore().page(message, tx, pagedRouteContext, this::copyMessageForPaging)) {
if (tx == null) {
snfQueue.deliverAsync();
} else {
if (tx.getProperty(TransactionPropertyIndexes.MIRROR_DELIVERY_ASYNC) == null) {
tx.putProperty(TransactionPropertyIndexes.MIRROR_DELIVERY_ASYNC, deliveryAsyncTX);
tx.addOperation(deliveryAsyncTX);
}
}
return;
}
@ -449,6 +497,10 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
if (sync) {
syncDone(reference);
}
if (reference != null && reference.getQueue() != null && reference.isPaged()) {
reference.getQueue().deliverAsync();
}
}
@Override

View File

@ -406,6 +406,8 @@ public class AckManager implements ActiveMQComponent {
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
} finally {
targetQueue.deliverAsync();
}
}
/** The ACKManager will perform the retry on each address's pageStore executor.

View File

@ -1304,7 +1304,14 @@ public final class PageSubscriptionImpl implements PageSubscription {
if (valid && message.getPagedMessage().getTransactionID() >= 0) {
PageTransactionInfo tx = pageStore.getPagingManager().getTransaction(message.getPagedMessage().getTransactionID());
if (tx == null) {
ActiveMQServerLogger.LOGGER.pageSubscriptionCouldntLoad(message.getPagedMessage().getTransactionID(), message.getPagedMessage().newPositionObject(), pageStore.getAddress(), queue.getName());
if (logger.isDebugEnabled()) {
// this message used to be a warning...
// when this message was first introduced I was being over carefully about eventually hiding a bug. Over the years I am confident
// this could happen between restarts.
// also after adding rebuild counters and retry scans over mirroring this could happen more oftenly.
// It's time to make this a logger.debug now
logger.debug("Could not locate page transaction {}, ignoring message on position {} on address={} queue={}", message.getPagedMessage().getTransactionID(), message.getPagedMessage().newPositionObject(), pageStore.getAddress(), queue.getName());
}
valid = false;
ignored = true;
} else {

View File

@ -41,4 +41,6 @@ public class TransactionPropertyIndexes {
public static final int MIRROR_ACK_OPERATION = 11;
public static final int MIRROR_SEND_OPERATION = 12;
public static final int MIRROR_DELIVERY_ASYNC = 13;
}

View File

@ -169,8 +169,8 @@ public class PagedSNFSoakTest extends SoakTestBase {
Wait.assertEquals((long) numberOfMessages, () -> simpleManagementDC1A.getMessageCountOnQueue(QUEUE_NAME), 5000, 100);
Wait.assertEquals((long) 0, () -> getCount("DC1", simpleManagementDC1A, SNF_QUEUE), 50_000, 100);
Wait.assertEquals((long) numberOfMessages, () -> getCount("DC2", simpleManagementDC2A, QUEUE_NAME), 30_000, 100);
Wait.assertEquals((long) 0, () -> getCount("DC1", simpleManagementDC1A, SNF_QUEUE), 5_000, 100);
Wait.assertEquals((long) numberOfMessages, () -> getCount("DC2", simpleManagementDC2A, QUEUE_NAME), 5_000, 100);
try (Connection connection = connectionFactoryDC1A.createConnection()) {
connection.start();
@ -207,7 +207,7 @@ public class PagedSNFSoakTest extends SoakTestBase {
int serverToProduce = ((nbatch & 1) > 0) ? 1 : 0;
int serverToConsume = ((nbatch & 2) > 0) ? 1 : 0;
logger.info("Batch {}, sending on server {}. consuming on server {}", nbatch, serverToProduce, serverToConsume);
logger.debug("Batch {}, sending on server {}. consuming on server {}", nbatch, serverToProduce, serverToConsume);
try (Connection connection = cfs[serverToProduce].createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
@ -215,7 +215,7 @@ public class PagedSNFSoakTest extends SoakTestBase {
for (int i = 0; i < numberOfMessages; i++) {
producer.send(session.createTextMessage("msg " + i));
if (i > 0 && i % batchSize == 0) {
logger.info("Commit send {}", i);
logger.debug("Commit send {}", i);
session.commit();
}
}
@ -223,9 +223,9 @@ public class PagedSNFSoakTest extends SoakTestBase {
}
for (SimpleManagement s : sm) {
logger.info("Checking counts on SNF for {}", s.getUri());
logger.debug("Checking counts on SNF for {}", s.getUri());
Wait.assertEquals((long) 0, () -> s.getMessageCountOnQueue(SNF_QUEUE), 120_000, 100);
logger.info("Checking counts on {} on {}", QUEUE_NAME, s.getUri());
logger.debug("Checking counts on {} on {}", QUEUE_NAME, s.getUri());
Wait.assertEquals((long) numberOfMessages, () -> s.getMessageCountOnQueue(QUEUE_NAME), 60_000, 100);
}
@ -237,7 +237,7 @@ public class PagedSNFSoakTest extends SoakTestBase {
Message message = consumer.receive(5000);
Assert.assertNotNull(message);
if (i > 0 && i % batchSize == 0) {
logger.info("Commit consume {}", i);
logger.debug("Commit consume {}", i);
session.commit();
}
}
@ -245,9 +245,9 @@ public class PagedSNFSoakTest extends SoakTestBase {
}
for (SimpleManagement s : sm) {
logger.info("Checking 0 counts on SNF for {}", s.getUri());
logger.debug("Checking 0 counts on SNF for {}", s.getUri());
Wait.assertEquals((long) 0, () -> s.getMessageCountOnQueue(SNF_QUEUE), 120_000, 100);
logger.info("Checking for empty queue on {}", s.getUri());
logger.debug("Checking for empty queue on {}", s.getUri());
Wait.assertEquals((long) 0, () -> s.getMessageCountOnQueue(QUEUE_NAME), 60_000, 100);
}
}
@ -261,7 +261,7 @@ public class PagedSNFSoakTest extends SoakTestBase {
Assert.assertNotNull(message);
Assert.assertEquals(body, message.getText());
Assert.assertEquals(i, message.getIntProperty("id"));
logger.info("received {}", i);
logger.debug("received {}", i);
if ((i + 1) % 10 == 0) {
session.commit();
}
@ -282,7 +282,7 @@ public class PagedSNFSoakTest extends SoakTestBase {
for (int i = 0; i < numberOfMessages; i++) {
TextMessage message = session.createTextMessage(body);
message.setIntProperty("id", i);
logger.info("send {}", i);
logger.debug("send {}", i);
if (setter != null) {
try {
setter.accept(message);