ARTEMIS-3464 Protecting scheduled tasks counter with a synchronized

This commit is contained in:
Clebert Suconic 2021-09-13 09:31:30 -04:00 committed by clebertsuconic
parent c67441664f
commit 14457c4308
4 changed files with 20 additions and 33 deletions

View File

@ -359,8 +359,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
public void performAckOnPage(String nodeID, long messageID, Queue targetQueue, IOCallback ackMessageOperation) {
PageAck pageAck = new PageAck(targetQueue, nodeID, messageID, ackMessageOperation);
targetQueue.getPageSubscription().addScanAck(pageAck, pageAck, pageAck, pageAck);
targetQueue.getPageSubscription().performScanAck();
targetQueue.getPageSubscription().scanAck(pageAck, pageAck, pageAck, pageAck);
}
private void performAck(String nodeID, long messageID, Queue targetQueue, ACKMessageOperation ackMessageOperation, boolean retry) {

View File

@ -89,19 +89,13 @@ public interface PageSubscription {
void confirmPosition(Transaction tx, PagePosition position) throws Exception;
/**
* Add a scanFunction represented by a ToIntFunction
* the execution will be done when you call {@link #performScanAck()}
* This method will schedule scanning over Paging, however a retry should be done before the scanning.
* @param retryBeforeScan if this function is called and returns true, the scan for this element will not be called. It would be caller's responsibility to call found.
* @param scanFunction
* @param found
* @param notFound
*/
void addScanAck(BooleanSupplier retryBeforeScan, ToIntFunction<PagedReference> scanFunction, Runnable found, Runnable notFound);
/**
* It will perform all scans added by {@link #addScanAck(BooleanSupplier, ToIntFunction, Runnable, Runnable)}
*/
void performScanAck();
void scanAck(BooleanSupplier retryBeforeScan, ToIntFunction<PagedReference> scanFunction, Runnable found, Runnable notFound);
/**

View File

@ -114,8 +114,9 @@ public final class PageSubscriptionImpl implements PageSubscription {
// Each CursorIterator will record their current PageReader in this map
private final ConcurrentLongHashMap<PageReader> pageReaders = new ConcurrentLongHashMap<>();
private final AtomicInteger scheduledScanCount = new AtomicInteger(0);
/** this variable governs if we need to schedule another runner to look after the scanList. */
private boolean pageScanNeeded = true;
private final LinkedList<PageScan> scanList = new LinkedList();
private static class PageScan {
@ -145,29 +146,25 @@ public final class PageSubscriptionImpl implements PageSubscription {
}
@Override
public void addScanAck(BooleanSupplier retryBeforeScan, ToIntFunction<PagedReference> scanFunction, Runnable found, Runnable notFound) {
public void scanAck(BooleanSupplier retryBeforeScan, ToIntFunction<PagedReference> scanFunction, Runnable found, Runnable notFound) {
PageScan scan = new PageScan(retryBeforeScan, scanFunction, found, notFound);
boolean pageScanNeededLocal;
synchronized (scanList) {
scanList.add(scan);
pageScanNeededLocal = this.pageScanNeeded;
this.pageScanNeeded = false;
}
if (pageScanNeededLocal) {
executor.execute(this::performScanAck);
}
}
@Override
public void performScanAck() {
// we should only have a max of 2 scheduled tasks
// one that's might still be currently running, and another one lined up
// no need for more than that
if (scheduledScanCount.incrementAndGet() <= 2) {
executor.execute(this::actualScanAck);
} else {
scheduledScanCount.decrementAndGet();
}
}
private void actualScanAck() {
private void performScanAck() {
try {
PageScan[] localScanList;
synchronized (scanList) {
this.pageScanNeeded = true;
if (scanList.size() == 0) {
return;
}
@ -273,8 +270,6 @@ public final class PageSubscriptionImpl implements PageSubscription {
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
} finally {
scheduledScanCount.decrementAndGet();
}
}

View File

@ -135,14 +135,13 @@ public class PageAckScanTest extends ActiveMQTestBase {
AtomicInteger retried = new AtomicInteger(0);
PageSubscription subscription = queue.getPageSubscription();
subscription.addScanAck(() -> false, new CompareI(15), done, notFound);
subscription.addScanAck(() -> false, new CompareI(11), done, notFound);
subscription.addScanAck(() -> false, new CompareI(99), done, notFound);
subscription.addScanAck(() -> false, new CompareI(-30), done, notFound);
subscription.addScanAck(() -> {
subscription.scanAck(() -> false, new CompareI(15), done, notFound);
subscription.scanAck(() -> false, new CompareI(11), done, notFound);
subscription.scanAck(() -> false, new CompareI(99), done, notFound);
subscription.scanAck(() -> false, new CompareI(-30), done, notFound);
subscription.scanAck(() -> {
retried.incrementAndGet();
return true;}, new CompareI(333), done, notFound);
subscription.performScanAck();
Assert.assertTrue(latch.await(5, TimeUnit.MINUTES));
Assert.assertEquals(2, errors.get());
Wait.assertEquals(1, retried::get);