This commit is contained in:
Clebert Suconic 2019-07-15 15:13:35 -04:00
commit 5feb212efe
3 changed files with 89 additions and 2 deletions

View File

@ -259,8 +259,9 @@ public final class Page implements Comparable<Page> {
if (fileBuffer != null) {
fileFactory.releaseBuffer(fileBuffer);
}
if (file.position() != fileSize) {
file.position(fileSize);
size.lazySet(processedBytes);
if (file.position() != processedBytes) {
file.position(processedBytes);
}
}
}

View File

@ -468,6 +468,15 @@ public class PagingStoreImpl implements PagingStore {
currentPage = page;
cursorProvider.addPageCache(pageCache);
/**
* The page file might be incomplete in the cases: 1) last message incomplete 2) disk damaged.
* In case 1 we can keep writing the file. But in case 2 we'd better not bcs old data might be overwritten.
* Here we open a new page so the incomplete page would be reserved for recovery if needed.
*/
if (page.getSize() != page.getFile().size()) {
openNewPage();
}
}
// We will not mark it for paging if there's only a single empty file

View File

@ -726,6 +726,74 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
}
}
@Test
public void testWriteIncompletePage() throws Exception {
clearDataRecreateServerDirs();
SequentialFileFactory factory = new NIOSequentialFileFactory(new File(getPageDir()), 1);
PagingStoreFactory storeFactory = new FakeStoreFactory(factory);
final int MAX_SIZE = 1024 * 1024;
AddressSettings settings = new AddressSettings().setPageSizeBytes(MAX_SIZE).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
final PagingStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, storeFactory, new SimpleString("test"), settings, getExecutorFactory().getExecutor(), true);
storeImpl.start();
Assert.assertEquals(0, storeImpl.getNumberOfPages());
// Marked the store to be paged
storeImpl.startPaging();
Page page = storeImpl.getCurrentPage();
int num1 = 20;
for (int i = 0; i < num1; i++) {
writePageMessage(storeImpl, i);
}
// simulate uncompleted page
long position = page.getFile().position();
writePageMessage(storeImpl, 30);
page.getFile().position(position);
ByteBuffer buffer = ByteBuffer.allocate(10);
for (int i = 0; i < buffer.capacity(); i++) {
buffer.put((byte) 'Z');
}
buffer.rewind();
page.getFile().writeDirect(buffer, true);
storeImpl.stop();
// write uncompleted page
storeImpl.start();
int num2 = 10;
for (int i = 0; i < num2; i++) {
writePageMessage(storeImpl, i + num1);
}
// simulate broker restart
storeImpl.stop();
storeImpl.start();
long msgsRead = 0;
while (msgsRead < num1 + num2) {
page = storeImpl.depage();
assertNotNull("no page after read " + msgsRead + " msg", page);
page.open();
List<PagedMessage> messages = page.read(new NullStorageManager());
for (PagedMessage pgmsg : messages) {
Message msg = pgmsg.getMessage();
Assert.assertEquals(msgsRead, msg.getMessageID());
Assert.assertEquals(msg.getMessageID(), msg.getLongProperty("count").longValue());
msgsRead++;
}
}
storeImpl.stop();
}
/**
* @return
*/
@ -747,6 +815,15 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
};
}
protected void writePageMessage(final PagingStore storeImpl,
final long id) throws Exception {
Message msg = createMessage(id, storeImpl, PagingStoreImplTest.destinationTestName, createRandomBuffer(id, 10));
msg.putLongProperty("count", id);
final RoutingContextImpl ctx2 = new RoutingContextImpl(null);
storeImpl.page(msg, ctx2.getTransaction(), ctx2.getContextListing(storeImpl.getStoreName()), lock);
}
private CoreMessage createMessage(final long id,
final PagingStore store,
final SimpleString destination,