ARTEMIS-3318 Invalid data on page should not hold page cleanup

This commit is contained in:
Clebert Suconic 2021-05-26 11:35:16 -04:00 committed by clebertsuconic
parent 13121cd4a3
commit cfee2035c8
3 changed files with 156 additions and 12 deletions

View File

@ -753,8 +753,13 @@ public final class PageSubscriptionImpl implements PageSubscription {
for (PagePosition pos : recoveredACK) {
lastAckedPosition = pos;
PageCursorInfo pageInfo = getPageInfo(pos);
PageCache cache = null;
if (pageInfo == null) {
if (pageInfo != null) {
cache = pageInfo.getValidCache();
}
if (cache == null || pos.getMessageNr() >= 0 && cache.getMessage(pos) == null) {
ActiveMQServerLogger.LOGGER.pageNotFound(pos);
if (txDeleteCursorOnReload == -1) {
txDeleteCursorOnReload = store.generateID();
@ -1179,15 +1184,8 @@ public final class PageSubscriptionImpl implements PageSubscription {
private int getNumberOfMessagesFromPageCache() {
// if the page was live at any point, we need to
// get the number of messages from the page-cache
PageCache localCache = this.cache.get();
if (localCache == null) {
localCache = cursorProvider.getPageCache(pageId);
// this could happen if the file does not exist any more, after cleanup
if (localCache == null) {
return 0;
}
this.cache = new WeakReference<>(localCache);
}
PageCache localCache = getValidCache();
if (localCache == null) return 0;
int numberOfMessage = localCache.getNumberOfMessages();
if (!localCache.isLive()) {
//to avoid further "live" queries
@ -1196,6 +1194,19 @@ public final class PageSubscriptionImpl implements PageSubscription {
return numberOfMessage;
}
public PageCache getValidCache() {
PageCache localCache = this.cache.get();
if (localCache == null) {
localCache = cursorProvider.getPageCache(pageId);
// this could happen if the file does not exist any more, after cleanup
if (localCache == null) {
return null;
}
this.cache = new WeakReference<>(localCache);
}
return localCache;
}
private int getNumberOfMessages() {
final int numberOfMessages = this.numberOfMessages;
if (wasLive) {

View File

@ -571,7 +571,7 @@ public class PagingStoreImpl implements PagingStore {
try {
factory = checkFileFactory();
SequentialFile file = factory.createSequentialFile(fileName);
return file.exists();
return file.exists() && file.size() > 0;
} catch (Exception ignored) {
logger.debug("PagingStoreFactory::checkPageFileExists never-throws assumption failed.", ignored);
return false;
@ -1141,7 +1141,7 @@ public class PagingStoreImpl implements PagingStore {
* @param pageID
* @return
*/
private String createFileName(final int pageID) {
public String createFileName(final int pageID) {
/** {@link DecimalFormat} is not thread safe. */
synchronized (format) {
return format.format(pageID) + ".page";

View File

@ -72,6 +72,7 @@ import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
@ -80,6 +81,7 @@ import org.apache.activemq.artemis.core.paging.cursor.PageIterator;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl;
import org.apache.activemq.artemis.core.paging.cursor.impl.PagePositionImpl;
import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl;
import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryDatabase;
import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
@ -576,6 +578,137 @@ public class PagingTest extends ActiveMQTestBase {
Assert.assertEquals(numberOfMessages * 2, removedMessages);
}
@Test
public void testPageCleanupWithInvalidData() throws Exception {
Assume.assumeTrue(storeType != StoreConfiguration.StoreType.DATABASE);
clearDataRecreateServerDirs();
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
final int numberOfMessages = 100;
locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, true, true);
session.createQueue(new QueueConfiguration(PagingTest.ADDRESS));
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
ClientMessage message = null;
byte[] body = new byte[10];
ByteBuffer bb = ByteBuffer.wrap(body);
for (int j = 1; j <= 10; j++) {
bb.put(getSamplebyte(j));
}
Queue queue = server.locateQueue(ADDRESS);
queue.getPagingStore().startPaging();
queue.getPagingStore().forceAnotherPage(); // forcing an empty file, just to make it more challenging
for (int i = 0; i < numberOfMessages; i++) {
if (i % 10 == 0 && i > 0) {
queue.getPagingStore().forceAnotherPage();
}
message = session.createMessage(true);
ActiveMQBuffer bodyLocal = message.getBodyBuffer();
bodyLocal.writeBytes(body);
message.putIntProperty("i", i);
producer.send(message);
}
queue.getPagingStore().getCursorProvider().disableCleanup();
ClientConsumer consumer = session.createConsumer(ADDRESS);
session.start();
for (int i = 0; i < 11; i++) {
ClientMessage msgRec = consumer.receive(1000);
Assert.assertNotNull(msgRec);
msgRec.acknowledge();
}
session.commit();
consumer.close();
consumer = session.createConsumer(ADDRESS, SimpleString.toSimpleString("i=29"));
message = consumer.receive(5000);
Assert.assertNotNull(message);
message.acknowledge();
session.commit();
File folder = queue.getPagingStore().getFolder();
// We will truncate two files
for (int f = 2; f <= 3; f++) {
String fileName = ((PagingStoreImpl)queue.getPagingStore()).createFileName(f);
File file = new File(folder, fileName);
file.delete();
file.createNewFile();
}
sf.close();
server.getStorageManager().getMessageJournal().scheduleCompactAndBlock(5000);
Page page4 = queue.getPagingStore().createPage(4);
page4.open();
List<PagedMessage> messagesRead = page4.read(server.getStorageManager());
Assert.assertEquals(10, messagesRead.size());
page4.close(false);
page4.delete(null);
page4.open();
for (int i = 0; i < 9; i++) {
page4.write(messagesRead.get(i)); // this will make message 29 disappear
}
page4.close(false);
server.stop();
server.start();
queue = server.locateQueue(ADDRESS);
Assert.assertTrue(queue.getPagingStore().isPaging());
queue.getPageSubscription().enableAutoCleanup(); // this should been true already as the server was restarted, just braces and belts
sf = createSessionFactory(locator);
session = sf.createSession(false, true, true);
consumer = session.createConsumer(ADDRESS);
session.start();
for (int i = 20; i < numberOfMessages; i++) { // I made one message disappear on page 4
if (i != 29) { // I made message 29 disappear
ClientMessage msgClient = consumer.receive(1000);
Assert.assertNotNull(msgClient);
Assert.assertEquals(i, msgClient.getIntProperty("i").intValue());
msgClient.acknowledge();
}
}
ClientMessage msgClient = consumer.receiveImmediate();
Assert.assertNull(msgClient);
session.commit();
Wait.assertFalse(queue.getPagingStore()::isPaging);
}
@Test
public void testQueueRetryMessages() throws Exception {
clearDataRecreateServerDirs();