NO-JIRA Adding a test to validating paging reload
no issues found as part of this test. Just adding a new test.
This commit is contained in:
parent
52c45f5aec
commit
3f63e6be82
|
@ -25,6 +25,7 @@ import javax.jms.Queue;
|
|||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
|
@ -61,25 +62,36 @@ public class MegaCleanerPagingTest extends ActiveMQTestBase {
|
|||
|
||||
@Test
|
||||
public void testCleanup() throws Throwable {
|
||||
testCleanup(false);
|
||||
if (DIRECT_CALL) {
|
||||
internalTestRegular();
|
||||
} else {
|
||||
remoteCall("internalTestRegular");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCleanupMidstream() throws Throwable {
|
||||
testCleanup(true);
|
||||
if (DIRECT_CALL) {
|
||||
internalTestMidstream();
|
||||
} else {
|
||||
remoteCall("internalTestMidstream");
|
||||
}
|
||||
}
|
||||
|
||||
public void testCleanup(boolean midstream) throws Throwable {
|
||||
|
||||
if (DIRECT_CALL) {
|
||||
internalTest(midstream);
|
||||
} else {
|
||||
// Using a spawn to limit memory consumption to the test
|
||||
Process process = SpawnedVMSupport.spawnVM(MegaCleanerPagingTest.class.getName(), new String[]{"-Xmx512M"}, getTestDir(), "" + midstream);
|
||||
logger.debug("process PID {}", process.pid());
|
||||
Assert.assertTrue(process.waitFor(10, TimeUnit.MINUTES));
|
||||
Assert.assertEquals(OK, process.exitValue());
|
||||
}
|
||||
@Test
|
||||
public void testRestart() throws Throwable {
|
||||
remoteCall("populate");
|
||||
System.out.println("Resuming...");
|
||||
remoteCall("resume");
|
||||
}
|
||||
|
||||
private void remoteCall(String methodName) throws Exception {
|
||||
// Using a spawn to limit memory consumption to the test
|
||||
Process process = SpawnedVMSupport.spawnVM(MegaCleanerPagingTest.class.getName(), new String[]{"-Xmx512M"}, getTestDir(), methodName);
|
||||
logger.debug("process PID {}", process.pid());
|
||||
Assert.assertTrue(process.waitFor(10, TimeUnit.MINUTES));
|
||||
Assert.assertEquals(OK, process.exitValue());
|
||||
}
|
||||
|
||||
// I am using a separate VM to limit memory..
|
||||
|
@ -88,8 +100,11 @@ public class MegaCleanerPagingTest extends ActiveMQTestBase {
|
|||
try {
|
||||
MegaCleanerPagingTest megaCleanerPagingTest = new MegaCleanerPagingTest();
|
||||
megaCleanerPagingTest.setTestDir(arg[0]);
|
||||
boolean midstream = Boolean.parseBoolean(arg[1]);
|
||||
megaCleanerPagingTest.internalTest(midstream);
|
||||
String methodName = arg[1];
|
||||
|
||||
|
||||
Method method = megaCleanerPagingTest.getClass().getMethod(methodName);
|
||||
method.invoke(megaCleanerPagingTest);
|
||||
System.exit(OK);
|
||||
} catch (Throwable e) {
|
||||
e.printStackTrace();
|
||||
|
@ -98,6 +113,142 @@ public class MegaCleanerPagingTest extends ActiveMQTestBase {
|
|||
}
|
||||
}
|
||||
|
||||
public void internalTestMidstream() throws Throwable {
|
||||
internalTest(true);
|
||||
}
|
||||
|
||||
public void internalTestRegular() throws Throwable {
|
||||
internalTest(false);
|
||||
}
|
||||
|
||||
public void populate() throws Throwable {
|
||||
ActiveMQServer server = createServer(true, true);
|
||||
server.getConfiguration().clearAddressSettings();
|
||||
server.getConfiguration().addAddressSetting("#", new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setMaxSizeMessages(1000).setPageSizeBytes(10 * 1024 * 1024));
|
||||
server.start();
|
||||
|
||||
// if I didn't limit the memory on this test, the NUMBER_OF_MESSAGES would have to be something huge such as 500_000, and that could be a moving target
|
||||
// if more memory is set to the JUNIT Runner.
|
||||
// This is the main reason we limit memory on this test
|
||||
int NUMBER_OF_MESSAGES = 100_000;
|
||||
|
||||
String queueName = "testPageAndDepage";
|
||||
|
||||
server.addAddressInfo(new AddressInfo(queueName).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
|
||||
server.createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST));
|
||||
|
||||
ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616");
|
||||
Connection connection = factory.createConnection();
|
||||
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
Queue queue = session.createQueue(queueName);
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
|
||||
|
||||
org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(queueName);
|
||||
Assert.assertNotNull(serverQueue);
|
||||
serverQueue.getPagingStore().startPaging();
|
||||
|
||||
ConnectionFactory cf = CFUtil.createConnectionFactory("core", "tcp://localhost:61616?consumerWindowSize=0");
|
||||
Assert.assertEquals(0, ((ActiveMQConnectionFactory)cf).getServerLocator().getConsumerWindowSize());
|
||||
|
||||
final int SIZE = 10 * 1024;
|
||||
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
|
||||
producer.send(session.createTextMessage(createBuffer(i, SIZE)));
|
||||
if (i % 1000 == 0) {
|
||||
logger.debug("sent {} messages", i);
|
||||
session.commit();
|
||||
}
|
||||
}
|
||||
session.commit();
|
||||
|
||||
|
||||
PagingStoreImpl store = (PagingStoreImpl) server.getPagingManager().getPageStore(SimpleString.toSimpleString(queueName));
|
||||
store.disableCleanup();
|
||||
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
connection.start();
|
||||
|
||||
for (int i = 0; i < NUMBER_OF_MESSAGES / 2; i++) {
|
||||
TextMessage message = (TextMessage) consumer.receive(5000);
|
||||
Assert.assertNotNull(message);
|
||||
Assert.assertEquals(createBuffer(i, SIZE), message.getText());
|
||||
|
||||
if (i % 1000 == 0) {
|
||||
logger.debug("received {} messages", i);
|
||||
session.commit();
|
||||
}
|
||||
}
|
||||
session.commit();
|
||||
connection.close();
|
||||
|
||||
AssertionLoggerHandler.startCapture();
|
||||
runAfter(AssertionLoggerHandler::stopCapture);
|
||||
|
||||
server.stop();
|
||||
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222023")); // error associated with OME
|
||||
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222010")); // critical IO Error
|
||||
}
|
||||
|
||||
|
||||
public void resume() throws Throwable {
|
||||
ActiveMQServer server = createServer(true, true);
|
||||
server.getConfiguration().clearAddressSettings();
|
||||
server.getConfiguration().addAddressSetting("#", new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setMaxSizeMessages(1000).setPageSizeBytes(10 * 1024 * 1024));
|
||||
server.start();
|
||||
|
||||
// if I didn't limit the memory on this test, the NUMBER_OF_MESSAGES would have to be something huge such as 500_000, and that could be a moving target
|
||||
// if more memory is set to the JUNIT Runner.
|
||||
// This is the main reason we limit memory on this test
|
||||
int NUMBER_OF_MESSAGES = 100_000;
|
||||
|
||||
String queueName = "testPageAndDepage";
|
||||
|
||||
ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616");
|
||||
Connection connection = factory.createConnection();
|
||||
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
Queue queue = session.createQueue(queueName);
|
||||
|
||||
|
||||
org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(queueName);
|
||||
Assert.assertNotNull(serverQueue);
|
||||
serverQueue.getPagingStore().startPaging();
|
||||
|
||||
ConnectionFactory cf = CFUtil.createConnectionFactory("core", "tcp://localhost:61616?consumerWindowSize=0");
|
||||
Assert.assertEquals(0, ((ActiveMQConnectionFactory)cf).getServerLocator().getConsumerWindowSize());
|
||||
|
||||
final int SIZE = 10 * 1024;
|
||||
session.commit();
|
||||
|
||||
|
||||
PagingStoreImpl store = (PagingStoreImpl) server.getPagingManager().getPageStore(SimpleString.toSimpleString(queueName));
|
||||
store.disableCleanup();
|
||||
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
connection.start();
|
||||
|
||||
for (int i = NUMBER_OF_MESSAGES / 2; i < NUMBER_OF_MESSAGES; i++) {
|
||||
TextMessage message = (TextMessage) consumer.receive(5000);
|
||||
Assert.assertNotNull(message);
|
||||
Assert.assertEquals(createBuffer(i, SIZE), message.getText());
|
||||
|
||||
if (i % 1000 == 0) {
|
||||
logger.debug("received {} messages", i);
|
||||
session.commit();
|
||||
}
|
||||
}
|
||||
session.commit();
|
||||
Assert.assertNull(consumer.receiveNoWait());
|
||||
connection.close();
|
||||
|
||||
AssertionLoggerHandler.startCapture();
|
||||
runAfter(AssertionLoggerHandler::stopCapture);
|
||||
store.getCursorProvider().resumeCleanup();
|
||||
|
||||
server.stop();
|
||||
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222023")); // error associated with OME
|
||||
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222010")); // critical IO Error
|
||||
}
|
||||
|
||||
public void internalTest(boolean midstream) throws Throwable {
|
||||
ActiveMQServer server = createServer(true, true);
|
||||
server.getConfiguration().clearAddressSettings();
|
||||
|
|
Loading…
Reference in New Issue