From 293b96aa454f2d9c75216d761b3530ef8ab734b4 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Fri, 25 Feb 2022 11:01:55 -0500 Subject: [PATCH] ARTEMIS-3697 Adding test assertion on invalid IO on Paging This commit is part of a bigger task where I am improving paging. This test is needed to validate some of the changes I am making on further commits. --- .../io/AbstractSequentialFileFactory.java | 3 +- .../journal/ActiveMQJournalLogger.java | 6 ++ .../tests/integration/paging/PagingTest.java | 75 +++++++++++-------- 3 files changed, 49 insertions(+), 35 deletions(-) diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java index 482aaa2a51..66d0e84701 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java @@ -184,10 +184,9 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac @Override public void onIOError(Exception exception, String message, SequentialFile file) { + ActiveMQJournalLogger.LOGGER.criticalIO(message, exception); if (critialErrorListener != null) { critialErrorListener.onIOException(exception, message, file); - } else { - logger.warn("Critical IO Error Called. No Critical IO Error Handler Registered::" + message + " at file " + file, exception); } } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/journal/ActiveMQJournalLogger.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/journal/ActiveMQJournalLogger.java index 53ba9f1c3b..b049923bc3 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/journal/ActiveMQJournalLogger.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/journal/ActiveMQJournalLogger.java @@ -288,4 +288,10 @@ public interface ActiveMQJournalLogger extends BasicLogger { @LogMessage(level = Logger.Level.WARN) @Message(id = 144009, value = "Could not get a file in {0} seconds, System will retry the open but you may see increased latency in your system", format = Message.Format.MESSAGE_FORMAT) void cantOpenFileTimeout(long timeout); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 144010, value = "Critical IO Exception happened: {0}", format = Message.Format.MESSAGE_FORMAT) + void criticalIO(String message, @Cause Exception error); + + } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java index acee2dfb24..6e25a64a18 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java @@ -2041,58 +2041,67 @@ public class PagingTest extends ActiveMQTestBase { @Test public void testInabilityToCreateDirectoryDuringPaging() throws Exception { - // this test only applies to file-based stores - Assume.assumeTrue(storeType == StoreConfiguration.StoreType.FILE); - clearDataRecreateServerDirs(); + AssertionLoggerHandler.startCapture(); - Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false).setPagingDirectory("/" + UUID.randomUUID().toString()); + try { - server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); + // this test only applies to file-based stores + Assume.assumeTrue(storeType == StoreConfiguration.StoreType.FILE); - server.start(); + clearDataRecreateServerDirs(); - final int numberOfMessages = 100; + Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false).setPagingDirectory("/" + UUID.randomUUID().toString()); - locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); + server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); - sf = createSessionFactory(locator); + server.start(); - ClientSession session = sf.createSession(false, true, true); + final int numberOfMessages = 100; - session.createQueue(new QueueConfiguration(PagingTest.ADDRESS)); + locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); - ClientProducer producer = session.createProducer(PagingTest.ADDRESS); + sf = createSessionFactory(locator); - ClientMessage message = null; + ClientSession session = sf.createSession(false, true, true); - byte[] body = new byte[MESSAGE_SIZE]; + session.createQueue(new QueueConfiguration(PagingTest.ADDRESS)); - ByteBuffer bb = ByteBuffer.wrap(body); + ClientProducer producer = session.createProducer(PagingTest.ADDRESS); - for (int j = 1; j <= MESSAGE_SIZE; j++) { - bb.put(getSamplebyte(j)); - } + ClientMessage message = null; - for (int i = 0; i < numberOfMessages; i++) { - message = session.createMessage(true); + byte[] body = new byte[MESSAGE_SIZE]; - ActiveMQBuffer bodyLocal = message.getBodyBuffer(); + ByteBuffer bb = ByteBuffer.wrap(body); - bodyLocal.writeBytes(body); - - message.putIntProperty(new SimpleString("id"), i); - - try { - producer.send(message); - } catch (Exception e) { - // ignore + for (int j = 1; j <= MESSAGE_SIZE; j++) { + bb.put(getSamplebyte(j)); } + + for (int i = 0; i < numberOfMessages; i++) { + message = session.createMessage(true); + + ActiveMQBuffer bodyLocal = message.getBodyBuffer(); + + bodyLocal.writeBytes(body); + + message.putIntProperty(new SimpleString("id"), i); + + try { + producer.send(message); + } catch (Exception e) { + // ignore + } + } + assertTrue(Wait.waitFor(() -> server.getState() == ActiveMQServer.SERVER_STATE.STOPPED, 5000, 200)); + session.close(); + sf.close(); + locator.close(); + } finally { + Assert.assertTrue(AssertionLoggerHandler.findText("AMQ144010")); + AssertionLoggerHandler.stopCapture(); } - assertTrue(Wait.waitFor(() -> server.getState() == ActiveMQServer.SERVER_STATE.STOPPED, 5000, 200)); - session.close(); - sf.close(); - locator.close(); } /**