diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index 23800a5577..efa6d2832c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -1234,6 +1234,9 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp if (sub != null) { sub.getCounter().loadValue(record.id, encoding.getValue(), encoding.getPersistentSize()); + if (encoding.getValue() > 0) { + sub.notEmpty(); + } } else { ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingPage(encoding.getQueueID()); messageJournal.tryAppendDeleteRecord(record.id, this::recordNotFoundCallback, false); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java index 97b86f7e11..298749c66a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java @@ -16,8 +16,11 @@ */ package org.apache.activemq.artemis.tests.integration.paging; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageProducer; +import javax.jms.Session; import javax.transaction.xa.Xid; - import java.lang.invoke.MethodHandles; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; @@ -27,6 +30,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; @@ -37,13 +41,13 @@ import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Queue; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; import org.apache.activemq.artemis.logs.AssertionLoggerHandler; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; import org.apache.activemq.artemis.tests.util.Wait; import org.junit.After; import org.junit.Assert; @@ -481,6 +485,45 @@ public class PagingCounterTest extends ActiveMQTestBase { } + + @Test + public void testSendNoRebuild() throws Exception { + Queue queue = server.createQueue(new QueueConfiguration(new SimpleString("A1")).setRoutingType(RoutingType.ANYCAST)); + + queue.getPagingStore().startPaging(); + + PageSubscriptionCounter counter = locateCounter(queue); + + ConnectionFactory cf = CFUtil.createConnectionFactory("core", "tcp://localhost:61616"); + try (Connection connection = cf.createConnection()) { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(session.createQueue("A1")); + for (int i = 0; i < 3000; i++) { + producer.send(session.createTextMessage("i" + i)); + } + session.commit(); + } + + server.stop(); + + server = newActiveMQServer(); + + server.setRebuildCounters(false); + + server.start(); + + queue = server.locateQueue(new SimpleString("A1")); + + assertNotNull(queue); + + counter = locateCounter(queue); + + logger.debug("Counter:: {}", queue.getMessageCount()); + + Wait.assertEquals(3000, counter::getValue); + Wait.assertEquals(3000L, queue::getMessageCount, 1000, 100); + } + @Override @Before public void setUp() throws Exception { @@ -495,9 +538,9 @@ public class PagingCounterTest extends ActiveMQTestBase { OperationContextImpl.clearContext(); - ActiveMQServer server = super.createServer(true, false); + ActiveMQServer server = super.createServer(true, true); - AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(10 * 1024).setMaxSizeBytes(20 * 1024); + AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(10 * 1024).setMaxSizeBytes(20 * 1024).setMaxReadPageMessages(10); server.getAddressSettingsRepository().addMatch("#", defaultSetting);