ARTEMIS-3697 Adding test assertion on invalid IO on Paging
This commit is part of a bigger task where I am improving paging. This test is needed to validate some of the changes I am making on further commits.
This commit is contained in:
parent
b0f131f080
commit
293b96aa45
|
@ -184,10 +184,9 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac
|
|||
|
||||
@Override
|
||||
public void onIOError(Exception exception, String message, SequentialFile file) {
|
||||
ActiveMQJournalLogger.LOGGER.criticalIO(message, exception);
|
||||
if (critialErrorListener != null) {
|
||||
critialErrorListener.onIOException(exception, message, file);
|
||||
} else {
|
||||
logger.warn("Critical IO Error Called. No Critical IO Error Handler Registered::" + message + " at file " + file, exception);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -288,4 +288,10 @@ public interface ActiveMQJournalLogger extends BasicLogger {
|
|||
@LogMessage(level = Logger.Level.WARN)
|
||||
@Message(id = 144009, value = "Could not get a file in {0} seconds, System will retry the open but you may see increased latency in your system", format = Message.Format.MESSAGE_FORMAT)
|
||||
void cantOpenFileTimeout(long timeout);
|
||||
|
||||
@LogMessage(level = Logger.Level.WARN)
|
||||
@Message(id = 144010, value = "Critical IO Exception happened: {0}", format = Message.Format.MESSAGE_FORMAT)
|
||||
void criticalIO(String message, @Cause Exception error);
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -2041,58 +2041,67 @@ public class PagingTest extends ActiveMQTestBase {
|
|||
|
||||
@Test
|
||||
public void testInabilityToCreateDirectoryDuringPaging() throws Exception {
|
||||
// this test only applies to file-based stores
|
||||
Assume.assumeTrue(storeType == StoreConfiguration.StoreType.FILE);
|
||||
|
||||
clearDataRecreateServerDirs();
|
||||
AssertionLoggerHandler.startCapture();
|
||||
|
||||
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false).setPagingDirectory("/" + UUID.randomUUID().toString());
|
||||
try {
|
||||
|
||||
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
|
||||
// this test only applies to file-based stores
|
||||
Assume.assumeTrue(storeType == StoreConfiguration.StoreType.FILE);
|
||||
|
||||
server.start();
|
||||
clearDataRecreateServerDirs();
|
||||
|
||||
final int numberOfMessages = 100;
|
||||
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false).setPagingDirectory("/" + UUID.randomUUID().toString());
|
||||
|
||||
locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
|
||||
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
|
||||
|
||||
sf = createSessionFactory(locator);
|
||||
server.start();
|
||||
|
||||
ClientSession session = sf.createSession(false, true, true);
|
||||
final int numberOfMessages = 100;
|
||||
|
||||
session.createQueue(new QueueConfiguration(PagingTest.ADDRESS));
|
||||
locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
|
||||
|
||||
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
|
||||
sf = createSessionFactory(locator);
|
||||
|
||||
ClientMessage message = null;
|
||||
ClientSession session = sf.createSession(false, true, true);
|
||||
|
||||
byte[] body = new byte[MESSAGE_SIZE];
|
||||
session.createQueue(new QueueConfiguration(PagingTest.ADDRESS));
|
||||
|
||||
ByteBuffer bb = ByteBuffer.wrap(body);
|
||||
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
|
||||
|
||||
for (int j = 1; j <= MESSAGE_SIZE; j++) {
|
||||
bb.put(getSamplebyte(j));
|
||||
}
|
||||
ClientMessage message = null;
|
||||
|
||||
for (int i = 0; i < numberOfMessages; i++) {
|
||||
message = session.createMessage(true);
|
||||
byte[] body = new byte[MESSAGE_SIZE];
|
||||
|
||||
ActiveMQBuffer bodyLocal = message.getBodyBuffer();
|
||||
ByteBuffer bb = ByteBuffer.wrap(body);
|
||||
|
||||
bodyLocal.writeBytes(body);
|
||||
|
||||
message.putIntProperty(new SimpleString("id"), i);
|
||||
|
||||
try {
|
||||
producer.send(message);
|
||||
} catch (Exception e) {
|
||||
// ignore
|
||||
for (int j = 1; j <= MESSAGE_SIZE; j++) {
|
||||
bb.put(getSamplebyte(j));
|
||||
}
|
||||
|
||||
for (int i = 0; i < numberOfMessages; i++) {
|
||||
message = session.createMessage(true);
|
||||
|
||||
ActiveMQBuffer bodyLocal = message.getBodyBuffer();
|
||||
|
||||
bodyLocal.writeBytes(body);
|
||||
|
||||
message.putIntProperty(new SimpleString("id"), i);
|
||||
|
||||
try {
|
||||
producer.send(message);
|
||||
} catch (Exception e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
assertTrue(Wait.waitFor(() -> server.getState() == ActiveMQServer.SERVER_STATE.STOPPED, 5000, 200));
|
||||
session.close();
|
||||
sf.close();
|
||||
locator.close();
|
||||
} finally {
|
||||
Assert.assertTrue(AssertionLoggerHandler.findText("AMQ144010"));
|
||||
AssertionLoggerHandler.stopCapture();
|
||||
}
|
||||
assertTrue(Wait.waitFor(() -> server.getState() == ActiveMQServer.SERVER_STATE.STOPPED, 5000, 200));
|
||||
session.close();
|
||||
sf.close();
|
||||
locator.close();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
Loading…
Reference in New Issue