diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java index 33b744f606..75eedfa9ee 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java @@ -43,12 +43,6 @@ public interface PageSubscriptionCounter { */ void processReload(); - /** - * @param id - * @param variance - */ - void addInc(long id, int variance, long size); - // used when deleting the counter void delete() throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java index 0e7ab26dd0..07ca0373b6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java @@ -20,10 +20,12 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedList; import java.util.Map; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter; import org.apache.activemq.artemis.core.paging.impl.Page; @@ -78,6 +80,8 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { private LinkedList loadList; + private final Executor pageExecutor; + public PageSubscriptionCounterImpl(final StorageManager storage, final PageSubscription subscription, final boolean persistent, @@ -86,6 +90,12 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { this.storage = storage; this.persistent = persistent; this.subscription = subscription; + if (subscription == null) { + this.pageExecutor = null; + } else { + this.pageExecutor = subscription.getPagingStore().getExecutor(); + assert pageExecutor != null; + } } @Override @@ -175,13 +185,23 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { } @Override - public void increment(Transaction tx, int add, long size) throws Exception { + public synchronized void increment(Transaction tx, int add, long size) throws Exception { if (tx == null) { if (persistent) { long id = storage.storePageCounterInc(this.subscriptionID, add, size); - incrementProcessed(id, add, size); + storage.getContext().executeOnCompletion(new IOCallback() { + @Override + public void done() { + process(id, add, size); + } + + @Override + public void onError(int errorCode, String errorMessage) { + + } + }); } else { - incrementProcessed(-1, add, size); + process(-1, add, size); } } else { if (persistent) { @@ -227,12 +247,12 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { this.recordID = recordID1; } - public synchronized void incrementProcessed(long id, int add, long size) { - addInc(id, add, size); - if (incrementRecords.size() > FLUSH_COUNTER) { - this.subscription.getPagingStore().execute(this::cleanup); + private void process(long id, int add, long size) { + if (id >= 0 && pageExecutor != null) { + pageExecutor.execute(() -> doIncrement(id, add, size)); + } else { + doIncrement(-1, add, size); } - } @Override @@ -295,8 +315,8 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { } } - @Override - public synchronized void addInc(long id, int variance, long size) { + // you need to call this method from the executors when id > 0 + private synchronized void doIncrement(long id, int variance, long size) { value.addAndGet(variance); this.persistentSize.addAndGet(size); if (variance > 0) { @@ -307,6 +327,9 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { } if (id >= 0) { incrementRecords.add(id); + if (incrementRecords.size() > FLUSH_COUNTER) { + this.cleanup(); + } } } @@ -320,21 +343,16 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { /** * This method should always be called from a single threaded executor */ - protected void cleanup() { - ArrayList deleteList; - - long valueReplace; - long sizeReplace; - synchronized (this) { - if (incrementRecords.size() <= FLUSH_COUNTER) { - return; - } - valueReplace = value.get(); - sizeReplace = persistentSize.get(); - deleteList = new ArrayList<>(incrementRecords); - incrementRecords.clear(); + protected synchronized void cleanup() { + if (incrementRecords.size() <= FLUSH_COUNTER) { + return; } + long valueReplace = value.get(); + long sizeReplace = persistentSize.get(); + ArrayList deleteList = new ArrayList<>(incrementRecords); + incrementRecords.clear(); + long newRecordID = -1; long txCleanup = storage.generateID(); @@ -352,7 +370,7 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { if (logger.isTraceEnabled()) { logger.trace("Replacing page-counter record = {} by record = {} on subscriptionID = {} for queue = {}", - recordID, newRecordID, subscriptionID, subscription.getQueue().getName()); + recordID, newRecordID, subscriptionID, subscription.getQueue().getName()); } storage.commit(txCleanup); @@ -394,7 +412,7 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { @Override public void afterCommit(Transaction tx) { for (ItemOper oper : operations) { - oper.counter.incrementProcessed(oper.id, oper.amount, oper.persistentSize); + oper.counter.process(oper.id, oper.amount, oper.persistentSize); } } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java index 3dc6c10379..f68608e24a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java @@ -18,6 +18,14 @@ package org.apache.activemq.artemis.tests.integration.paging; import javax.transaction.xa.Xid; +import java.lang.invoke.MethodHandles; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientSession; @@ -37,14 +45,17 @@ import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; import org.apache.activemq.artemis.logs.AssertionLoggerHandler; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.Wait; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class PagingCounterTest extends ActiveMQTestBase { - + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private ActiveMQServer server; @@ -86,15 +97,80 @@ public class PagingCounterTest extends ActiveMQTestBase { counter.increment(tx, 1, 1000); - assertEquals(0, counter.getValue()); - assertEquals(0, counter.getPersistentSize()); + Wait.assertEquals(0, counter::getValue); + Wait.assertEquals(0, counter::getPersistentSize); tx.commit(); - storage.waitOnOperations(); + Wait.assertEquals(1, counter::getValue); + Wait.assertEquals(1000, counter::getPersistentSize); + } finally { + sf.close(); + session.close(); + } + } + + @Test + public void testMultiThreadUpdates() throws Exception { + ClientSessionFactory sf = createSessionFactory(sl); + ClientSession session = sf.createSession(); + AtomicInteger errors = new AtomicInteger(0); + + try { + server.addAddressInfo(new AddressInfo(new SimpleString("A1"), RoutingType.ANYCAST)); + Queue queue = server.createQueue(new QueueConfiguration(new SimpleString("A1")).setRoutingType(RoutingType.ANYCAST)); + + final PageSubscriptionCounter counter = locateCounter(queue); + + final int THREADS = 10; + + final CyclicBarrier flagStart = new CyclicBarrier(THREADS); + final CountDownLatch done = new CountDownLatch(THREADS); + + final int BUMPS = 2000; + + Assert.assertEquals(0, counter.getValue()); + + ExecutorService executorService = Executors.newFixedThreadPool(THREADS); + runAfter(executorService::shutdownNow); + + for (int i = 0; i < THREADS; i++) { + executorService.execute(() -> { + try { + flagStart.await(10, TimeUnit.SECONDS); + for (int repeat = 0; repeat < BUMPS; repeat++) { + counter.increment(null, 2, 1L); + Transaction tx = new TransactionImpl(server.getStorageManager()); + counter.increment(tx, 1, 1L); + tx.commit(); + counter.increment(null, -1, -1L); + tx = new TransactionImpl(server.getStorageManager()); + counter.increment(tx, -1, -1L); + tx.commit(); + } + } catch (Exception e) { + logger.warn(e.getMessage(), e); + errors.incrementAndGet(); + } finally { + done.countDown(); + } + }); + } + + // it should take a couple seconds only + done.await(1, TimeUnit.MINUTES); + + Wait.assertEquals((long)(BUMPS * THREADS), counter::getValue, 5000, 100); + + server.stop(); + + server.start(); + + queue = server.locateQueue("A1"); + + final PageSubscriptionCounter counterAfterRestart = locateCounter(queue); + Wait.assertEquals((long)(BUMPS * THREADS), counterAfterRestart::getValue, 5000, 100); - assertEquals(1, counter.getValue()); - assertEquals(1000, counter.getPersistentSize()); } finally { sf.close(); session.close(); @@ -125,8 +201,8 @@ public class PagingCounterTest extends ActiveMQTestBase { storage.waitOnOperations(); - assertEquals(i + 1, counter.getValue()); - assertEquals((i + 1) * 1000, counter.getPersistentSize()); + Wait.assertEquals(i + 1, counter::getValue); + Wait.assertEquals((i + 1) * 1000, counter::getPersistentSize); tx = new TransactionImpl(server.getStorageManager()); } @@ -134,10 +210,8 @@ public class PagingCounterTest extends ActiveMQTestBase { tx.commit(); - storage.waitOnOperations(); - - assertEquals(2100, counter.getValue()); - assertEquals(2100 * 1000, counter.getPersistentSize()); + Wait.assertEquals(2100, counter::getValue); + Wait.assertEquals(2100 * 1000, counter::getPersistentSize); server.stop(); @@ -240,10 +314,8 @@ public class PagingCounterTest extends ActiveMQTestBase { tx.commit(); - storage.waitOnOperations(); - - assertEquals(1, counter.getValue()); - assertEquals(1000, counter.getPersistentSize()); + Wait.assertEquals(1, counter::getValue); + Wait.assertEquals(1000, counter::getPersistentSize); sl.close(); @@ -324,7 +396,7 @@ public class PagingCounterTest extends ActiveMQTestBase { storage.waitOnOperations(); - assertEquals(2000, counter.getValue()); + Wait.assertEquals(2000, counter::getValue); } diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/M_and_M_FactoryTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/M_and_M_FactoryTest.java index b400768eef..e0ccfab519 100644 --- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/M_and_M_FactoryTest.java +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/M_and_M_FactoryTest.java @@ -249,9 +249,7 @@ public class M_and_M_FactoryTest extends SoakTestBase { }, 45_000, 1_000); expectedTotalSize += BATCH_SIZE * 2; - Wait.assertEquals(expectedTotalSize, queueControl::getMessagesAdded); - Wait.assertEquals(expectedTotalSize, () -> queueControl.getMessagesAcknowledged() + queueControl.getMessagesKilled()); retryNumber.incrementAndGet(); for (Process c : consumers) {