ARTEMIS-4421 Page counters should work before page rebuild is done
This commit is contained in:
parent
4b8c7199e7
commit
263a44e262
|
@ -1234,6 +1234,9 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
|
|||
|
||||
if (sub != null) {
|
||||
sub.getCounter().loadValue(record.id, encoding.getValue(), encoding.getPersistentSize());
|
||||
if (encoding.getValue() > 0) {
|
||||
sub.notEmpty();
|
||||
}
|
||||
} else {
|
||||
ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingPage(encoding.getQueueID());
|
||||
messageJournal.tryAppendDeleteRecord(record.id, this::recordNotFoundCallback, false);
|
||||
|
|
|
@ -16,8 +16,11 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.paging;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import javax.transaction.xa.Xid;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
|
@ -27,6 +30,7 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
|
@ -37,13 +41,13 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
|
|||
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
|
||||
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.artemis.tests.util.CFUtil;
|
||||
import org.apache.activemq.artemis.tests.util.Wait;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
|
@ -481,6 +485,45 @@ public class PagingCounterTest extends ActiveMQTestBase {
|
|||
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testSendNoRebuild() throws Exception {
|
||||
Queue queue = server.createQueue(new QueueConfiguration(new SimpleString("A1")).setRoutingType(RoutingType.ANYCAST));
|
||||
|
||||
queue.getPagingStore().startPaging();
|
||||
|
||||
PageSubscriptionCounter counter = locateCounter(queue);
|
||||
|
||||
ConnectionFactory cf = CFUtil.createConnectionFactory("core", "tcp://localhost:61616");
|
||||
try (Connection connection = cf.createConnection()) {
|
||||
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
MessageProducer producer = session.createProducer(session.createQueue("A1"));
|
||||
for (int i = 0; i < 3000; i++) {
|
||||
producer.send(session.createTextMessage("i" + i));
|
||||
}
|
||||
session.commit();
|
||||
}
|
||||
|
||||
server.stop();
|
||||
|
||||
server = newActiveMQServer();
|
||||
|
||||
server.setRebuildCounters(false);
|
||||
|
||||
server.start();
|
||||
|
||||
queue = server.locateQueue(new SimpleString("A1"));
|
||||
|
||||
assertNotNull(queue);
|
||||
|
||||
counter = locateCounter(queue);
|
||||
|
||||
logger.debug("Counter:: {}", queue.getMessageCount());
|
||||
|
||||
Wait.assertEquals(3000, counter::getValue);
|
||||
Wait.assertEquals(3000L, queue::getMessageCount, 1000, 100);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
|
@ -495,9 +538,9 @@ public class PagingCounterTest extends ActiveMQTestBase {
|
|||
|
||||
OperationContextImpl.clearContext();
|
||||
|
||||
ActiveMQServer server = super.createServer(true, false);
|
||||
ActiveMQServer server = super.createServer(true, true);
|
||||
|
||||
AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(10 * 1024).setMaxSizeBytes(20 * 1024);
|
||||
AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(10 * 1024).setMaxSizeBytes(20 * 1024).setMaxReadPageMessages(10);
|
||||
|
||||
server.getAddressSettingsRepository().addMatch("#", defaultSetting);
|
||||
|
||||
|
|
Loading…
Reference in New Issue