ARTEMIS-3425 Possible NPE on Page reload

This commit is contained in:
Clebert Suconic 2021-08-18 17:37:43 -04:00 committed by clebertsuconic
parent 656114045a
commit 3edb96b09b
2 changed files with 66 additions and 1 deletions

View File

@ -1195,7 +1195,7 @@ public final class PageSubscriptionImpl implements PageSubscription {
}
public PageCache getValidCache() {
PageCache localCache = this.cache.get();
PageCache localCache = this.cache != null ? this.cache.get() : null;
if (localCache == null) {
localCache = cursorProvider.getPageCache(pageId);
// this could happen if the file does not exist any more, after cleanup

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.tests.integration.paging;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
@ -105,6 +106,7 @@ import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManagerImpl;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.RetryRule;
@ -494,6 +496,69 @@ public class PagingTest extends ActiveMQTestBase {
server.stop();
}
@Test
public void testPageReload() throws Exception {
clearDataRecreateServerDirs();
Configuration config = createDefaultConfig(true).setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
server.addAddressInfo(new AddressInfo(getName()).addRoutingType(RoutingType.ANYCAST));
server.createQueue(new QueueConfiguration(getName()).setRoutingType(RoutingType.ANYCAST));
Queue serverQueue = server.locateQueue(getName());
ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616");
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue jmsQueue = session.createQueue(getName());
serverQueue.getPagingStore().startPaging();
MessageProducer producer = session.createProducer(jmsQueue);
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
producer.send(session.createMessage());
}
serverQueue.getPagingStore().forceAnotherPage();
}
}
// Forcing a situation in the data that would cause an issue while reloading the data
long tx = server.getStorageManager().generateID();
server.getStorageManager().storePageCompleteTransactional(tx, serverQueue.getID(), new PagePositionImpl(1, 10));
server.getStorageManager().commit(tx);
server.getStorageManager().storeCursorAcknowledge(serverQueue.getID(), new PagePositionImpl(1, 0));
server.stop();
server.start();
Queue serverQueueAfterRestart = server.locateQueue(getName());
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue jmsQueue = session.createQueue(getName());
connection.start();
MessageConsumer consumer = session.createConsumer(jmsQueue);
for (int i = 0; i < 90; i++) {
javax.jms.Message message = consumer.receive(1000);
Assert.assertNotNull(message);
}
Assert.assertNull(consumer.receiveNoWait());
Wait.assertFalse(serverQueueAfterRestart.getPagingStore()::isPaging);
}
}
@Test
public void testQueueRemoveAll() throws Exception {
clearDataRecreateServerDirs();