mirror of
https://github.com/apache/activemq-artemis.git
synced 2025-02-22 02:05:13 +00:00
[ARTEMIS-1986] PagingTest#testDeletePhysicalPages will fail if a record about deleting a page is not saved in journal
This commit is contained in:
parent
e2183b33a3
commit
993499daaf
@ -1585,6 +1585,154 @@ public class PagingTest extends ActiveMQTestBase {
|
||||
|
||||
}
|
||||
|
||||
// 4 messages are send/received, it creates 2 pages, where for second page there is no delete completion record in journal
|
||||
// server is restarted and 4 messages sent/received again. There should be no lost message.
|
||||
@Test
|
||||
public void testRestartWithCompleteAndDeletedPhysicalPage() throws Exception {
|
||||
clearDataRecreateServerDirs();
|
||||
|
||||
Configuration config = createDefaultInVMConfig();
|
||||
|
||||
final AtomicBoolean mainCleanup = new AtomicBoolean(true);
|
||||
|
||||
class InterruptedCursorProvider extends PageCursorProviderImpl {
|
||||
|
||||
InterruptedCursorProvider(PagingStore pagingStore,
|
||||
StorageManager storageManager,
|
||||
ArtemisExecutor executor,
|
||||
int maxCacheSize) {
|
||||
super(pagingStore, storageManager, executor, maxCacheSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cleanup() {
|
||||
if (mainCleanup.get()) {
|
||||
super.cleanup();
|
||||
} else {
|
||||
try {
|
||||
pagingStore.unlock();
|
||||
} catch (Throwable ignored) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
server = new ActiveMQServerImpl(config, ManagementFactory.getPlatformMBeanServer(), new ActiveMQSecurityManagerImpl()) {
|
||||
@Override
|
||||
protected PagingStoreFactoryNIO getPagingStoreFactory() {
|
||||
return new PagingStoreFactoryNIO(this.getStorageManager(), this.getConfiguration().getPagingLocation(), this.getConfiguration().getJournalBufferTimeout_NIO(), this.getScheduledPool(), this.getExecutorFactory(), this.getConfiguration().isJournalSyncNonTransactional(), null) {
|
||||
@Override
|
||||
public PageCursorProvider newCursorProvider(PagingStore store,
|
||||
StorageManager storageManager,
|
||||
AddressSettings addressSettings,
|
||||
ArtemisExecutor executor) {
|
||||
return new InterruptedCursorProvider(store, storageManager, executor, addressSettings.getPageCacheMaxSize());
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
addServer(server);
|
||||
|
||||
AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(MESSAGE_SIZE).
|
||||
setMaxSizeBytes(2 * MESSAGE_SIZE).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
|
||||
|
||||
server.getAddressSettingsRepository().addMatch("#", defaultSetting);
|
||||
|
||||
server.start();
|
||||
|
||||
locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
|
||||
|
||||
sf = createSessionFactory(locator);
|
||||
ClientSession session = sf.createSession(true, true, 0);
|
||||
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
|
||||
|
||||
Queue queue = server.locateQueue(ADDRESS);
|
||||
|
||||
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
|
||||
|
||||
ClientMessage message;
|
||||
|
||||
for (int i = 0; i < 4; i++) {
|
||||
message = session.createMessage(true);
|
||||
|
||||
ActiveMQBuffer bodyLocal = message.getBodyBuffer();
|
||||
|
||||
bodyLocal.writeBytes(new byte[MESSAGE_SIZE]);
|
||||
|
||||
producer.send(message);
|
||||
session.commit();
|
||||
|
||||
//last page (#2, whch contains only message #3) is marked as complete - is full - but no delete complete record is added
|
||||
if (i == 3) {
|
||||
queue.getPageSubscription().getPagingStore().forceAnotherPage();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Assert.assertEquals(3, queue.getPageSubscription().getPagingStore().getCurrentWritingPage());
|
||||
|
||||
ClientConsumer consumer = session.createConsumer(ADDRESS);
|
||||
session.start();
|
||||
|
||||
for (int i = 0; i < 4; i++) {
|
||||
message = consumer.receive(5000);
|
||||
Assert.assertNotNull("Before restart - message " + i + " is empty.",message);
|
||||
message.acknowledge();
|
||||
}
|
||||
|
||||
|
||||
|
||||
server.stop();
|
||||
mainCleanup.set(false);
|
||||
|
||||
|
||||
|
||||
// Deleting the paging data. Simulating a failure
|
||||
// a dumb user, or anything that will remove the data
|
||||
deleteDirectory(new File(getPageDir()));
|
||||
|
||||
logger.trace("Server restart");
|
||||
|
||||
server.start();
|
||||
|
||||
locator = createInVMNonHALocator();
|
||||
sf = createSessionFactory(locator);
|
||||
session = sf.createSession(null, null, false, false, true, false, 0);
|
||||
producer = session.createProducer(PagingTest.ADDRESS);
|
||||
|
||||
for (int i = 0; i < 4; i++) {
|
||||
message = session.createMessage(true);
|
||||
|
||||
ActiveMQBuffer bodyLocal = message.getBodyBuffer();
|
||||
|
||||
bodyLocal.writeBytes(new byte[MESSAGE_SIZE]);
|
||||
|
||||
|
||||
producer.send(message);
|
||||
}
|
||||
session.commit();
|
||||
|
||||
mainCleanup.set(true);
|
||||
|
||||
queue = server.locateQueue(ADDRESS);
|
||||
queue.getPageSubscription().cleanupEntries(false);
|
||||
queue.getPageSubscription().getPagingStore().getCursorProvider().cleanup();
|
||||
|
||||
consumer = session.createConsumer(ADDRESS);
|
||||
session.start();
|
||||
|
||||
for (int i = 0; i < 4; i++) {
|
||||
message = consumer.receive(5000);
|
||||
Assert.assertNotNull("After restart - message " + i + " is empty.",message);
|
||||
message.acknowledge();
|
||||
}
|
||||
|
||||
server.stop();
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMissingTXEverythingAcked() throws Exception {
|
||||
clearDataRecreateServerDirs();
|
||||
|
Loading…
x
Reference in New Issue
Block a user