This commit is contained in:
Justin Bertram 2020-02-14 13:57:04 -06:00
commit b588f61670
3 changed files with 104 additions and 3 deletions

View File

@ -728,7 +728,12 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
Page page = pages.remove(packet.getPageNumber());
if (page == null) {
page = getPage(packet.getStoreName(), packet.getPageNumber());
// if page is null, we create it the instance and include it on the map
// then we must recurse this call
// so page.delete or page.close will not leave any closed objects on the hashmap
getPage(packet.getStoreName(), packet.getPageNumber());
handlePageEvent(packet);
return;
}
if (page != null) {

View File

@ -447,7 +447,7 @@ public interface ActiveMQMessageBundle {
@Message(id = 229216, value = "Invalid queue name: {0}", format = Message.Format.MESSAGE_FORMAT)
ActiveMQIllegalStateException invalidQueueName(SimpleString queueName);
@Message(id = 119217, value = "Can't write to closed file: {0}", format = Message.Format.MESSAGE_FORMAT)
@Message(id = 119217, value = "Cannot write to closed file: {0}", format = Message.Format.MESSAGE_FORMAT)
ActiveMQIOErrorException cannotWriteToClosedFile(SequentialFile file);
@Message(id = 229218, value = "Failed to locate broker configuration URL")

View File

@ -18,10 +18,19 @@ package org.apache.activemq.artemis.tests.integration.cluster.failover;
import java.util.HashMap;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.Assert;
import org.junit.Test;
public class ReplicatedPagedFailoverTest extends ReplicatedFailoverTest {
@ -37,6 +46,93 @@ public class ReplicatedPagedFailoverTest extends ReplicatedFailoverTest {
@Override
@Test
public void testFailWithBrowser() throws Exception {
// paged messages are not available for browsing
internalBrowser(0);
}
@Test
public void testFailWithBrowserWithClose() throws Exception {
internalBrowser(1);
}
@Test
public void testFailWithBrowserWithDelete() throws Exception {
internalBrowser(2);
}
//
// 0 - no tamper
// 1 - close files
// 2 - remove files
private void internalBrowser(int temperMode) throws Exception {
int numMessages = 50;
int messagesPerPage = 10;
int iterations = 10;
createSessionFactory();
ClientSession session = createSession(sf, true, true);
session.createQueue(FailoverTestBase.ADDRESS, RoutingType.MULTICAST, FailoverTestBase.ADDRESS, null, false);
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
Queue queue = liveServer.getServer().locateQueue(FailoverTest.ADDRESS);
for (int j = 0; j < iterations; j++) {
System.err.println("#iteration " + j);
queue.getPageSubscription().getPagingStore().startPaging();
Assert.assertNotNull(queue);
for (int i = 0; i < numMessages; i++) {
// some are durable, some are not!
producer.send(createMessage(session, i, i % 2 == 0));
if (i > 0 && i % messagesPerPage == 0) {
queue.getPageSubscription().getPagingStore().forceAnotherPage();
}
}
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS, true);
session.start();
while (true) {
ClientMessage msg = consumer.receive(500);
if (msg == null) {
break;
}
}
consumer.close();
PagingStore store = queue.getPageSubscription().getPagingStore();
if (temperMode == 1) {
// this is tampering with the system causing an artifical issue. The system should still heal itself.
for (long pageID = store.getFirstPage(); pageID <= store.getCurrentPage().getPageId() + 10; pageID++) {
System.out.println("Sending close on " + pageID);
liveServer.getServer().getStorageManager().pageClosed(store.getStoreName(), (int) pageID);
}
} else if (temperMode == 2) {
// this is tampering with the system causing an artifical issue. The system should still heal itself.
for (long pageID = store.getFirstPage(); pageID <= store.getCurrentPage().getPageId() + 10; pageID++) {
System.out.println("Sending close on " + pageID);
liveServer.getServer().getStorageManager().pageDeleted(store.getStoreName(), (int) pageID);
}
}
store.getFirstPage();
store.getCurrentPage().getPageId();
consumer = session.createConsumer(FailoverTestBase.ADDRESS, false);
session.start();
while (true) {
ClientMessage msg = consumer.receive(500);
if (msg == null) {
break;
}
msg.acknowledge();
}
consumer.close();
Wait.assertFalse(queue.getPageSubscription().getPagingStore()::isPaging);
}
}
}