diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java index 96ed2bad67..5f98b4c76f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java @@ -259,8 +259,9 @@ public final class Page implements Comparable { if (fileBuffer != null) { fileFactory.releaseBuffer(fileBuffer); } - if (file.position() != fileSize) { - file.position(fileSize); + size.lazySet(processedBytes); + if (file.position() != processedBytes) { + file.position(processedBytes); } } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index 7853b81d90..96e1817a7f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -468,6 +468,15 @@ public class PagingStoreImpl implements PagingStore { currentPage = page; cursorProvider.addPageCache(pageCache); + + /** + * The page file might be incomplete in the cases: 1) last message incomplete 2) disk damaged. + * In case 1 we can keep writing the file. But in case 2 we'd better not bcs old data might be overwritten. + * Here we open a new page so the incomplete page would be reserved for recovery if needed. + */ + if (page.getSize() != page.getFile().size()) { + openNewPage(); + } } // We will not mark it for paging if there's only a single empty file diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java index 90725a21d1..9cc23a144e 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java @@ -726,6 +726,74 @@ public class PagingStoreImplTest extends ActiveMQTestBase { } } + @Test + public void testWriteIncompletePage() throws Exception { + clearDataRecreateServerDirs(); + SequentialFileFactory factory = new NIOSequentialFileFactory(new File(getPageDir()), 1); + + PagingStoreFactory storeFactory = new FakeStoreFactory(factory); + + final int MAX_SIZE = 1024 * 1024; + + AddressSettings settings = new AddressSettings().setPageSizeBytes(MAX_SIZE).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE); + + final PagingStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, storeFactory, new SimpleString("test"), settings, getExecutorFactory().getExecutor(), true); + + storeImpl.start(); + + Assert.assertEquals(0, storeImpl.getNumberOfPages()); + + // Marked the store to be paged + storeImpl.startPaging(); + + Page page = storeImpl.getCurrentPage(); + + int num1 = 20; + for (int i = 0; i < num1; i++) { + writePageMessage(storeImpl, i); + } + // simulate uncompleted page + long position = page.getFile().position(); + writePageMessage(storeImpl, 30); + page.getFile().position(position); + ByteBuffer buffer = ByteBuffer.allocate(10); + for (int i = 0; i < buffer.capacity(); i++) { + buffer.put((byte) 'Z'); + } + buffer.rewind(); + page.getFile().writeDirect(buffer, true); + storeImpl.stop(); + + // write uncompleted page + storeImpl.start(); + int num2 = 10; + for (int i = 0; i < num2; i++) { + writePageMessage(storeImpl, i + num1); + } + + // simulate broker restart + storeImpl.stop(); + storeImpl.start(); + + long msgsRead = 0; + + while (msgsRead < num1 + num2) { + page = storeImpl.depage(); + assertNotNull("no page after read " + msgsRead + " msg", page); + page.open(); + List messages = page.read(new NullStorageManager()); + + for (PagedMessage pgmsg : messages) { + Message msg = pgmsg.getMessage(); + Assert.assertEquals(msgsRead, msg.getMessageID()); + Assert.assertEquals(msg.getMessageID(), msg.getLongProperty("count").longValue()); + msgsRead++; + } + } + + storeImpl.stop(); + } + /** * @return */ @@ -747,6 +815,15 @@ public class PagingStoreImplTest extends ActiveMQTestBase { }; } + protected void writePageMessage(final PagingStore storeImpl, + final long id) throws Exception { + Message msg = createMessage(id, storeImpl, PagingStoreImplTest.destinationTestName, createRandomBuffer(id, 10)); + msg.putLongProperty("count", id); + + final RoutingContextImpl ctx2 = new RoutingContextImpl(null); + storeImpl.page(msg, ctx2.getTransaction(), ctx2.getContextListing(storeImpl.getStoreName()), lock); + } + private CoreMessage createMessage(final long id, final PagingStore store, final SimpleString destination,