ARTEMIS-4073 Page Counters can go off sync when multiple producers are used in the same address

This commit is contained in:
Clebert Suconic 2022-10-25 12:05:27 -04:00 committed by clebertsuconic
parent cffc06aec8
commit d185735e55
4 changed files with 132 additions and 50 deletions

View File

@ -43,12 +43,6 @@ public interface PageSubscriptionCounter {
*/ */
void processReload(); void processReload();
/**
* @param id
* @param variance
*/
void addInc(long id, int variance, long size);
// used when deleting the counter // used when deleting the counter
void delete() throws Exception; void delete() throws Exception;

View File

@ -20,10 +20,12 @@ import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter; import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
import org.apache.activemq.artemis.core.paging.impl.Page; import org.apache.activemq.artemis.core.paging.impl.Page;
@ -78,6 +80,8 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
private LinkedList<PendingCounter> loadList; private LinkedList<PendingCounter> loadList;
private final Executor pageExecutor;
public PageSubscriptionCounterImpl(final StorageManager storage, public PageSubscriptionCounterImpl(final StorageManager storage,
final PageSubscription subscription, final PageSubscription subscription,
final boolean persistent, final boolean persistent,
@ -86,6 +90,12 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
this.storage = storage; this.storage = storage;
this.persistent = persistent; this.persistent = persistent;
this.subscription = subscription; this.subscription = subscription;
if (subscription == null) {
this.pageExecutor = null;
} else {
this.pageExecutor = subscription.getPagingStore().getExecutor();
assert pageExecutor != null;
}
} }
@Override @Override
@ -175,13 +185,23 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
} }
@Override @Override
public void increment(Transaction tx, int add, long size) throws Exception { public synchronized void increment(Transaction tx, int add, long size) throws Exception {
if (tx == null) { if (tx == null) {
if (persistent) { if (persistent) {
long id = storage.storePageCounterInc(this.subscriptionID, add, size); long id = storage.storePageCounterInc(this.subscriptionID, add, size);
incrementProcessed(id, add, size); storage.getContext().executeOnCompletion(new IOCallback() {
@Override
public void done() {
process(id, add, size);
}
@Override
public void onError(int errorCode, String errorMessage) {
}
});
} else { } else {
incrementProcessed(-1, add, size); process(-1, add, size);
} }
} else { } else {
if (persistent) { if (persistent) {
@ -227,12 +247,12 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
this.recordID = recordID1; this.recordID = recordID1;
} }
public synchronized void incrementProcessed(long id, int add, long size) { private void process(long id, int add, long size) {
addInc(id, add, size); if (id >= 0 && pageExecutor != null) {
if (incrementRecords.size() > FLUSH_COUNTER) { pageExecutor.execute(() -> doIncrement(id, add, size));
this.subscription.getPagingStore().execute(this::cleanup); } else {
doIncrement(-1, add, size);
} }
} }
@Override @Override
@ -295,8 +315,8 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
} }
} }
@Override // you need to call this method from the executors when id > 0
public synchronized void addInc(long id, int variance, long size) { private synchronized void doIncrement(long id, int variance, long size) {
value.addAndGet(variance); value.addAndGet(variance);
this.persistentSize.addAndGet(size); this.persistentSize.addAndGet(size);
if (variance > 0) { if (variance > 0) {
@ -307,6 +327,9 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
} }
if (id >= 0) { if (id >= 0) {
incrementRecords.add(id); incrementRecords.add(id);
if (incrementRecords.size() > FLUSH_COUNTER) {
this.cleanup();
}
} }
} }
@ -320,20 +343,15 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
/** /**
* This method should always be called from a single threaded executor * This method should always be called from a single threaded executor
*/ */
protected void cleanup() { protected synchronized void cleanup() {
ArrayList<Long> deleteList;
long valueReplace;
long sizeReplace;
synchronized (this) {
if (incrementRecords.size() <= FLUSH_COUNTER) { if (incrementRecords.size() <= FLUSH_COUNTER) {
return; return;
} }
valueReplace = value.get();
sizeReplace = persistentSize.get(); long valueReplace = value.get();
deleteList = new ArrayList<>(incrementRecords); long sizeReplace = persistentSize.get();
ArrayList<Long> deleteList = new ArrayList<>(incrementRecords);
incrementRecords.clear(); incrementRecords.clear();
}
long newRecordID = -1; long newRecordID = -1;
@ -394,7 +412,7 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
@Override @Override
public void afterCommit(Transaction tx) { public void afterCommit(Transaction tx) {
for (ItemOper oper : operations) { for (ItemOper oper : operations) {
oper.counter.incrementProcessed(oper.id, oper.amount, oper.persistentSize); oper.counter.process(oper.id, oper.amount, oper.persistentSize);
} }
} }
} }

View File

@ -18,6 +18,14 @@ package org.apache.activemq.artemis.tests.integration.paging;
import javax.transaction.xa.Xid; import javax.transaction.xa.Xid;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSession;
@ -37,14 +45,17 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler; import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PagingCounterTest extends ActiveMQTestBase { public class PagingCounterTest extends ActiveMQTestBase {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private ActiveMQServer server; private ActiveMQServer server;
@ -86,15 +97,80 @@ public class PagingCounterTest extends ActiveMQTestBase {
counter.increment(tx, 1, 1000); counter.increment(tx, 1, 1000);
assertEquals(0, counter.getValue()); Wait.assertEquals(0, counter::getValue);
assertEquals(0, counter.getPersistentSize()); Wait.assertEquals(0, counter::getPersistentSize);
tx.commit(); tx.commit();
storage.waitOnOperations(); Wait.assertEquals(1, counter::getValue);
Wait.assertEquals(1000, counter::getPersistentSize);
} finally {
sf.close();
session.close();
}
}
@Test
public void testMultiThreadUpdates() throws Exception {
ClientSessionFactory sf = createSessionFactory(sl);
ClientSession session = sf.createSession();
AtomicInteger errors = new AtomicInteger(0);
try {
server.addAddressInfo(new AddressInfo(new SimpleString("A1"), RoutingType.ANYCAST));
Queue queue = server.createQueue(new QueueConfiguration(new SimpleString("A1")).setRoutingType(RoutingType.ANYCAST));
final PageSubscriptionCounter counter = locateCounter(queue);
final int THREADS = 10;
final CyclicBarrier flagStart = new CyclicBarrier(THREADS);
final CountDownLatch done = new CountDownLatch(THREADS);
final int BUMPS = 2000;
Assert.assertEquals(0, counter.getValue());
ExecutorService executorService = Executors.newFixedThreadPool(THREADS);
runAfter(executorService::shutdownNow);
for (int i = 0; i < THREADS; i++) {
executorService.execute(() -> {
try {
flagStart.await(10, TimeUnit.SECONDS);
for (int repeat = 0; repeat < BUMPS; repeat++) {
counter.increment(null, 2, 1L);
Transaction tx = new TransactionImpl(server.getStorageManager());
counter.increment(tx, 1, 1L);
tx.commit();
counter.increment(null, -1, -1L);
tx = new TransactionImpl(server.getStorageManager());
counter.increment(tx, -1, -1L);
tx.commit();
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
errors.incrementAndGet();
} finally {
done.countDown();
}
});
}
// it should take a couple seconds only
done.await(1, TimeUnit.MINUTES);
Wait.assertEquals((long)(BUMPS * THREADS), counter::getValue, 5000, 100);
server.stop();
server.start();
queue = server.locateQueue("A1");
final PageSubscriptionCounter counterAfterRestart = locateCounter(queue);
Wait.assertEquals((long)(BUMPS * THREADS), counterAfterRestart::getValue, 5000, 100);
assertEquals(1, counter.getValue());
assertEquals(1000, counter.getPersistentSize());
} finally { } finally {
sf.close(); sf.close();
session.close(); session.close();
@ -125,8 +201,8 @@ public class PagingCounterTest extends ActiveMQTestBase {
storage.waitOnOperations(); storage.waitOnOperations();
assertEquals(i + 1, counter.getValue()); Wait.assertEquals(i + 1, counter::getValue);
assertEquals((i + 1) * 1000, counter.getPersistentSize()); Wait.assertEquals((i + 1) * 1000, counter::getPersistentSize);
tx = new TransactionImpl(server.getStorageManager()); tx = new TransactionImpl(server.getStorageManager());
} }
@ -134,10 +210,8 @@ public class PagingCounterTest extends ActiveMQTestBase {
tx.commit(); tx.commit();
storage.waitOnOperations(); Wait.assertEquals(2100, counter::getValue);
Wait.assertEquals(2100 * 1000, counter::getPersistentSize);
assertEquals(2100, counter.getValue());
assertEquals(2100 * 1000, counter.getPersistentSize());
server.stop(); server.stop();
@ -240,10 +314,8 @@ public class PagingCounterTest extends ActiveMQTestBase {
tx.commit(); tx.commit();
storage.waitOnOperations(); Wait.assertEquals(1, counter::getValue);
Wait.assertEquals(1000, counter::getPersistentSize);
assertEquals(1, counter.getValue());
assertEquals(1000, counter.getPersistentSize());
sl.close(); sl.close();
@ -324,7 +396,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
storage.waitOnOperations(); storage.waitOnOperations();
assertEquals(2000, counter.getValue()); Wait.assertEquals(2000, counter::getValue);
} }

View File

@ -249,9 +249,7 @@ public class M_and_M_FactoryTest extends SoakTestBase {
}, 45_000, 1_000); }, 45_000, 1_000);
expectedTotalSize += BATCH_SIZE * 2; expectedTotalSize += BATCH_SIZE * 2;
Wait.assertEquals(expectedTotalSize, queueControl::getMessagesAdded);
Wait.assertEquals(expectedTotalSize, () -> queueControl.getMessagesAcknowledged() + queueControl.getMessagesKilled());
retryNumber.incrementAndGet(); retryNumber.incrementAndGet();
for (Process c : consumers) { for (Process c : consumers) {